Skip to content

Commit

Permalink
reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
mattn committed Dec 2, 2023
1 parent 2b9b4df commit d175e30
Showing 1 changed file with 63 additions and 40 deletions.
103 changes: 63 additions & 40 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,19 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt
ticker := time.NewTicker(seenAlreadyDropTick)
eose := false

updateSince := func() {
// After reconnection, update the since in the filter so that
// old events are not retrieved.
now := Now()
for i := range filters {
filters[i].Since = &now
}
}

pending := xsync.NewCounter()
pending.Add(int64(len(urls)))
for _, url := range urls {
go func(nm string) {
relay, err := pool.EnsureRelay(nm)
if err != nil {
return
}

sub, _ := relay.Subscribe(ctx, filters)
if sub == nil {
return
}

defer func() {
pending.Dec()
if pending.Value() == 0 {
Expand All @@ -103,41 +102,65 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt

for {
select {
case evt, more := <-sub.Events:
if !more {
return
}
if unique {
if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen {
continue
}
}
case <-ctx.Done():
return
default:
}

relay, err := pool.EnsureRelay(nm)
if err != nil {
time.Sleep(3 * time.Second)
updateSince()
continue
}

sub, err := relay.Subscribe(ctx, filters)
if err != nil {
time.Sleep(3 * time.Second)
updateSince()
continue
}

loop:
for {
select {
case events <- IncomingEvent{Event: evt, Relay: relay}:
case <-ctx.Done():
}
case <-sub.EndOfStoredEvents:
eose = true
case <-ticker.C:
if eose {
del := map[string]struct{}{}
old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix())
seenAlready.Range(func(id string, value Timestamp) bool {
if value < old {
del[id] = struct{}{}
case evt, more := <-sub.Events:
if !more {
break loop
}
if unique {
if _, seen := seenAlready.LoadOrStore(evt.ID, evt.CreatedAt); seen {
continue
}
return true
})
for k := range del {
seenAlready.Delete(k)
}
select {
case events <- IncomingEvent{Event: evt, Relay: relay}:
case <-ctx.Done():
}
case <-sub.EndOfStoredEvents:
eose = true
case <-ticker.C:
if eose {
del := map[string]struct{}{}
old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix())
seenAlready.Range(func(id string, value Timestamp) bool {
if value < old {
del[id] = struct{}{}
}
return true
})
for k := range del {
seenAlready.Delete(k)
}
}
case reason := <-sub.ClosedReason:
log.Printf("CLOSED from %s: '%s'\n", nm, reason)
return
case <-ctx.Done():
return
}
case reason := <-sub.ClosedReason:
log.Printf("CLOSED from %s: '%s'\n", nm, reason)
return
case <-ctx.Done():
return
}
updateSince()
}
}(NormalizeURL(url))
}
Expand Down

0 comments on commit d175e30

Please sign in to comment.