Skip to content

Commit

Permalink
use the same interface for all periodic jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
dimkr committed Jan 27, 2024
1 parent c2ab88a commit de5ff6a
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 117 deletions.
126 changes: 60 additions & 66 deletions cmd/tootik/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,12 @@ func main() {
panic(err)
}

if err := data.CollectGarbage(ctx, *domain, &cfg, db); err != nil {
gc := data.GarbageCollector{
Domain: *domain,
Config: &cfg,
DB: db,
}
if err := gc.Run(ctx); err != nil {
panic(err)
}

Expand Down Expand Up @@ -291,77 +296,66 @@ func main() {
}()
}

wg.Add(1)
go func() {
defer wg.Done()
defer cancel()

t := time.NewTicker(pollResultsUpdateInterval)
defer t.Stop()

for {
log.Info("Updating poll results")
if err := outbox.UpdatePollResults(ctx, *domain, log, db); err != nil {
log.Error("Failed to update poll results", "error", err)
break
}

select {
case <-ctx.Done():
return

case <-t.C:
}

}
}()

wg.Add(1)
go func() {
defer wg.Done()
defer cancel()

t := time.NewTicker(followMoveInterval)
defer t.Stop()

for {
if err := outbox.Move(ctx, *domain, log, db, resolver, nobody); err != nil {
log.Error("Failed to move follows", "error", err)
break
}

select {
case <-ctx.Done():
return

case <-t.C:
}
for _, job := range []struct {
Name string
Interval time.Duration
Runner interface {
Run(context.Context) error
}
}()

wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
}{
{
"poller",
pollResultsUpdateInterval,
&outbox.Poller{
Domain: *domain,
Log: log,
DB: db,
},
},
{
"mover",
followMoveInterval,
&outbox.Mover{
Domain: *domain,
Log: log,
DB: db,
Resolver: resolver,
Actor: nobody,
},
},
{
"gc",
garbageCollectionInterval,
&gc,
},
} {
name := job.Name
interval := job.Interval
runner := job.Runner
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()

t := time.NewTicker(garbageCollectionInterval)
defer t.Stop()
t := time.NewTicker(interval)
defer t.Stop()

for {
log.Info("Collecting garbage")
if err := data.CollectGarbage(ctx, *domain, &cfg, db); err != nil {
log.Error("Failed to collect garbage", "error", err)
break
}
for {
log.Info("Running periodic job", "name", name)
if err := runner.Run(ctx); err != nil {
log.Error("Periodic job has failed", "name", name, "error", err)
break
}

select {
case <-ctx.Done():
return
select {
case <-ctx.Done():
return

case <-t.C:
case <-t.C:
}
}
}
}()
}()
}

<-ctx.Done()
log.Info("Shutting down")
Expand Down
32 changes: 19 additions & 13 deletions data/garbage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,51 +24,57 @@ import (
"time"
)

