diff --git a/helpers.go b/helpers.go index 5504673..aac13a3 100644 --- a/helpers.go +++ b/helpers.go @@ -1,6 +1,8 @@ package nostr import ( + "strconv" + "strings" "sync" "unsafe" @@ -92,3 +94,8 @@ func arePointerValuesEqual[V comparable](a *V, b *V) bool { } return false } + +func subIdToSerial(subId string) int64 { + serialId, _ := strconv.ParseInt(subId[0:strings.Index(subId, ":")], 10, 64) + return serialId +} diff --git a/relay.go b/relay.go index 7615b75..f47d648 100644 --- a/relay.go +++ b/relay.go @@ -18,7 +18,7 @@ import ( type Status int -var subscriptionIDCounter atomic.Int32 +var subscriptionIDCounter atomic.Int64 type Relay struct { closeMutex sync.Mutex @@ -27,7 +27,7 @@ type Relay struct { RequestHeader http.Header // e.g. for origin header Connection *Connection - Subscriptions *xsync.MapOf[string, *Subscription] + Subscriptions *xsync.MapOf[int64, *Subscription] ConnectionError error connectionContext context.Context // will be canceled when the connection closes @@ -57,7 +57,7 @@ func NewRelay(ctx context.Context, url string, opts ...RelayOption) *Relay { URL: NormalizeURL(url), connectionContext: ctx, connectionContextCancel: cancel, - Subscriptions: xsync.NewMapOf[string, *Subscription](), + Subscriptions: xsync.NewMapOf[int64, *Subscription](), okCallbacks: xsync.NewMapOf[string, func(bool, string)](), writeQueue: make(chan writeRequest), subscriptionChannelCloseQueue: make(chan *Subscription), @@ -171,10 +171,9 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error r.Connection = nil // close all subscriptions - r.Subscriptions.Range(func(_ string, sub *Subscription) bool { - go sub.Unsub() - return true - }) + for _, sub := range r.Subscriptions.Range { + sub.Unsub() + } }() // queue all write operations here so we don't do mutex spaghetti @@ -241,7 +240,8 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error if env.SubscriptionID == nil { continue } - if subscription, ok := r.Subscriptions.Load(*env.SubscriptionID); !ok { + + if subscription, ok := r.Subscriptions.Load(subIdToSerial(*env.SubscriptionID)); !ok { // InfoLogger.Printf("{%s} no subscription with id '%s'\n", r.URL, *env.SubscriptionID) continue } else { @@ -263,15 +263,15 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error subscription.dispatchEvent(&env.Event) } case *EOSEEnvelope: - if subscription, ok := r.Subscriptions.Load(string(*env)); ok { + if subscription, ok := r.Subscriptions.Load(subIdToSerial(string(*env))); ok { subscription.dispatchEose() } case *ClosedEnvelope: - if subscription, ok := r.Subscriptions.Load(string(env.SubscriptionID)); ok { + if subscription, ok := r.Subscriptions.Load(subIdToSerial(env.SubscriptionID)); ok { subscription.dispatchClosed(env.Reason) } case *CountEnvelope: - if subscription, ok := r.Subscriptions.Load(string(env.SubscriptionID)); ok && env.Count != nil && subscription.countResult != nil { + if subscription, ok := r.Subscriptions.Load(subIdToSerial(env.SubscriptionID)); ok && env.Count != nil && subscription.countResult != nil { subscription.countResult <- *env.Count } case *OKEnvelope: @@ -400,7 +400,7 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts . Relay: r, Context: ctx, cancel: cancel, - counter: int(current), + counter: current, Events: make(chan *Event), EndOfStoredEvents: make(chan struct{}, 1), ClosedReason: make(chan string, 1), @@ -415,8 +415,7 @@ func (r *Relay) PrepareSubscription(ctx context.Context, filters Filters, opts . } } - id := sub.GetID() - r.Subscriptions.Store(id, sub) + r.Subscriptions.Store(int64(sub.counter), sub) // start handling events, eose, unsub etc: go sub.start() @@ -514,3 +513,7 @@ func (r *Relay) Close() error { return nil } + +var subIdPool = sync.Pool{ + New: func() any { return make([]byte, 0, 15) }, +} diff --git a/subscription.go b/subscription.go index 86fdc53..9ccd406 100644 --- a/subscription.go +++ b/subscription.go @@ -10,7 +10,7 @@ import ( type Subscription struct { label string - counter int + counter int64 Relay *Relay Filters Filters @@ -65,7 +65,12 @@ var _ SubscriptionOption = (WithLabel)("") // GetID return the Nostr subscription ID as given to the Relay // it is a concatenation of the label and a serial number. func (sub *Subscription) GetID() string { - return sub.label + ":" + strconv.Itoa(sub.counter) + buf := subIdPool.Get().([]byte) + buf = strconv.AppendInt(buf, sub.counter, 10) + buf = append(buf, ':') + buf = append(buf, sub.label...) + defer subIdPool.Put(buf) + return string(buf) } func (sub *Subscription) start() { @@ -133,7 +138,7 @@ func (sub *Subscription) Unsub() { } // remove subscription from our map - sub.Relay.Subscriptions.Delete(sub.GetID()) + sub.Relay.Subscriptions.Delete(sub.counter) } // Close just sends a CLOSE message. You probably want Unsub() instead.