diff --git a/pool.go b/pool.go index f10ea7c..bd38fe7 100644 --- a/pool.go +++ b/pool.go @@ -79,20 +79,19 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt ticker := time.NewTicker(seenAlreadyDropTick) eose := false + updateSince := func() { + // After reconnection, update the since in the filter so that + // old events are not retrieved. + now := Now() + for i := range filters { + filters[i].Since = &now + } + } + pending := xsync.NewCounter() pending.Add(int64(len(urls))) for _, url := range urls { go func(nm string) { - relay, err := pool.EnsureRelay(nm) - if err != nil { - return - } - - sub, _ := relay.Subscribe(ctx, filters) - if sub == nil { - return - } - defer func() { pending.Dec() if pending.Value() == 0 { @@ -102,6 +101,26 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt }() for { + select { + case <-ctx.Done(): + return + default: + } + + relay, err := pool.EnsureRelay(nm) + if err != nil { + time.Sleep(3 * time.Second) + updateSince() + continue + } + + sub, err := relay.Subscribe(ctx, filters) + if err != nil { + time.Sleep(3 * time.Second) + updateSince() + continue + } + select { case evt, more := <-sub.Events: if !more { @@ -138,6 +157,7 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt case <-ctx.Done(): return } + updateSince() } }(NormalizeURL(url)) }