Subscribing to multiple events causes several issues #4

Open
runeerle42 opened this issue Apr 1, 2022 · 6 comments

Comments

@runeerle42

Hi, I'm using go-modemmanager to handle outgoing and incoming SMS and voice calls.

I have two issues whose cause I can't determine; both are related to subscribing to events. First, subscribing to several different event types causes every channel to receive every event. Second, unsubscribing leaks a lot of memory.
Here is a code example with only the important parts:

voice, err := modem.GetVoice()
messageHandle, err := modem.GetMessaging()

voiceDbus := voice.SubscribeCallAdded()
messageDbus := messageHandle.SubscribeAdded()

go func() {
	for {
		select {
		case dbusVoice := <-voiceDbus:
			log.Printf("case dbusVoice: %v\n", dbusVoice)
		case dbusMessage := <-messageDbus:
			log.Printf("case dbusMessage: %v\n", dbusMessage)
		}
	}
}()
...
smsSend, errSMS := messageHandle.CreateSms(selfNumber, sendString) // Send SMS to myself
smsDbus := smsSend.SubscribePropertiesChanged()
go func() {
	for {
		select {
		case dbusSMS := <-smsDbus:
			log.Printf("case dbusSMS: %v\n", dbusSMS)
		case <-time.After(time.Minute * 1): // no event for one minute: stop listening
			smsSend.Unsubscribe()
			return
		}
	}
}()
// The outgoing SMS
smsSend.Send()

First issue:

// Output:
case dbusVoice: &{:1.6207 /org/freedesktop/ModemManager1/Modem/0 org.freedesktop.ModemManager1.Modem.Messaging.Added [/org/freedesktop/ModemManager1/SMS/688 false]}
case dbusMessage: &{:1.6207 /org/freedesktop/ModemManager1/Modem/0 org.freedesktop.ModemManager1.Modem.Messaging.Added [/org/freedesktop/ModemManager1/SMS/688 false]}

case dbusSMS: &{:1.6207 /org/freedesktop/ModemManager1/SMS/688 org.freedesktop.DBus.Properties.PropertiesChanged [org.freedesktop.ModemManager1.Sms map[MessageReference:@u 148 State:@u 5] []]}
case dbusVoice: &{:1.6207 /org/freedesktop/ModemManager1/SMS/688 org.freedesktop.DBus.Properties.PropertiesChanged [org.freedesktop.ModemManager1.Sms map[MessageReference:@u 148 State:@u 5] []]}
case dbusMessage: &{:1.6207 /org/freedesktop/ModemManager1/SMS/688 org.freedesktop.DBus.Properties.PropertiesChanged [org.freedesktop.ModemManager1.Sms map[MessageReference:@u 148 State:@u 5] []]}

// The same SMS returning:
case dbusSMS: &{:1.6207 /org/freedesktop/ModemManager1/Modem/0 org.freedesktop.ModemManager1.Modem.Messaging.Added [/org/freedesktop/ModemManager1/SMS/689 true]}
case dbusVoice: &{:1.6207 /org/freedesktop/ModemManager1/Modem/0 org.freedesktop.ModemManager1.Modem.Messaging.Added [/org/freedesktop/ModemManager1/SMS/689 true]}
case dbusMessage: &{:1.6207 /org/freedesktop/ModemManager1/Modem/0 org.freedesktop.ModemManager1.Modem.Messaging.Added [/org/freedesktop/ModemManager1/SMS/689 true]}
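
Because every channel here receives every signal, one possible workaround is to filter on the receiving side by object path and signal name. The following is only a minimal sketch: `Signal` is a local stand-in that mirrors the fields visible in the log output above (in real code the value would be a `*dbus.Signal`), and `wants` is a hypothetical helper, not part of go-modemmanager or godbus.

```go
package main

import "fmt"

// Signal is a stand-in for dbus.Signal, reduced to the fields shown in the log output.
type Signal struct {
	Sender string
	Path   string
	Name   string
	Body   []interface{}
}

// wants reports whether sig is non-nil and matches the given object path and signal name.
// Hypothetical helper for illustration only.
func wants(sig *Signal, path, name string) bool {
	return sig != nil && sig.Path == path && sig.Name == name
}

func main() {
	const (
		modemPath = "/org/freedesktop/ModemManager1/Modem/0"
		smsAdded  = "org.freedesktop.ModemManager1.Modem.Messaging.Added"
	)
	incoming := []*Signal{
		{Path: modemPath, Name: smsAdded},
		{Path: "/org/freedesktop/ModemManager1/SMS/688", Name: "org.freedesktop.DBus.Properties.PropertiesChanged"},
	}
	for _, sig := range incoming {
		if !wants(sig, modemPath, smsAdded) {
			continue // not meant for this subscriber; ignore it
		}
		fmt.Println("message added on", sig.Path)
	}
}
```

This only hides the symptom in the consumer; the unrelated signals are still delivered to the channel and still have to be drained.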

Second and main issue:
The number of hanging signal-delivery goroutines keeps growing, and they never exit. This only happens after smsSend.Unsubscribe() has been called.

//Wait for the listener to go away
time.Sleep(time.Second * 65)

// Send messages and print all running stack traces:
for {
	smsSend, errSMS := messageHandle.CreateSms(selfNumber, sendString) // Send SMS to myself
	if errSMS != nil {
		log.Fatalf("CreateSms failed: %v", errSMS)
	}
	smsSend.Send()
	time.Sleep(time.Second * 30)
	buf := make([]byte, 1<<16)
	n := runtime.Stack(buf, true) // n is the number of bytes actually written
	log.Printf("Stack trace:\n%s", buf[:n])
}

After ~10 messages the number of hanging goroutines starts to grow, and all of them hang in "deferredDeliver":

// Output:
Stack trace:
goroutine 258 [select, 1 minutes]:
github.com/godbus/dbus/v5.(*signalChannelData).deferredDeliver(0x4000624b60, 0x40001e2550)
	/home/.../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:318 +0x88
created by github.com/godbus/dbus/v5.(*signalChannelData).deliver
	/home/.../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:313 +0xf8

goroutine 188 [select, 2 minutes]:
github.com/godbus/dbus/v5.(*signalChannelData).deferredDeliver(0x4000624b60, 0x40005b4280)
	/home/.../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:318 +0x88
created by github.com/godbus/dbus/v5.(*signalChannelData).deliver
	/home/.../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:313 +0xf8

goroutine 158 [select, 2 minutes]:
github.com/godbus/dbus/v5.(*signalChannelData).deferredDeliver(0x4000624b60, 0x40001e2230)
	/home/.../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:318 +0x88
created by github.com/godbus/dbus/v5.(*signalChannelData).deliver
	/home/.../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:313 +0xf8

goroutine 195 [select, 1 minutes]:
github.com/godbus/dbus/v5.(*signalChannelData).deferredDeliver(0x4000624b60, 0x40005b41e0)
	/home/.../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:318 +0x88
created by github.com/godbus/dbus/v5.(*signalChannelData).deliver
	/home/.../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:313 +0xf8

goroutine 168 [select, 2 minutes]:
github.com/godbus/dbus/v5.(*signalChannelData).deferredDeliver(0x4000624b60, 0x400017e0f0)
	/home/.../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:318 +0x88
created by github.com/godbus/dbus/v5.(*signalChannelData).deliver
	/home/.../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:313 +0xf8

goroutine 247 [select]:
github.com/godbus/dbus/v5.(*signalChannelData).deferredDeliver(0x4000624b60, 0x40001e25a0)
	/home/.../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:318 +0x88
created by github.com/godbus/dbus/v5.(*signalChannelData).deliver
	/home/.../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:313 +0xf8

goroutine 174 [select, 2 minutes]:
github.com/godbus/dbus/v5.(*signalChannelData).deferredDeliver(0x4000624b60, 0x400017e3c0)
	/home/.../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:318 +0x88
created by github.com/godbus/dbus/v5.(*signalChannelData).deliver
	/home/.../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:313 +0xf8

This crashes my program with OOM, since a new SubscribePropertiesChanged()/Unsubscribe() pair is made for each message. I believe it uses more and more memory because Unsubscribe() fails to trigger close() on the channel. That is, the smsDbus channel never closes: a separate test shows that it still receives events even after Unsubscribe() is called.
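
The "~10 messages" threshold would be consistent with a signal channel buffered to 10 entries that nobody reads any more: once the buffer is full, every further delivery has to wait. The sketch below is a simplified, stdlib-only model of that pattern, assuming (as the `deliver` and `deferredDeliver` frames in the trace suggest) that delivery first tries a non-blocking send and otherwise parks a goroutine. It is not godbus's actual code; `subscriber`, `deliver`, and `simulate` are hypothetical names.

```go
package main

import "fmt"

// subscriber models a registered signal channel plus a done channel that is
// closed when the subscription is removed.
type subscriber struct {
	ch   chan int
	done chan struct{}
}

// deliver tries a non-blocking send. If the buffer is full it parks a goroutine
// that waits until the send succeeds or the subscription is removed.
// It returns true when a goroutine had to be parked.
func (s *subscriber) deliver(v int, exited chan<- struct{}) bool {
	select {
	case s.ch <- v:
		return false
	default:
		go func() {
			select {
			case s.ch <- v:
			case <-s.done:
			}
			exited <- struct{}{}
		}()
		return true
	}
}

// simulate sends n signals to a subscriber that nobody reads from and
// returns how many deliveries ended up parked in a goroutine.
func simulate(n, buffer int) int {
	s := &subscriber{ch: make(chan int, buffer), done: make(chan struct{})}
	exited := make(chan struct{})
	parked := 0
	for i := 0; i < n; i++ {
		if s.deliver(i, exited) {
			parked++
		}
	}
	// Removing the subscription closes done, which releases every parked goroutine.
	close(s.done)
	for i := 0; i < parked; i++ {
		<-exited
	}
	return parked
}

func main() {
	fmt.Println("parked goroutines:", simulate(25, 10)) // prints: parked goroutines: 15
}
```

In this model the parked goroutines only go away when the subscription is really removed or the channel is drained, which fits the observation that the count keeps growing when Unsubscribe() has no effect.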

I'm not sure whether I'm doing something wrong or whether the error is somewhere else. I hope you can help me :)

Regards
Rune

@maltegrosse
Owner

Hi @runeerle42 ,
can you explain what you want to achieve? I am not an expert in channels, signals, etc., but have you seen my examples folder?
https://github.com/maltegrosse/go-modemmanager/blob/master/examples/test_listener.go#L55

Regarding your SMS loop, have you tried something like this?

ticker := time.NewTicker(5 * time.Second)
quit := make(chan struct{})
go func() {
	for {
		select {
		case <-ticker.C:
			// do stuff
		case <-quit:
			ticker.Stop()
			return
		}
	}
}()

see https://stackoverflow.com/a/16466581/15214926

@runeerle42
Author

Hi,
I'm building a test that will run 24/7, sending and receiving calls, SMS, and MMS every minute, so thousands of each per day. For each one I need timestamps for every state transition. For now I have switched to loops that just call GetState() for each SMS and voice call that is made. But SubscribePropertiesChanged() is listed as the intended way to detect changes, and it must be called for every single SMS/call because each has its own DBus events. I then log every event with detailed info. I might even need to log all AT commands and syslog/kernel messages with the ModemManager service in debug mode for trickier issues.

> https://github.com/maltegrosse/go-modemmanager/blob/master/examples/test_listener.go#L55

None of the examples use Unsubscribe() or several different Subscribe...() calls at the same time. With only one subscription active at a time there seem to be no errors.

> Regarding your SMS loop, have you tried something like this?

I simplified the example code quite a lot. My real SMS and call loops use tickers like your example already :)

But the issue is that calling Unsubscribe() for one specific event will only stop DBus events of that specific type, and listening for changes to any SMS through a single subscription is not possible because each SMS has its own DBus object path, e.g. '/org/freedesktop/ModemManager1/SMS/688'. So I call smsSend.SubscribePropertiesChanged() for every SMS and get a channel back. This channel is not closed by the matching Unsubscribe().

> see https://stackoverflow.com/a/16466581/15214926

It's not possible for me to close this channel either, since close(smsDbus) is illegal on a receive-only channel; it must be closed on the sending side.

Here is what I think might be wrong without debugging it directly:

// ../go/pkg/mod/github.com/maltegrosse/[email protected]/Sms.go:
func (ss sms) Unsubscribe() {
	ss.conn.RemoveSignal(ss.sigChan)
	ss.sigChan = nil  // <------------- This channel is not closed
}
// ../go/pkg/mod/github.com/godbus/dbus/[email protected]/default_handler.go:
func (sh *defaultSignalHandler) RemoveSignal(ch chan<- *Signal) {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	if sh.closed {
		return
	}
	for i := len(sh.signals) - 1; i >= 0; i-- {
		if ch == sh.signals[i].ch {
			sh.signals[i].close() //  <------------- This is never called?
			copy(sh.signals[i:], sh.signals[i+1:])
			sh.signals[len(sh.signals)-1] = nil
			sh.signals = sh.signals[:len(sh.signals)-1]
		}
	}
}

I don't see why this logic is failing, but perhaps it is related to events triggering on unrelated channels?
So Subscribe + Unsubscribe leaves the channel returned by Subscribe open, and still receiving unrelated events.
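
One thing that may be worth checking, assuming the methods really are declared with value receivers as in the `func (ss sms) Unsubscribe()` excerpt above: in Go, a method with a value receiver works on a copy of the struct, so an assignment such as `ss.sigChan = ...` does not persist after the call returns. If that applies here, Unsubscribe() would hand a nil channel to RemoveSignal, which would never match a registered channel. The sketch below uses hypothetical stand-in types (`valueSub`, `pointerSub`), not the library's own, to show the difference.

```go
package main

import "fmt"

// valueSub stores the channel through a value receiver, so the write is lost.
type valueSub struct{ sigChan chan int }

func (s valueSub) Subscribe() <-chan int {
	s.sigChan = make(chan int, 10) // assigned on a copy of s
	return s.sigChan
}

// pointerSub stores the channel through a pointer receiver, so the write persists.
type pointerSub struct{ sigChan chan int }

func (s *pointerSub) Subscribe() <-chan int {
	s.sigChan = make(chan int, 10)
	return s.sigChan
}

func main() {
	v := &valueSub{}
	v.Subscribe()
	fmt.Println("value receiver kept channel:", v.sigChan != nil) // false

	p := &pointerSub{}
	p.Subscribe()
	fmt.Println("pointer receiver kept channel:", p.sigChan != nil) // true
}
```

Whether this is what happens in the library would need to be confirmed by debugging it directly.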

Regards
Rune

@maltegrosse
Owner

Hi @runeerle42 ,

As I wrote before, I have not really used these functions, and I am not an expert in this area.
I was heavily inspired by the gonetworkmanager implementation: https://github.com/Wifx/gonetworkmanager/blob/master/NetworkManager.go#L611

Have you tried a newer upstream godbus or Go version?
godbus/dbus#271
Multiple of these channels can be registered at the same time. The channel is closed if the Conn is closed; it should not be closed by the caller before RemoveSignal was called on it.

If you have any idea how to fix this - I am open for any PR :)

@runeerle42
Author

Hi @maltegrosse ,

I have now tested the latest godbus/dbus (v5.1.0, https://github.com/godbus/dbus/) and there is no change.
After looking more closely at the code, I think I know what is wrong.
This causes all signals to be sent to all channels:
https://github.com/maltegrosse/go-modemmanager/blob/master/Sms.go

func (ss sms) SubscribePropertiesChanged() <-chan *dbus.Signal {
	if ss.sigChan != nil {
		return ss.sigChan
	}
	rule := fmt.Sprintf("type='signal', member='%s',path_namespace='%s'", dbusPropertiesChanged, fmt.Sprint(ss.GetObjectPath()))
	ss.conn.BusObject().Call(dbusMethodAddMatch, 0, rule) // <---- The ss.conn is shared among all interfaces
	ss.sigChan = make(chan *dbus.Signal, 10)
	ss.conn.Signal(ss.sigChan) // Adds an extra signal receiver to the shared dbus connection
	return ss.sigChan
}
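
For context: the match rule only tells the bus which signals to forward to the connection, while a channel registered with `conn.Signal` is passed all signal messages that connection receives. That would explain why rules added for one subscriber also show up on the others when the connection is shared. A small sketch of how the rule string is assembled, using a hypothetical helper `matchRule` that copies the format string from the excerpt above; the member value `PropertiesChanged` is an assumption about what `dbusPropertiesChanged` holds.

```go
package main

import "fmt"

// matchRule builds a D-Bus match rule for one signal member under one object
// path namespace, using the same format string as the excerpt above.
// Hypothetical helper for illustration only.
func matchRule(member, path string) string {
	return fmt.Sprintf("type='signal', member='%s',path_namespace='%s'", member, path)
}

func main() {
	// Assumed example values; the path is taken from the log output earlier in the thread.
	fmt.Println(matchRule("PropertiesChanged", "/org/freedesktop/ModemManager1/SMS/688"))
}
```

Each SMS object adds one such rule to the shared connection, so the set of signals arriving on that connection grows with every subscription.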

When a modem is created with GetModems(), it is initialized with the code below, which sets up the single shared DBus connection:
https://github.com/maltegrosse/go-modemmanager/blob/master/utils.go

func (d *dbusBase) init(iface string, objectPath dbus.ObjectPath) error {
	var err error

	d.conn, err = dbus.SystemBus()
...

https://github.com/godbus/dbus/blob/v5.1.0/conn.go

// SystemBus returns a shared connection to the system bus, connecting to it if
// not already done.
func SystemBus() (conn *Conn, err error) {
...

So each instance of Subscribe() needs to have a private conn object. In addition, each DBus event that must be listened to needs a private connection. I think this can be done by replacing 'ss.conn.BusObject().Call(dbusMethodAddMatch, 0, rule)' with this function, or a similar function that makes a private connection:
https://github.com/maltegrosse/go-modemmanager/blob/master/Sms.go


func (ss sms) SubscribePropertiesChanged() <-chan *dbus.Signal {
	ss.mu.Lock()
	defer ss.mu.Unlock() // Thread safety only
	if ss.sigChan != nil {
		return ss.sigChan
	}
	rule := fmt.Sprintf("type='signal', member='%s',path_namespace='%s'", dbusPropertiesChanged, fmt.Sprint(ss.GetObjectPath()))
	if ss.conn == nil {
		conn, err := ss.connGlobalFromInit.Dial(rule) // Get a new connection for this object only? Or perhaps use Connect() ? 
		ss.conn = conn
	}
	//ss.conn.Dial(rule) // Or something like this, or perhaps use "SystemBusPrivateHandler"?
	ss.sigChan = make(chan *dbus.Signal, 10)
	ss.conn.Signal(ss.sigChan)
	return ss.sigChan
}

https://github.com/godbus/dbus/blob/v5.1.0/conn.go

// Dial establishes a new private connection to the message bus specified by address.
func Dial(address string, opts ...ConnOption) (*Conn, error) {
	tr, err := getTransport(address)
	if err != nil {
		return nil, err
	}
	return newConn(tr, opts...)
}

I hope this helps in understanding what is wrong and points to a possible fix. Unfortunately I don't have the time (or the know-how) to make a PR for this right now.

Regards
Rune

@maltegrosse
Owner

maltegrosse commented Apr 4, 2022

@runeerle42 thank you for debugging this issue. Time is limited on my side too,
but you are welcome to send me fixes via email ([email protected]); I will try to turn them into a PR, and you can then test it upstream. Thank you for your efforts.

@damonto

damonto commented Sep 15, 2023

I have the same problem. I solved it by using dbus.SystemBusPrivate.

Here is the sample code:

dbusConn, err := m.systemBusPrivate()
if err != nil {
	return err
}
rule := fmt.Sprintf("type='signal', member='%s',path_namespace='%s'", modemmanager.ModemMessagingSignalAdded, fmt.Sprint(modem.GetObjectPath()))
dbusConn.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, rule)
sigChan := make(chan *dbus.Signal, 10)
dbusConn.Signal(sigChan)

func (m *modem) systemBusPrivate() (*dbus.Conn, error) {
	dbusConn, err := dbus.SystemBusPrivate()
	if err != nil {
		return nil, err
	}

	err = dbusConn.Auth(nil)
	if err != nil {
		dbusConn.Close()
		return nil, err
	}

	err = dbusConn.Hello()
	if err != nil {
		dbusConn.Close()
		return nil, err
	}

	return dbusConn, nil
}


3 participants