From c2ab88ae5ed339c93fb30140af5a4dae06056880 Mon Sep 17 00:00:00 2001 From: Dima Krasner Date: Sat, 27 Jan 2024 12:14:36 +0200 Subject: [PATCH] remove last remaining direct json.Unmarshal() call during delivery --- fed/deliver.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/fed/deliver.go b/fed/deliver.go index ee2707c0..e98e3971 100644 --- a/fed/deliver.go +++ b/fed/deliver.go @@ -19,7 +19,6 @@ package fed import ( "context" "database/sql" - "encoding/json" "errors" "fmt" "github.com/dimkr/tootik/ap" @@ -58,35 +57,30 @@ func (q *Queue) Process(ctx context.Context) error { func (q *Queue) process(ctx context.Context) error { q.Log.Debug("Polling delivery queue") - rows, err := q.DB.QueryContext(ctx, `select outbox.attempts, outbox.activity, outbox.inserted, outbox.received, persons.actor from outbox join persons on persons.id = outbox.sender where outbox.sent = 0 and (outbox.attempts = 0 or (outbox.attempts < ? and outbox.last <= unixepoch() - ?)) order by outbox.attempts asc, outbox.last asc limit ?`, q.Config.MaxDeliveryAttempts, q.Config.DeliveryRetryInterval, q.Config.DeliveryBatchSize) + rows, err := q.DB.QueryContext(ctx, `select outbox.attempts, outbox.activity, outbox.activity, outbox.inserted, outbox.received, persons.actor from outbox join persons on persons.id = outbox.sender where outbox.sent = 0 and (outbox.attempts = 0 or (outbox.attempts < ? and outbox.last <= unixepoch() - ?)) order by outbox.attempts asc, outbox.last asc limit ?`, q.Config.MaxDeliveryAttempts, q.Config.DeliveryRetryInterval, q.Config.DeliveryBatchSize) if err != nil { return fmt.Errorf("failed to fetch posts to deliver: %w", err) } defer rows.Close() for rows.Next() { - var activityString string + var activity ap.Activity + var rawActivity string var actor ap.Actor var inserted int64 var recipients ap.Audience var deliveryAttempts int - if err := rows.Scan(&deliveryAttempts, &activityString, &inserted, &recipients, &actor); err != nil { + if err := rows.Scan(&deliveryAttempts, &activity, &rawActivity, &inserted, &recipients, &actor); err != nil { q.Log.Error("Failed to fetch post to deliver", "error", err) continue } - var activity ap.Activity - if err := json.Unmarshal([]byte(activityString), &activity); err != nil { - q.Log.Error("Failed to unmarshal undelivered activity", "attempts", deliveryAttempts, "error", err) - continue - } - if _, err := q.DB.ExecContext(ctx, `update outbox set last = unixepoch(), attempts = ? where activity->>'id' = ?`, deliveryAttempts+1, activity.ID); err != nil { q.Log.Error("Failed to save last delivery attempt time", "id", activity.ID, "attempts", deliveryAttempts, "error", err) continue } - if err := q.deliverWithTimeout(ctx, &activity, []byte(activityString), &actor, time.Unix(inserted, 0), &recipients); err != nil { + if err := q.deliverWithTimeout(ctx, &activity, []byte(rawActivity), &actor, time.Unix(inserted, 0), &recipients); err != nil { q.Log.Warn("Failed to deliver activity", "id", activity.ID, "attempts", deliveryAttempts, "error", err) continue }