// CollectGarbage deletes old data.
func CollectGarbage(ctx context.Context, domain string, cfg *cfg.Config, db *sql.DB) error {
type GarbageCollector struct {
Domain string
Config *cfg.Config
DB *sql.DB
}

// Collect deletes old data.
func (gc *GarbageCollector) Run(ctx context.Context) error {
now := time.Now()

if _, err := db.ExecContext(ctx, `delete from notesfts where id in (select notes.id from notes left join follows on follows.followed in (notes.author, notes.cc0, notes.to0, notes.cc1, notes.to1, notes.cc2, notes.to2) or (notes.to2 is not null and exists (select 1 from json_each(notes.object->'to') where value = follows.followed)) or (notes.cc2 is not null and exists (select 1 from json_each(notes.object->'cc') where value = follows.followed)) where follows.accepted = 1 and notes.inserted < unixepoch()-60*60*24 and notes.host != ? and follows.id is null)`, domain); err != nil {
if _, err := gc.DB.ExecContext(ctx, `delete from notesfts where id in (select notes.id from notes left join follows on follows.followed in (notes.author, notes.cc0, notes.to0, notes.cc1, notes.to1, notes.cc2, notes.to2) or (notes.to2 is not null and exists (select 1 from json_each(notes.object->'to') where value = follows.followed)) or (notes.cc2 is not null and exists (select 1 from json_each(notes.object->'cc') where value = follows.followed)) where follows.accepted = 1 and notes.inserted < unixepoch()-60*60*24 and notes.host != ? and follows.id is null)`, gc.Domain); err != nil {
return fmt.Errorf("failed to remove invisible posts: %w", err)
}

if _, err := db.ExecContext(ctx, `delete from notes where id in (select notes.id from notes left join follows on follows.followed in (notes.author, notes.cc0, notes.to0, notes.cc1, notes.to1, notes.cc2, notes.to2) or (notes.to2 is not null and exists (select 1 from json_each(notes.object->'to') where value = follows.followed)) or (notes.cc2 is not null and exists (select 1 from json_each(notes.object->'cc') where value = follows.followed)) where follows.accepted = 1 and notes.inserted < unixepoch()-60*60*24 and notes.host != ? and follows.id is null)`, domain); err != nil {
if _, err := gc.DB.ExecContext(ctx, `delete from notes where id in (select notes.id from notes left join follows on follows.followed in (notes.author, notes.cc0, notes.to0, notes.cc1, notes.to1, notes.cc2, notes.to2) or (notes.to2 is not null and exists (select 1 from json_each(notes.object->'to') where value = follows.followed)) or (notes.cc2 is not null and exists (select 1 from json_each(notes.object->'cc') where value = follows.followed)) where follows.accepted = 1 and notes.inserted < unixepoch()-60*60*24 and notes.host != ? and follows.id is null)`, gc.Domain); err != nil {
return fmt.Errorf("failed to remove invisible posts: %w", err)
}

if _, err := db.ExecContext(ctx, `delete from notesfts where id in (select id from notes where inserted < unixepoch()-60*60*24*7 and author not in (select followed from follows where accepted = 1) and host != ?)`, domain); err != nil {
if _, err := gc.DB.ExecContext(ctx, `delete from notesfts where id in (select id from notes where inserted < unixepoch()-60*60*24*7 and author not in (select followed from follows where accepted = 1) and host != ?)`, gc.Domain); err != nil {
return fmt.Errorf("failed to remove posts by authors without followers: %w", err)
}

if _, err := db.ExecContext(ctx, `delete from notes where inserted < unixepoch()-60*60*24*7 and author not in (select followed from follows where accepted = 1) and host != ?`, domain); err != nil {
if _, err := gc.DB.ExecContext(ctx, `delete from notes where inserted < unixepoch()-60*60*24*7 and author not in (select followed from follows where accepted = 1) and host != ?`, gc.Domain); err != nil {
return fmt.Errorf("failed to remove posts by authors without followers: %w", err)
}

if _, err := db.ExecContext(ctx, `delete from notesfts where id in (select id from notes where inserted < ? and host != ?)`, now.Add(-cfg.NotesTTL).Unix(), domain); err != nil {
if _, err := gc.DB.ExecContext(ctx, `delete from notesfts where id in (select id from notes where inserted < ? and host != ?)`, now.Add(-gc.Config.NotesTTL).Unix(), gc.Domain); err != nil {
return fmt.Errorf("failed to remove old posts: %w", err)
}

if _, err := db.ExecContext(ctx, `delete from notes where inserted < ? and host != ?`, now.Add(-cfg.NotesTTL).Unix(), domain); err != nil {
if _, err := gc.DB.ExecContext(ctx, `delete from notes where inserted < ? and host != ?`, now.Add(-gc.Config.NotesTTL).Unix(), gc.Domain); err != nil {
return fmt.Errorf("failed to remove old posts: %w", err)
}

if _, err := db.ExecContext(ctx, `delete from hashtags where not exists (select 1 from notes where notes.id = hashtags.note)`); err != nil {
if _, err := gc.DB.ExecContext(ctx, `delete from hashtags where not exists (select 1 from notes where notes.id = hashtags.note)`); err != nil {
return fmt.Errorf("failed to remove old hashtags: %w", err)
}

if _, err := db.ExecContext(ctx, `delete from shares where not exists (select 1 from persons where persons.id = shares.by) or (inserted < ? and not exists (select 1 from notes where notes.id = shares.note))`, now.Add(-cfg.SharesTTL).Unix()); err != nil {
if _, err := gc.DB.ExecContext(ctx, `delete from shares where not exists (select 1 from persons where persons.id = shares.by) or (inserted < ? and not exists (select 1 from notes where notes.id = shares.note))`, now.Add(-gc.Config.SharesTTL).Unix()); err != nil {
return fmt.Errorf("failed to remove old shares: %w", err)
}

if _, err := db.ExecContext(ctx, `delete from outbox where inserted < ? and host != ?`, now.Add(-cfg.DeliveryTTL).Unix(), domain); err != nil {
if _, err := gc.DB.ExecContext(ctx, `delete from outbox where inserted < ? and host != ?`, now.Add(-gc.Config.DeliveryTTL).Unix(), gc.Domain); err != nil {
return fmt.Errorf("failed to remove old posts: %w", err)
}

if _, err := db.ExecContext(ctx, `delete from follows where accepted = 0 and inserted < ?`, now.Add(-cfg.FollowAcceptTimeout).Unix()); err != nil {
if _, err := gc.DB.ExecContext(ctx, `delete from follows where accepted = 0 and inserted < ?`, now.Add(-gc.Config.FollowAcceptTimeout).Unix()); err != nil {
return fmt.Errorf("failed to remove failed follow requests: %w", err)
}

if _, err := db.ExecContext(ctx, `delete from persons where updated < ? and host != ? and not exists (select 1 from follows where followed = persons.id) and not exists (select 1 from notes where notes.author = persons.id) and not exists (select 1 from shares where shares.by = persons.id)`, now.Add(-cfg.ActorTTL).Unix(), domain); err != nil {
if _, err := gc.DB.ExecContext(ctx, `delete from persons where updated < ? and host != ? and not exists (select 1 from follows where followed = persons.id) and not exists (select 1 from notes where notes.author = persons.id) and not exists (select 1 from shares where shares.by = persons.id)`, now.Add(-gc.Config.ActorTTL).Unix(), gc.Domain); err != nil {
return fmt.Errorf("failed to remove idle actors: %w", err)
}

Expand Down
40 changes: 24 additions & 16 deletions outbox/move.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,16 @@ import (
"log/slog"
)

func updatedMoveTargets(ctx context.Context, log *slog.Logger, db *sql.DB, resolver *fed.Resolver, from *ap.Actor, prefix string) error {
rows, err := db.QueryContext(ctx, `select oldid, newid from (select old.id as oldid, new.id as newid, old.updated as oldupdated from persons old join persons new on old.actor->>'movedTo' = new.id and not exists (select 1 from json_each(new.actor->'alsoKnownAs') where value = old.id) and old.updated > new.updated where old.actor->>'movedTo' is not null union select old.id, old.actor->>'movedTo', old.updated from persons old where old.actor->>'movedTo' is not null and not exists (select 1 from persons new where new.id = old.actor->>'movedTo')) where exists (select 1 from follows where followed = oldid and follower like ? and inserted < oldupdated)`, prefix)
type Mover struct {
Domain string
Log *slog.Logger
DB *sql.DB
Resolver *fed.Resolver
Actor *ap.Actor
}

func (m *Mover) updatedMoveTargets(ctx context.Context, prefix string) error {
rows, err := m.DB.QueryContext(ctx, `select oldid, newid from (select old.id as oldid, new.id as newid, old.updated as oldupdated from persons old join persons new on old.actor->>'movedTo' = new.id and not exists (select 1 from json_each(new.actor->'alsoKnownAs') where value = old.id) and old.updated > new.updated where old.actor->>'movedTo' is not null union select old.id, old.actor->>'movedTo', old.updated from persons old where old.actor->>'movedTo' is not null and not exists (select 1 from persons new where new.id = old.actor->>'movedTo')) where exists (select 1 from follows where followed = oldid and follower like ? and inserted < oldupdated)`, prefix)
if err != nil {
return fmt.Errorf("failed to moved actors: %w", err)
}
Expand All @@ -35,33 +43,33 @@ func updatedMoveTargets(ctx context.Context, log *slog.Logger, db *sql.DB, resol
for rows.Next() {
var oldID, newID string
if err := rows.Scan(&oldID, &newID); err != nil {
log.Error("Failed to scan moved actor", "error", err)
m.Log.Error("Failed to scan moved actor", "error", err)
continue
}

actor, err := resolver.Resolve(ctx, log, db, from, newID, false)
actor, err := m.Resolver.Resolve(ctx, m.Log, m.DB, m.Actor, newID, false)
if err != nil {
log.Warn("Failed to resolve move target", "old", oldID, "new", newID, "error", err)
m.Log.Warn("Failed to resolve move target", "old", oldID, "new", newID, "error", err)
continue
}

if !actor.AlsoKnownAs.Contains(oldID) {
log.Warn("New account does not point to old account", "new", newID, "old", oldID)
m.Log.Warn("New account does not point to old account", "new", newID, "old", oldID)
}
}

return nil
}

func Move(ctx context.Context, domain string, log *slog.Logger, db *sql.DB, resolver *fed.Resolver, from *ap.Actor) error {
prefix := fmt.Sprintf("https://%s/%%", domain)
func (m *Mover) Run(ctx context.Context) error {
prefix := fmt.Sprintf("https://%s/%%", m.Domain)

// updated new actor if old actor specifies movedTo but new actor doesn't specify old actor in alsoKnownAs
if err := updatedMoveTargets(ctx, log, db, resolver, from, prefix); err != nil {
if err := m.updatedMoveTargets(ctx, prefix); err != nil {
return err
}

rows, err := db.QueryContext(ctx, `select persons.actor, old.id, new.id, follows.id from persons old join persons new on old.actor->>'movedTo' = new.id and exists (select 1 from json_each(new.actor->'alsoKnownAs') where value = old.id) join follows on follows.followed = old.id and follows.inserted < old.updated join persons on persons.id = follows.follower where old.actor->>'movedTo' is not null and follows.follower like ?`, prefix)
rows, err := m.DB.QueryContext(ctx, `select persons.actor, old.id, new.id, follows.id from persons old join persons new on old.actor->>'movedTo' = new.id and exists (select 1 from json_each(new.actor->'alsoKnownAs') where value = old.id) join follows on follows.followed = old.id and follows.inserted < old.updated join persons on persons.id = follows.follower where old.actor->>'movedTo' is not null and follows.follower like ?`, prefix)
if err != nil {
return fmt.Errorf("failed to fetch follows to move: %w", err)
}
Expand All @@ -71,17 +79,17 @@ func Move(ctx context.Context, domain string, log *slog.Logger, db *sql.DB, reso
var actor ap.Actor
var oldID, newID, oldFollowID string
if err := rows.Scan(&actor, &oldID, &newID, &oldFollowID); err != nil {
log.Error("Failed to scan follow to move", "error", err)
m.Log.Error("Failed to scan follow to move", "error", err)
continue
}

log.Info("Moving follow", "follow", oldFollowID, "old", oldID, "new", newID)
if err := Follow(ctx, domain, &actor, newID, db); err != nil {
log.Warn("Failed to follow new actor", "follow", oldFollowID, "old", oldID, "new", newID, "error", err)
m.Log.Info("Moving follow", "follow", oldFollowID, "old", oldID, "new", newID)
if err := Follow(ctx, m.Domain, &actor, newID, m.DB); err != nil {
m.Log.Warn("Failed to follow new actor", "follow", oldFollowID, "old", oldID, "new", newID, "error", err)
continue
}
if err := Unfollow(ctx, domain, log, db, &actor, oldID, oldFollowID); err != nil {
log.Warn("Failed to unfollow old actor", "follow", oldFollowID, "old", oldID, "new", newID, "error", err)
if err := Unfollow(ctx, m.Domain, m.Log, m.DB, &actor, oldID, oldFollowID); err != nil {
m.Log.Warn("Failed to unfollow old actor", "follow", oldFollowID, "old", oldID, "new", newID, "error", err)
}
}

Expand Down
22 changes: 14 additions & 8 deletions outbox/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,18 @@ import (
"time"
)

type Poller struct {
Domain string
Log *slog.Logger
DB *sql.DB
}

type pollResult struct {
PollID, Option string
}

func UpdatePollResults(ctx context.Context, domain string, log *slog.Logger, db *sql.DB) error {
rows, err := db.QueryContext(ctx, `select poll, option, count(*) from (select polls.id as poll, votes.object->>'name' as option, votes.author as voter from notes polls join notes votes on votes.object->>'inReplyTo' = polls.id where polls.object->>'type' = 'Question' and polls.id like $1 and polls.object->>'closed' is null and votes.object->>'name' is not null group by poll, option, voter) group by poll, option`, fmt.Sprintf("https://%s/%%", domain))
func (p *Poller) Run(ctx context.Context) error {
rows, err := p.DB.QueryContext(ctx, `select poll, option, count(*) from (select polls.id as poll, votes.object->>'name' as option, votes.author as voter from notes polls join notes votes on votes.object->>'inReplyTo' = polls.id where polls.object->>'type' = 'Question' and polls.id like $1 and polls.object->>'closed' is null and votes.object->>'name' is not null group by poll, option, voter) group by poll, option`, fmt.Sprintf("https://%s/%%", p.Domain))
if err != nil {
return err
}
Expand All @@ -43,7 +49,7 @@ func UpdatePollResults(ctx context.Context, domain string, log *slog.Logger, db
var pollID, option string
var count int64
if err := rows.Scan(&pollID, &option, &count); err != nil {
log.Warn("Failed to scan poll result", "error", err)
p.Log.Warn("Failed to scan poll result", "error", err)
continue
}

Expand All @@ -53,8 +59,8 @@ func UpdatePollResults(ctx context.Context, domain string, log *slog.Logger, db
}

var obj ap.Object
if err := db.QueryRowContext(ctx, "select object from notes where id = ?", pollID).Scan(&obj); err != nil {
log.Warn("Failed to fetch poll", "poll", pollID, "error", err)
if err := p.DB.QueryRowContext(ctx, "select object from notes where id = ?", pollID).Scan(&obj); err != nil {
p.Log.Warn("Failed to fetch poll", "poll", pollID, "error", err)
continue
}

Expand Down Expand Up @@ -92,10 +98,10 @@ func UpdatePollResults(ctx context.Context, domain string, log *slog.Logger, db
continue
}

log.Info("Updating poll results", "poll", poll.ID)
p.Log.Info("Updating poll results", "poll", poll.ID)

if err := UpdateNote(ctx, domain, db, poll); err != nil {
log.Warn("Failed to update poll results", "poll", poll.ID, "error", err)
if err := UpdateNote(ctx, p.Domain, p.DB, poll); err != nil {
p.Log.Warn("Failed to update poll results", "poll", poll.ID, "error", err)
}
}

Expand Down
Loading

0 comments on commit de5ff6a

Please sign in to comment.