From de5ff6a706cc10831ee44484053f1c9dbc118792 Mon Sep 17 00:00:00 2001 From: Dima Krasner Date: Sat, 27 Jan 2024 12:37:06 +0200 Subject: [PATCH] use the same interface for all periodic jobs --- cmd/tootik/main.go | 126 +++++++++++++++++++++------------------------ data/garbage.go | 32 +++++++----- outbox/move.go | 40 ++++++++------ outbox/poll.go | 22 +++++--- test/edit_test.go | 18 +++++-- test/move_test.go | 54 ++++++++++++++++--- test/poll_test.go | 28 ++++++++-- 7 files changed, 203 insertions(+), 117 deletions(-) diff --git a/cmd/tootik/main.go b/cmd/tootik/main.go index e996dc08..e19b0703 100644 --- a/cmd/tootik/main.go +++ b/cmd/tootik/main.go @@ -157,7 +157,12 @@ func main() { panic(err) } - if err := data.CollectGarbage(ctx, *domain, &cfg, db); err != nil { + gc := data.GarbageCollector{ + Domain: *domain, + Config: &cfg, + DB: db, + } + if err := gc.Run(ctx); err != nil { panic(err) } @@ -291,77 +296,66 @@ func main() { }() } - wg.Add(1) - go func() { - defer wg.Done() - defer cancel() - - t := time.NewTicker(pollResultsUpdateInterval) - defer t.Stop() - - for { - log.Info("Updating poll results") - if err := outbox.UpdatePollResults(ctx, *domain, log, db); err != nil { - log.Error("Failed to update poll results", "error", err) - break - } - - select { - case <-ctx.Done(): - return - - case <-t.C: - } - - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - defer cancel() - - t := time.NewTicker(followMoveInterval) - defer t.Stop() - - for { - if err := outbox.Move(ctx, *domain, log, db, resolver, nobody); err != nil { - log.Error("Failed to move follows", "error", err) - break - } - - select { - case <-ctx.Done(): - return - - case <-t.C: - } + for _, job := range []struct { + Name string + Interval time.Duration + Runner interface { + Run(context.Context) error } - }() - - wg.Add(1) - go func() { - defer wg.Done() - defer cancel() + }{ + { + "poller", + pollResultsUpdateInterval, + &outbox.Poller{ + Domain: *domain, + Log: log, + DB: db, + }, + }, + { + "mover", + followMoveInterval, + &outbox.Mover{ + Domain: *domain, + Log: log, + DB: db, + Resolver: resolver, + Actor: nobody, + }, + }, + { + "gc", + garbageCollectionInterval, + &gc, + }, + } { + name := job.Name + interval := job.Interval + runner := job.Runner + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() - t := time.NewTicker(garbageCollectionInterval) - defer t.Stop() + t := time.NewTicker(interval) + defer t.Stop() - for { - log.Info("Collecting garbage") - if err := data.CollectGarbage(ctx, *domain, &cfg, db); err != nil { - log.Error("Failed to collect garbage", "error", err) - break - } + for { + log.Info("Running periodic job", "name", name) + if err := runner.Run(ctx); err != nil { + log.Error("Periodic job has failed", "name", name, "error", err) + break + } - select { - case <-ctx.Done(): - return + select { + case <-ctx.Done(): + return - case <-t.C: + case <-t.C: + } } - } - }() + }() + } <-ctx.Done() log.Info("Shutting down") diff --git a/data/garbage.go b/data/garbage.go index e58a78a0..dfb388f1 100644 --- a/data/garbage.go +++ b/data/garbage.go @@ -24,51 +24,57 @@ import ( "time" ) -// CollectGarbage deletes old data. -func CollectGarbage(ctx context.Context, domain string, cfg *cfg.Config, db *sql.DB) error { +type GarbageCollector struct { + Domain string + Config *cfg.Config + DB *sql.DB +} + +// Collect deletes old data. +func (gc *GarbageCollector) Run(ctx context.Context) error { now := time.Now() - if _, err := db.ExecContext(ctx, `delete from notesfts where id in (select notes.id from notes left join follows on follows.followed in (notes.author, notes.cc0, notes.to0, notes.cc1, notes.to1, notes.cc2, notes.to2) or (notes.to2 is not null and exists (select 1 from json_each(notes.object->'to') where value = follows.followed)) or (notes.cc2 is not null and exists (select 1 from json_each(notes.object->'cc') where value = follows.followed)) where follows.accepted = 1 and notes.inserted < unixepoch()-60*60*24 and notes.host != ? and follows.id is null)`, domain); err != nil { + if _, err := gc.DB.ExecContext(ctx, `delete from notesfts where id in (select notes.id from notes left join follows on follows.followed in (notes.author, notes.cc0, notes.to0, notes.cc1, notes.to1, notes.cc2, notes.to2) or (notes.to2 is not null and exists (select 1 from json_each(notes.object->'to') where value = follows.followed)) or (notes.cc2 is not null and exists (select 1 from json_each(notes.object->'cc') where value = follows.followed)) where follows.accepted = 1 and notes.inserted < unixepoch()-60*60*24 and notes.host != ? and follows.id is null)`, gc.Domain); err != nil { return fmt.Errorf("failed to remove invisible posts: %w", err) } - if _, err := db.ExecContext(ctx, `delete from notes where id in (select notes.id from notes left join follows on follows.followed in (notes.author, notes.cc0, notes.to0, notes.cc1, notes.to1, notes.cc2, notes.to2) or (notes.to2 is not null and exists (select 1 from json_each(notes.object->'to') where value = follows.followed)) or (notes.cc2 is not null and exists (select 1 from json_each(notes.object->'cc') where value = follows.followed)) where follows.accepted = 1 and notes.inserted < unixepoch()-60*60*24 and notes.host != ? and follows.id is null)`, domain); err != nil { + if _, err := gc.DB.ExecContext(ctx, `delete from notes where id in (select notes.id from notes left join follows on follows.followed in (notes.author, notes.cc0, notes.to0, notes.cc1, notes.to1, notes.cc2, notes.to2) or (notes.to2 is not null and exists (select 1 from json_each(notes.object->'to') where value = follows.followed)) or (notes.cc2 is not null and exists (select 1 from json_each(notes.object->'cc') where value = follows.followed)) where follows.accepted = 1 and notes.inserted < unixepoch()-60*60*24 and notes.host != ? and follows.id is null)`, gc.Domain); err != nil { return fmt.Errorf("failed to remove invisible posts: %w", err) } - if _, err := db.ExecContext(ctx, `delete from notesfts where id in (select id from notes where inserted < unixepoch()-60*60*24*7 and author not in (select followed from follows where accepted = 1) and host != ?)`, domain); err != nil { + if _, err := gc.DB.ExecContext(ctx, `delete from notesfts where id in (select id from notes where inserted < unixepoch()-60*60*24*7 and author not in (select followed from follows where accepted = 1) and host != ?)`, gc.Domain); err != nil { return fmt.Errorf("failed to remove posts by authors without followers: %w", err) } - if _, err := db.ExecContext(ctx, `delete from notes where inserted < unixepoch()-60*60*24*7 and author not in (select followed from follows where accepted = 1) and host != ?`, domain); err != nil { + if _, err := gc.DB.ExecContext(ctx, `delete from notes where inserted < unixepoch()-60*60*24*7 and author not in (select followed from follows where accepted = 1) and host != ?`, gc.Domain); err != nil { return fmt.Errorf("failed to remove posts by authors without followers: %w", err) } - if _, err := db.ExecContext(ctx, `delete from notesfts where id in (select id from notes where inserted < ? and host != ?)`, now.Add(-cfg.NotesTTL).Unix(), domain); err != nil { + if _, err := gc.DB.ExecContext(ctx, `delete from notesfts where id in (select id from notes where inserted < ? and host != ?)`, now.Add(-gc.Config.NotesTTL).Unix(), gc.Domain); err != nil { return fmt.Errorf("failed to remove old posts: %w", err) } - if _, err := db.ExecContext(ctx, `delete from notes where inserted < ? and host != ?`, now.Add(-cfg.NotesTTL).Unix(), domain); err != nil { + if _, err := gc.DB.ExecContext(ctx, `delete from notes where inserted < ? and host != ?`, now.Add(-gc.Config.NotesTTL).Unix(), gc.Domain); err != nil { return fmt.Errorf("failed to remove old posts: %w", err) } - if _, err := db.ExecContext(ctx, `delete from hashtags where not exists (select 1 from notes where notes.id = hashtags.note)`); err != nil { + if _, err := gc.DB.ExecContext(ctx, `delete from hashtags where not exists (select 1 from notes where notes.id = hashtags.note)`); err != nil { return fmt.Errorf("failed to remove old hashtags: %w", err) } - if _, err := db.ExecContext(ctx, `delete from shares where not exists (select 1 from persons where persons.id = shares.by) or (inserted < ? and not exists (select 1 from notes where notes.id = shares.note))`, now.Add(-cfg.SharesTTL).Unix()); err != nil { + if _, err := gc.DB.ExecContext(ctx, `delete from shares where not exists (select 1 from persons where persons.id = shares.by) or (inserted < ? and not exists (select 1 from notes where notes.id = shares.note))`, now.Add(-gc.Config.SharesTTL).Unix()); err != nil { return fmt.Errorf("failed to remove old shares: %w", err) } - if _, err := db.ExecContext(ctx, `delete from outbox where inserted < ? and host != ?`, now.Add(-cfg.DeliveryTTL).Unix(), domain); err != nil { + if _, err := gc.DB.ExecContext(ctx, `delete from outbox where inserted < ? and host != ?`, now.Add(-gc.Config.DeliveryTTL).Unix(), gc.Domain); err != nil { return fmt.Errorf("failed to remove old posts: %w", err) } - if _, err := db.ExecContext(ctx, `delete from follows where accepted = 0 and inserted < ?`, now.Add(-cfg.FollowAcceptTimeout).Unix()); err != nil { + if _, err := gc.DB.ExecContext(ctx, `delete from follows where accepted = 0 and inserted < ?`, now.Add(-gc.Config.FollowAcceptTimeout).Unix()); err != nil { return fmt.Errorf("failed to remove failed follow requests: %w", err) } - if _, err := db.ExecContext(ctx, `delete from persons where updated < ? and host != ? and not exists (select 1 from follows where followed = persons.id) and not exists (select 1 from notes where notes.author = persons.id) and not exists (select 1 from shares where shares.by = persons.id)`, now.Add(-cfg.ActorTTL).Unix(), domain); err != nil { + if _, err := gc.DB.ExecContext(ctx, `delete from persons where updated < ? and host != ? and not exists (select 1 from follows where followed = persons.id) and not exists (select 1 from notes where notes.author = persons.id) and not exists (select 1 from shares where shares.by = persons.id)`, now.Add(-gc.Config.ActorTTL).Unix(), gc.Domain); err != nil { return fmt.Errorf("failed to remove idle actors: %w", err) } diff --git a/outbox/move.go b/outbox/move.go index c92f7fd2..501ea806 100644 --- a/outbox/move.go +++ b/outbox/move.go @@ -25,8 +25,16 @@ import ( "log/slog" ) -func updatedMoveTargets(ctx context.Context, log *slog.Logger, db *sql.DB, resolver *fed.Resolver, from *ap.Actor, prefix string) error { - rows, err := db.QueryContext(ctx, `select oldid, newid from (select old.id as oldid, new.id as newid, old.updated as oldupdated from persons old join persons new on old.actor->>'movedTo' = new.id and not exists (select 1 from json_each(new.actor->'alsoKnownAs') where value = old.id) and old.updated > new.updated where old.actor->>'movedTo' is not null union select old.id, old.actor->>'movedTo', old.updated from persons old where old.actor->>'movedTo' is not null and not exists (select 1 from persons new where new.id = old.actor->>'movedTo')) where exists (select 1 from follows where followed = oldid and follower like ? and inserted < oldupdated)`, prefix) +type Mover struct { + Domain string + Log *slog.Logger + DB *sql.DB + Resolver *fed.Resolver + Actor *ap.Actor +} + +func (m *Mover) updatedMoveTargets(ctx context.Context, prefix string) error { + rows, err := m.DB.QueryContext(ctx, `select oldid, newid from (select old.id as oldid, new.id as newid, old.updated as oldupdated from persons old join persons new on old.actor->>'movedTo' = new.id and not exists (select 1 from json_each(new.actor->'alsoKnownAs') where value = old.id) and old.updated > new.updated where old.actor->>'movedTo' is not null union select old.id, old.actor->>'movedTo', old.updated from persons old where old.actor->>'movedTo' is not null and not exists (select 1 from persons new where new.id = old.actor->>'movedTo')) where exists (select 1 from follows where followed = oldid and follower like ? and inserted < oldupdated)`, prefix) if err != nil { return fmt.Errorf("failed to moved actors: %w", err) } @@ -35,33 +43,33 @@ func updatedMoveTargets(ctx context.Context, log *slog.Logger, db *sql.DB, resol for rows.Next() { var oldID, newID string if err := rows.Scan(&oldID, &newID); err != nil { - log.Error("Failed to scan moved actor", "error", err) + m.Log.Error("Failed to scan moved actor", "error", err) continue } - actor, err := resolver.Resolve(ctx, log, db, from, newID, false) + actor, err := m.Resolver.Resolve(ctx, m.Log, m.DB, m.Actor, newID, false) if err != nil { - log.Warn("Failed to resolve move target", "old", oldID, "new", newID, "error", err) + m.Log.Warn("Failed to resolve move target", "old", oldID, "new", newID, "error", err) continue } if !actor.AlsoKnownAs.Contains(oldID) { - log.Warn("New account does not point to old account", "new", newID, "old", oldID) + m.Log.Warn("New account does not point to old account", "new", newID, "old", oldID) } } return nil } -func Move(ctx context.Context, domain string, log *slog.Logger, db *sql.DB, resolver *fed.Resolver, from *ap.Actor) error { - prefix := fmt.Sprintf("https://%s/%%", domain) +func (m *Mover) Run(ctx context.Context) error { + prefix := fmt.Sprintf("https://%s/%%", m.Domain) // updated new actor if old actor specifies movedTo but new actor doesn't specify old actor in alsoKnownAs - if err := updatedMoveTargets(ctx, log, db, resolver, from, prefix); err != nil { + if err := m.updatedMoveTargets(ctx, prefix); err != nil { return err } - rows, err := db.QueryContext(ctx, `select persons.actor, old.id, new.id, follows.id from persons old join persons new on old.actor->>'movedTo' = new.id and exists (select 1 from json_each(new.actor->'alsoKnownAs') where value = old.id) join follows on follows.followed = old.id and follows.inserted < old.updated join persons on persons.id = follows.follower where old.actor->>'movedTo' is not null and follows.follower like ?`, prefix) + rows, err := m.DB.QueryContext(ctx, `select persons.actor, old.id, new.id, follows.id from persons old join persons new on old.actor->>'movedTo' = new.id and exists (select 1 from json_each(new.actor->'alsoKnownAs') where value = old.id) join follows on follows.followed = old.id and follows.inserted < old.updated join persons on persons.id = follows.follower where old.actor->>'movedTo' is not null and follows.follower like ?`, prefix) if err != nil { return fmt.Errorf("failed to fetch follows to move: %w", err) } @@ -71,17 +79,17 @@ func Move(ctx context.Context, domain string, log *slog.Logger, db *sql.DB, reso var actor ap.Actor var oldID, newID, oldFollowID string if err := rows.Scan(&actor, &oldID, &newID, &oldFollowID); err != nil { - log.Error("Failed to scan follow to move", "error", err) + m.Log.Error("Failed to scan follow to move", "error", err) continue } - log.Info("Moving follow", "follow", oldFollowID, "old", oldID, "new", newID) - if err := Follow(ctx, domain, &actor, newID, db); err != nil { - log.Warn("Failed to follow new actor", "follow", oldFollowID, "old", oldID, "new", newID, "error", err) + m.Log.Info("Moving follow", "follow", oldFollowID, "old", oldID, "new", newID) + if err := Follow(ctx, m.Domain, &actor, newID, m.DB); err != nil { + m.Log.Warn("Failed to follow new actor", "follow", oldFollowID, "old", oldID, "new", newID, "error", err) continue } - if err := Unfollow(ctx, domain, log, db, &actor, oldID, oldFollowID); err != nil { - log.Warn("Failed to unfollow old actor", "follow", oldFollowID, "old", oldID, "new", newID, "error", err) + if err := Unfollow(ctx, m.Domain, m.Log, m.DB, &actor, oldID, oldFollowID); err != nil { + m.Log.Warn("Failed to unfollow old actor", "follow", oldFollowID, "old", oldID, "new", newID, "error", err) } } diff --git a/outbox/poll.go b/outbox/poll.go index 4fc1d81d..e7c8c12f 100644 --- a/outbox/poll.go +++ b/outbox/poll.go @@ -25,12 +25,18 @@ import ( "time" ) +type Poller struct { + Domain string + Log *slog.Logger + DB *sql.DB +} + type pollResult struct { PollID, Option string } -func UpdatePollResults(ctx context.Context, domain string, log *slog.Logger, db *sql.DB) error { - rows, err := db.QueryContext(ctx, `select poll, option, count(*) from (select polls.id as poll, votes.object->>'name' as option, votes.author as voter from notes polls join notes votes on votes.object->>'inReplyTo' = polls.id where polls.object->>'type' = 'Question' and polls.id like $1 and polls.object->>'closed' is null and votes.object->>'name' is not null group by poll, option, voter) group by poll, option`, fmt.Sprintf("https://%s/%%", domain)) +func (p *Poller) Run(ctx context.Context) error { + rows, err := p.DB.QueryContext(ctx, `select poll, option, count(*) from (select polls.id as poll, votes.object->>'name' as option, votes.author as voter from notes polls join notes votes on votes.object->>'inReplyTo' = polls.id where polls.object->>'type' = 'Question' and polls.id like $1 and polls.object->>'closed' is null and votes.object->>'name' is not null group by poll, option, voter) group by poll, option`, fmt.Sprintf("https://%s/%%", p.Domain)) if err != nil { return err } @@ -43,7 +49,7 @@ func UpdatePollResults(ctx context.Context, domain string, log *slog.Logger, db var pollID, option string var count int64 if err := rows.Scan(&pollID, &option, &count); err != nil { - log.Warn("Failed to scan poll result", "error", err) + p.Log.Warn("Failed to scan poll result", "error", err) continue } @@ -53,8 +59,8 @@ func UpdatePollResults(ctx context.Context, domain string, log *slog.Logger, db } var obj ap.Object - if err := db.QueryRowContext(ctx, "select object from notes where id = ?", pollID).Scan(&obj); err != nil { - log.Warn("Failed to fetch poll", "poll", pollID, "error", err) + if err := p.DB.QueryRowContext(ctx, "select object from notes where id = ?", pollID).Scan(&obj); err != nil { + p.Log.Warn("Failed to fetch poll", "poll", pollID, "error", err) continue } @@ -92,10 +98,10 @@ func UpdatePollResults(ctx context.Context, domain string, log *slog.Logger, db continue } - log.Info("Updating poll results", "poll", poll.ID) + p.Log.Info("Updating poll results", "poll", poll.ID) - if err := UpdateNote(ctx, domain, db, poll); err != nil { - log.Warn("Failed to update poll results", "poll", poll.ID, "error", err) + if err := UpdateNote(ctx, p.Domain, p.DB, poll); err != nil { + p.Log.Warn("Failed to update poll results", "poll", poll.ID, "error", err) } } diff --git a/test/edit_test.go b/test/edit_test.go index fc6563d3..9b360e65 100644 --- a/test/edit_test.go +++ b/test/edit_test.go @@ -450,7 +450,12 @@ func TestEdit_PollAddOption(t *testing.T) { reply := server.Handle(fmt.Sprintf("/users/reply/%s?Hell%%20yeah%%21", say[15:len(say)-2]), server.Bob) assert.Regexp(`^30 /users/view/\S+\r\n$`, reply) - assert.NoError(outbox.UpdatePollResults(context.Background(), domain, slog.Default(), server.db)) + poller := outbox.Poller{ + Domain: domain, + Log: slog.Default(), + DB: server.db, + } + assert.NoError(poller.Run(context.Background())) view := server.Handle("/users/view/"+id, server.Bob) assert.Contains(view, "So, polls on Station are pretty cool, right?") @@ -470,7 +475,7 @@ func TestEdit_PollAddOption(t *testing.T) { reply = server.Handle(fmt.Sprintf("/users/reply/%s?I%%20couldn%%27t%%20care%%20less", say[15:len(say)-2]), server.Carol) assert.Regexp(`^30 /users/view/\S+\r\n$`, reply) - assert.NoError(outbox.UpdatePollResults(context.Background(), domain, slog.Default(), server.db)) + assert.NoError(poller.Run(context.Background())) view = server.Handle("/users/view/"+id, server.Bob) assert.Contains(view, "So, polls on Station are pretty cool, right?") @@ -496,7 +501,12 @@ func TestEdit_RemoveQuestion(t *testing.T) { reply := server.Handle(fmt.Sprintf("/users/reply/%s?Hell%%20yeah%%21", say[15:len(say)-2]), server.Bob) assert.Regexp(`^30 /users/view/\S+\r\n$`, reply) - assert.NoError(outbox.UpdatePollResults(context.Background(), domain, slog.Default(), server.db)) + poller := outbox.Poller{ + Domain: domain, + Log: slog.Default(), + DB: server.db, + } + assert.NoError(poller.Run(context.Background())) view := server.Handle("/users/view/"+id, server.Bob) assert.Contains(view, "So, polls on Station are pretty cool, right?") @@ -513,7 +523,7 @@ func TestEdit_RemoveQuestion(t *testing.T) { edit := server.Handle(fmt.Sprintf("/users/edit/%s?This%%20is%%20not%%20a%%20poll", id), server.Alice) assert.Equal(fmt.Sprintf("30 /users/view/%s\r\n", id), edit) - assert.NoError(outbox.UpdatePollResults(context.Background(), domain, slog.Default(), server.db)) + assert.NoError(poller.Run(context.Background())) view = server.Handle("/users/view/"+id, server.Bob) assert.Contains(view, "This is not a poll") diff --git a/test/move_test.go b/test/move_test.go index c12d072f..a1dbb3c3 100644 --- a/test/move_test.go +++ b/test/move_test.go @@ -64,7 +64,14 @@ func TestMove_FederatedToFederated(t *testing.T) { ) assert.NoError(err) - assert.NoError(outbox.Move(context.Background(), domain, slog.Default(), server.db, fed.NewResolver(nil, domain, server.cfg), server.Nobody)) + mover := outbox.Mover{ + Domain: domain, + Log: slog.Default(), + DB: server.db, + Resolver: fed.NewResolver(nil, domain, server.cfg), + Actor: server.Nobody, + } + assert.NoError(mover.Run(context.Background())) var followed int assert.NoError(server.db.QueryRow(`select exists (select 1 from follows where follower = $1 and followed = $2 and accepted = 0) and exists (select 1 from outbox where activity->>'type' = 'Follow' and activity->>'actor' = $1 and activity->>'object' = $2)`, server.Alice.ID, "https://::1/user/dan").Scan(&followed)) @@ -110,7 +117,14 @@ func TestMove_FederatedToFederatedTwoAccounts(t *testing.T) { ) assert.NoError(err) - assert.NoError(outbox.Move(context.Background(), domain, slog.Default(), server.db, fed.NewResolver(nil, domain, server.cfg), server.Nobody)) + mover := outbox.Mover{ + Domain: domain, + Log: slog.Default(), + DB: server.db, + Resolver: fed.NewResolver(nil, domain, server.cfg), + Actor: server.Nobody, + } + assert.NoError(mover.Run(context.Background())) var followed int assert.NoError(server.db.QueryRow(`select exists (select 1 from follows where follower = $1 and followed = $2 and accepted = 0) and exists (select 1 from outbox where activity->>'type' = 'Follow' and activity->>'actor' = $1 and activity->>'object' = $2)`, server.Alice.ID, "https://::1/user/dan").Scan(&followed)) @@ -156,7 +170,14 @@ func TestMove_FederatedToFederatedNotLinked(t *testing.T) { ) assert.NoError(err) - assert.NoError(outbox.Move(context.Background(), domain, slog.Default(), server.db, fed.NewResolver(nil, domain, server.cfg), server.Nobody)) + mover := outbox.Mover{ + Domain: domain, + Log: slog.Default(), + DB: server.db, + Resolver: fed.NewResolver(nil, domain, server.cfg), + Actor: server.Nobody, + } + assert.NoError(mover.Run(context.Background())) var followed int assert.NoError(server.db.QueryRow(`select exists (select 1 from follows where follower = $1 and followed = $2 and accepted = 0) or exists (select 1 from outbox where activity->>'type' = 'Follow' and activity->>'actor' = $1 and activity->>'object' = $2)`, server.Alice.ID, "https://::1/user/dan").Scan(&followed)) @@ -202,7 +223,14 @@ func TestMove_FederatedToFederatedFollowedAfterUpdate(t *testing.T) { ) assert.NoError(err) - assert.NoError(outbox.Move(context.Background(), domain, slog.Default(), server.db, fed.NewResolver(nil, domain, server.cfg), server.Nobody)) + mover := outbox.Mover{ + Domain: domain, + Log: slog.Default(), + DB: server.db, + Resolver: fed.NewResolver(nil, domain, server.cfg), + Actor: server.Nobody, + } + assert.NoError(mover.Run(context.Background())) var followed int assert.NoError(server.db.QueryRow(`select exists (select 1 from follows where follower = $1 and followed = $2 and accepted = 0) and exists (select 1 from outbox where activity->>'type' = 'Follow' and activity->>'actor' = $1 and activity->>'object' = $2)`, server.Alice.ID, "https://::1/user/dan").Scan(&followed)) @@ -241,7 +269,14 @@ func TestMove_FederatedToLocal(t *testing.T) { ) assert.NoError(err) - assert.NoError(outbox.Move(context.Background(), domain, slog.Default(), server.db, fed.NewResolver(nil, domain, server.cfg), server.Nobody)) + mover := outbox.Mover{ + Domain: domain, + Log: slog.Default(), + DB: server.db, + Resolver: fed.NewResolver(nil, domain, server.cfg), + Actor: server.Nobody, + } + assert.NoError(mover.Run(context.Background())) var followed int assert.NoError(server.db.QueryRow(`select exists (select 1 from follows where follower = $1 and followed = $2 and accepted = 1) and not exists (select 1 from outbox where activity->>'type' = 'Follow' and activity->>'actor' = $1 and activity->>'object' = $2)`, server.Alice.ID, server.Bob.ID).Scan(&followed)) @@ -283,7 +318,14 @@ func TestMove_FederatedToLocalLinked(t *testing.T) { _, err = server.db.Exec(`UPDATE persons SET actor = json_set(actor, '$.alsoKnownAs', $1) WHERE id = $2`, "https://127.0.0.1/user/dan", server.Bob.ID) assert.NoError(err) - assert.NoError(outbox.Move(context.Background(), domain, slog.Default(), server.db, fed.NewResolver(nil, domain, server.cfg), server.Nobody)) + mover := outbox.Mover{ + Domain: domain, + Log: slog.Default(), + DB: server.db, + Resolver: fed.NewResolver(nil, domain, server.cfg), + Actor: server.Nobody, + } + assert.NoError(mover.Run(context.Background())) var followed int assert.NoError(server.db.QueryRow(`select exists (select 1 from follows where follower = $1 and followed = $2 and accepted = 1) and not exists (select 1 from outbox where activity->>'type' = 'Follow' and activity->>'actor' = $1 and activity->>'object' = $2)`, server.Alice.ID, server.Bob.ID).Scan(&followed)) diff --git a/test/poll_test.go b/test/poll_test.go index a2b9f5fa..29065c42 100644 --- a/test/poll_test.go +++ b/test/poll_test.go @@ -802,7 +802,12 @@ func TestPoll_Local3OptionsAnd2Votes(t *testing.T) { assert.NotContains(strings.Split(view, "\n"), "1 ████████ Hell yeah!") assert.NotContains(strings.Split(view, "\n"), "1 ████████ I couldn't care less") - assert.NoError(outbox.UpdatePollResults(context.Background(), domain, slog.Default(), server.db)) + poller := outbox.Poller{ + Domain: domain, + Log: slog.Default(), + DB: server.db, + } + assert.NoError(poller.Run(context.Background())) view = server.Handle(say[3:len(say)-2], server.Bob) assert.Contains(view, "So, polls on Station are pretty cool, right?") @@ -839,7 +844,12 @@ func TestPoll_Local3OptionsAnd2VotesAndDeletedVote(t *testing.T) { delete := server.Handle("/users/delete/"+reply[15:len(reply)-2], server.Carol) assert.Equal(fmt.Sprintf("30 /users/outbox/%s\r\n", strings.TrimPrefix(server.Carol.ID, "https://")), delete) - assert.NoError(outbox.UpdatePollResults(context.Background(), domain, slog.Default(), server.db)) + poller := outbox.Poller{ + Domain: domain, + Log: slog.Default(), + DB: server.db, + } + assert.NoError(poller.Run(context.Background())) view = server.Handle(say[3:len(say)-2], server.Bob) assert.Contains(view, "So, polls on Station are pretty cool, right?") @@ -871,7 +881,12 @@ func TestPoll_LocalVoteVisibilityFollowers(t *testing.T) { reply = server.Handle(fmt.Sprintf("/users/reply/%s?I%%20couldn%%27t%%20care%%20less", whisper[15:len(whisper)-2]), server.Carol) assert.Regexp(`^30 /users/view/\S+\r\n$`, reply) - assert.NoError(outbox.UpdatePollResults(context.Background(), domain, slog.Default(), server.db)) + poller := outbox.Poller{ + Domain: domain, + Log: slog.Default(), + DB: server.db, + } + assert.NoError(poller.Run(context.Background())) view := server.Handle(whisper[3:len(whisper)-2], server.Alice) assert.Contains(view, "So, polls on Station are pretty cool, right?") @@ -922,7 +937,12 @@ func TestPoll_LocalVoteVisibilityPublic(t *testing.T) { reply = server.Handle(fmt.Sprintf("/users/reply/%s?I%%20couldn%%27t%%20care%%20less", say[15:len(say)-2]), server.Carol) assert.Regexp(`^30 /users/view/\S+\r\n$`, reply) - assert.NoError(outbox.UpdatePollResults(context.Background(), domain, slog.Default(), server.db)) + poller := outbox.Poller{ + Domain: domain, + Log: slog.Default(), + DB: server.db, + } + assert.NoError(poller.Run(context.Background())) view := server.Handle(say[3:len(say)-2], server.Alice) assert.Contains(view, "So, polls on Station are pretty cool, right?")