Skip to content

Commit

Permalink
remove last remaining direct json.Unmarshal() call during delivery
Browse files Browse the repository at this point in the history
  • Loading branch information
dimkr committed Jan 27, 2024
1 parent a09291a commit c2ab88a
Showing 1 changed file with 5 additions and 11 deletions.
16 changes: 5 additions & 11 deletions fed/deliver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package fed
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"github.com/dimkr/tootik/ap"
Expand Down Expand Up @@ -58,35 +57,30 @@ func (q *Queue) Process(ctx context.Context) error {
func (q *Queue) process(ctx context.Context) error {
q.Log.Debug("Polling delivery queue")

rows, err := q.DB.QueryContext(ctx, `select outbox.attempts, outbox.activity, outbox.inserted, outbox.received, persons.actor from outbox join persons on persons.id = outbox.sender where outbox.sent = 0 and (outbox.attempts = 0 or (outbox.attempts < ? and outbox.last <= unixepoch() - ?)) order by outbox.attempts asc, outbox.last asc limit ?`, q.Config.MaxDeliveryAttempts, q.Config.DeliveryRetryInterval, q.Config.DeliveryBatchSize)
rows, err := q.DB.QueryContext(ctx, `select outbox.attempts, outbox.activity, outbox.activity, outbox.inserted, outbox.received, persons.actor from outbox join persons on persons.id = outbox.sender where outbox.sent = 0 and (outbox.attempts = 0 or (outbox.attempts < ? and outbox.last <= unixepoch() - ?)) order by outbox.attempts asc, outbox.last asc limit ?`, q.Config.MaxDeliveryAttempts, q.Config.DeliveryRetryInterval, q.Config.DeliveryBatchSize)
if err != nil {
return fmt.Errorf("failed to fetch posts to deliver: %w", err)
}
defer rows.Close()

for rows.Next() {
var activityString string
var activity ap.Activity
var rawActivity string
var actor ap.Actor
var inserted int64
var recipients ap.Audience
var deliveryAttempts int
if err := rows.Scan(&deliveryAttempts, &activityString, &inserted, &recipients, &actor); err != nil {
if err := rows.Scan(&deliveryAttempts, &activity, &rawActivity, &inserted, &recipients, &actor); err != nil {
q.Log.Error("Failed to fetch post to deliver", "error", err)
continue
}

var activity ap.Activity
if err := json.Unmarshal([]byte(activityString), &activity); err != nil {
q.Log.Error("Failed to unmarshal undelivered activity", "attempts", deliveryAttempts, "error", err)
continue
}

if _, err := q.DB.ExecContext(ctx, `update outbox set last = unixepoch(), attempts = ? where activity->>'id' = ?`, deliveryAttempts+1, activity.ID); err != nil {
q.Log.Error("Failed to save last delivery attempt time", "id", activity.ID, "attempts", deliveryAttempts, "error", err)
continue
}

if err := q.deliverWithTimeout(ctx, &activity, []byte(activityString), &actor, time.Unix(inserted, 0), &recipients); err != nil {
if err := q.deliverWithTimeout(ctx, &activity, []byte(rawActivity), &actor, time.Unix(inserted, 0), &recipients); err != nil {
q.Log.Warn("Failed to deliver activity", "id", activity.ID, "attempts", deliveryAttempts, "error", err)
continue
}
Expand Down

0 comments on commit c2ab88a

Please sign in to comment.