From d175e30fcdb07c59295062a0da1ab51defe7f2de Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsumoto Date: Sat, 2 Dec 2023 19:14:54 +0900 Subject: [PATCH] reconnect --- pool.go | 103 ++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 63 insertions(+), 40 deletions(-) diff --git a/pool.go b/pool.go index f10ea7c..8bb50ea 100644 --- a/pool.go +++ b/pool.go @@ -79,20 +79,19 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt ticker := time.NewTicker(seenAlreadyDropTick) eose := false + updateSince := func() { + // After reconnection, update the since in the filter so that + // old events are not retrieved. + now := Now() + for i := range filters { + filters[i].Since = &now + } + } + pending := xsync.NewCounter() pending.Add(int64(len(urls))) for _, url := range urls { go func(nm string) { - relay, err := pool.EnsureRelay(nm) - if err != nil { - return - } - - sub, _ := relay.Subscribe(ctx, filters) - if sub == nil { - return - } - defer func() { pending.Dec() if pending.Value() == 0 { @@ -103,41 +102,65 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt for { select { - case evt, more := <-sub.Events: - if !more { - return - } - if unique { - if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen { - continue - } - } + case <-ctx.Done(): + return + default: + } + + relay, err := pool.EnsureRelay(nm) + if err != nil { + time.Sleep(3 * time.Second) + updateSince() + continue + } + + sub, err := relay.Subscribe(ctx, filters) + if err != nil { + time.Sleep(3 * time.Second) + updateSince() + continue + } + + loop: + for { select { - case events <- IncomingEvent{Event: evt, Relay: relay}: - case <-ctx.Done(): - } - case <-sub.EndOfStoredEvents: - eose = true - case <-ticker.C: - if eose { - del := map[string]struct{}{} - old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix()) - seenAlready.Range(func(id string, value Timestamp) bool { - if value < old { - del[id] = struct{}{} + case evt, more := <-sub.Events: + if !more { + break loop + } + if unique { + if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen { + continue } - return true - }) - for k := range del { - seenAlready.Delete(k) } + select { + case events <- IncomingEvent{Event: evt, Relay: relay}: + case <-ctx.Done(): + } + case <-sub.EndOfStoredEvents: + eose = true + case <-ticker.C: + if eose { + del := map[string]struct{}{} + old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix()) + seenAlready.Range(func(id string, value Timestamp) bool { + if value < old { + del[id] = struct{}{} + } + return true + }) + for k := range del { + seenAlready.Delete(k) + } + } + case reason := <-sub.ClosedReason: + log.Printf("CLOSED from %s: '%s'\n", nm, reason) + return + case <-ctx.Done(): + return } - case reason := <-sub.ClosedReason: - log.Printf("CLOSED from %s: '%s'\n", nm, reason) - return - case <-ctx.Done(): - return } + updateSince() } }(NormalizeURL(url)) }