Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

invalid memory address or nil pointer dereference on dequeue #22

Open
dllz opened this issue Mar 18, 2020 · 10 comments
Open

invalid memory address or nil pointer dereference on dequeue #22

dllz opened this issue Mar 18, 2020 · 10 comments

Comments

@dllz
Copy link

dllz commented Mar 18, 2020

Hey, just started using this library and I have run into an issue dequeuing a queue.

[signal SIGSEGV: segmentation violation code=0x1 addr=0x58 pc=0x181f8b1]

goroutine 10 [running]:
sync.(*Mutex).Lock(...)
	/usr/local/Cellar/go/1.13.6/libexec/src/sync/mutex.go:74
github.com/joncrlsn/dque.(*DQue).Dequeue(0x0, 0x0, 0x0, 0x0, 0x0)
	/Users/dlotz/go/pkg/mod/github.com/joncrlsn/[email protected]/queue.go:240 +0x51

I am not sure if you have any thoughts on the cause of this, my googling does not seem to turn anything up.  
@joncrlsn
Copy link
Owner

joncrlsn commented Mar 19, 2020

Hi, is this something you can recreate at will? That would help.

@Kriechi

@dllz
Copy link
Author

dllz commented Mar 19, 2020

I can only get this error, I do not seem able to successfully dequeue.
The program starts up, all the queues are created.
Then I have set up a blocking pop on the queue along the lines of:

Try dequeue, if nothing in the queue sleep for 0.5 seconds and then try dequeue again. Repeat till you can get something off the queue.

And I immediately get that error and the program crashes

@joncrlsn
Copy link
Owner

joncrlsn commented Mar 19, 2020

Until we get this figured out, can you use the v2.1 tag? I'm at work today and won't be able to try until later this evening.

@dllz
Copy link
Author

dllz commented Mar 19, 2020

No problem, if it helps debuging I am using go 1.14 but I downgraded to 1.12 and the issue persisted

@Kriechi
Copy link
Contributor

Kriechi commented Mar 19, 2020

@dllz would you mind sharing a snippet of your code?
How do you open the queue? Maybe you even got a minimal reproducible example that shows the error? Or is it deep-down in your application?

@dllz
Copy link
Author

dllz commented Mar 19, 2020

Happy to share some, I tried a bunch of other releases and I ran into the issue so I think this is very much a me being stupid.

type Dq struct {
	low  *dque.DQue
	med  *dque.DQue
	high *dque.DQue
	now  *dque.DQue
}

type Holder struct {
	Item interface{}
	Time int64
	Id   string
}

func HolderBuilder() interface{} {
	return &Holder{}
}

func (d Dq) Push(id string, item interface{}, priority Priority) error {
	holder := Holder{
		Item: item,
		Time: time.Now().Unix(),
	}
	switch priority {
	case Now:
		err := d.now.Enqueue(holder)
		if err != nil {
			util.HandleError(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	case High:
		err := d.high.Enqueue(holder)
		if err != nil {
			util.HandleError(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	case Medium:
		err := d.med.Enqueue(holder)
		if err != nil {
			util.HandleError(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	case Low:
		err := d.low.Enqueue(holder)
		if err != nil {
			util.HandleError(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	default:
		return util.HandleErrorErr("No matching priority", errors.New("No matching priority for "+string(priority)))
	}
	return nil
}

func (d Dq) Pop() interface{} {
	for {
		temp, err := d.now.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			util.HandleError("Error fetching now priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		temp, err = d.high.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			util.HandleError("Error fetching high priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		temp, err = d.med.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			util.HandleError("Error fetching med priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		temp, err = d.low.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			util.HandleError("Error fetching low priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		time.Sleep(time.Millisecond * time.Duration(viper.GetInt(util.QueueDequeueEmptyWaitTime)))
	}
}

func (d Dq) Connect() interface{} {
	var err error
	d.low, err = dque.New(string(Low), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
	if err != nil {
		d.now, err = dque.Open(string(Low), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
		if err != nil {
			util.HandleError("Error creating low client", err)
			return nil
		}
	}
	d.med, err = dque.New(string(Medium), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
	if err != nil {
		d.now, err = dque.Open(string(Medium), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
		if err != nil {
			util.HandleError("Error creating med client", err)
			return nil
		}
	}
	d.high, err = dque.New(string(High), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
	if err != nil {
		d.high, err = dque.Open(string(High), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
		if err != nil {
			util.HandleError("Error creating high client", err)
			return nil
		}
	}
	d.now, err = dque.New(string(Now), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
	if err != nil {
		d.now, err = dque.Open(string(Now), viper.GetString(util.QueueDirectory), SegmentSize, HolderBuilder)
		if err != nil {
			util.HandleError("Error creating now client", err)
			return nil
		}
	}
	return d
}

func (d Dq) ListenOnQueue(handler func(item interface{}), stop *bool) {
	go func() {
		for {
			if !*stop {
				item := d.Pop()
				handler(item)
			}
		}
	}()
}

And then it is being initialized with

        dq := queue.Dq{}
	qh := QueueHandler{
		Queue: dq,
	}
	dq.Connect()
	var stop bool
	dq.ListenOnQueue(func(item interface{}) {
		fmt.Println(item)
	}, &stop)

@Kriechi
Copy link
Contributor

Kriechi commented Mar 19, 2020

Quick one without thinking too much:

you might want to try using "methods on pointers":

func (d Dq) Connect() interface{} {

vs.

func (d *Dq) Connect() interface{} {

Also, dque has a nice NewOrOpen which might save you a bit of error handling code.

@dllz
Copy link
Author

dllz commented Mar 19, 2020

Issue persists with pointers sadly,
Thanks for the NewOrOpen, that will definitely come in handy

@joncrlsn
Copy link
Owner

joncrlsn commented Mar 20, 2020

Hi dllz, Nothing jumps out at me right away either. Could you maybe change your comment above to be the contents of a stand-alone go file that can be run to show the problem? Then I can copy it into test.go and run go run test.go. For example,

  • I can't find the definition of QueueHandler.
  • I can't tell how stop ever gets flipped.
  • Please hardcode values like this to make it compile without extra dependencies: viper.GetString(util.QueueDirectory)
  • Make sure your example uses methods on the pointer of Dq like Kriechi suggested.

Whenever I have had to show a problem with the simplest possible problem demonstration program on any code I've written, I find it really helps my understanding of things, as well as helping others help me find the problem.

Thanks!

  • Jon

@dllz
Copy link
Author

dllz commented Mar 20, 2020

Hi Jon.

Sorry about that, I have cleaned up the code to hopefully solve your problem.

package queue

import (
	"errors"
	"fmt"
	"github.com/joncrlsn/dque"
	"time"
)

type Priority string

const (
	Low         Priority = "low"
	Medium      Priority = "med"
	High        Priority = "high"
	Now         Priority = "now"
	SegmentSize          = 50
)


type Dq struct {
	low  *dque.DQue
	med  *dque.DQue
	high *dque.DQue
	now  *dque.DQue
}

type Holder struct {
	Item interface{}
	Time int64
	Id   string
}

func HolderBuilder() interface{} {
	return &Holder{}
}

func (d *Dq) Push(id string, item interface{}, priority Priority) error {
	holder := Holder{
		Item: item,
		Time: time.Now().Unix(),
	}
	switch priority {
	case Now:
		err := d.now.Enqueue(holder)
		if err != nil {
			fmt.Println(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	case High:
		err := d.high.Enqueue(holder)
		if err != nil {
			fmt.Println(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	case Medium:
		err := d.med.Enqueue(holder)
		if err != nil {
			fmt.Println(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	case Low:
		err := d.low.Enqueue(holder)
		if err != nil {
			fmt.Println(fmt.Sprintf("Unable to enqueue item: [%v] to queue %v", item, priority), err)
			return err
		}
		break
	default:
		fmt.Println("No matching priority")
		return errors.New("No matching priority for "+string(priority))
	}
	return nil
}

func (d *Dq) Pop() interface{} {
	for {
		temp, err := d.now.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			fmt.Println("Error fetching now priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		temp, err = d.high.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			fmt.Println("Error fetching high priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		temp, err = d.med.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			fmt.Println("Error fetching med priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		temp, err = d.low.Dequeue()
		if err != nil && err != dque.ErrEmpty {
			fmt.Println("Error fetching low priority", err)
		}
		if temp != nil && err == nil {
			return temp
		}
		time.Sleep(time.Millisecond * 500)
	}
}

func (d *Dq) Connect() interface{} {
	var err error
	d.low, err = dque.New(string(Low), "queue", SegmentSize, HolderBuilder)
	if err != nil {
		d.now, err = dque.Open(string(Low), "queue", SegmentSize, HolderBuilder)
		if err != nil {
			fmt.Println("Error creating low client", err)
			return nil
		}
	}
	d.med, err = dque.New(string(Medium), "queue", SegmentSize, HolderBuilder)
	if err != nil {
		d.now, err = dque.Open(string(Medium), "queue", SegmentSize, HolderBuilder)
		if err != nil {
			fmt.Println("Error creating med client", err)
			return nil
		}
	}
	d.high, err = dque.New(string(High), "queue", SegmentSize, HolderBuilder)
	if err != nil {
		d.high, err = dque.Open(string(High), "queue", SegmentSize, HolderBuilder)
		if err != nil {
			fmt.Println("Error creating high client", err)
			return nil
		}
	}
	d.now, err = dque.New(string(Now), "queue", SegmentSize, HolderBuilder)
	if err != nil {
		d.now, err = dque.Open(string(Now), "queue", SegmentSize, HolderBuilder)
		if err != nil {
			fmt.Println("Error creating now client", err)
			return nil
		}
	}
	return d
}

func (d *Dq) ListenOnQueue(handler func(item interface{}), stop *bool) {
	go func() {
		for {
			if !*stop {
				item := d.Pop()
				handler(item)
			}
		}
	}()
}

The calling code:

package webservice

import (
	"canvas-data-management-go/pkg/export"
	"canvas-data-management-go/pkg/queue"
	"canvas-data-management-go/pkg/util"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
)

type QueueJson struct {
	Item     string         `json:item`
	Id       string         `json:id`
	Priority queue.Priority `json:Priority`
}

type QueueHandler struct {
	Queue queue.Dq
}
func Run(ip string, port string) error {
	dq := queue.Dq{}
	qh := QueueHandler{
		Queue: dq,
	}
	dq.Connect()

	http.HandleFunc("/queue", qh.handleQueue)
	hostAddress := "localhost" + ":" + "12345"
	var stop bool
	dq.ListenOnQueue(func(item interface{}) {
		fmt.Println(item)
	}, &stop)
		err := http.ListenAndServe(hostAddress, nil)
	if err != nil{
		fmt.Println("Error hosting http listener "+hostAddress)
		return err
	}
	return nil
}
func (qh *QueueHandler) handleQueue(w http.ResponseWriter, r *http.Request) {
	fmt.Println("Received queue request")
	var item QueueJson
	err := json.Unmarshal(getBody(r.Body, w), &item)
	if err != nil {
		fmt.Println("Error un-marshalling json from http server", err)
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}
	err = qh.Queue.Push(item.Id, item.Item, item.Priority)
	if err != nil {
		fmt.Println("Error queuing data: "+item.Id, err)
		http.Error(w, "Error queuing data for ID: "+item.Id+" "+err.Error(), http.StatusInternalServerError)
		return
	}
	response := ResponseJson{Message: "Successfully processed"}
	encoder := json.NewEncoder(w)
	err = encoder.Encode(response)
}

Webservice caller

package cmd

import (
	ws "canvas-data-management-go/pkg/webservice"
	"fmt"
	"github.com/spf13/cobra"
)

var (
	webserviceCommand = &cobra.Command{
		Use:   "web",
		Short: "Put the cli a mode that it receives commands from the internet",
		Run: func(cmd *cobra.Command, args []string) {
			err := webservice(ip, port)
			if err != nil {
				fmt.Print("Error running command", err)
			}
		},
	}
	port string
	ip   string
)

func webservice(ip string, port string) error {
	err := ws.Run()
	if err != nil {
		return err
	}
	return nil
}

root.go

package cmd

import (
	"github.com/spf13/cobra"
)

var (
	rootCmd = &cobra.Command{
		Use:              "cdm",
		TraverseChildren: true,
	}
)

func Execute() error {
	return rootCmd.Execute()
}

func init() {

	//Web
	webserviceCommand.Flags().StringVar(&ip, "ip", "127.0.0.1", "Specify the ip address to listen on")
	webserviceCommand.Flags().StringVar(&port, "port", "2364", "Specify the ip address to listen on")
	rootCmd.AddCommand(webserviceCommand)
}

main.go

package main

import (
	"canvas-data-management-go/cmd"
	"fmt"
)

func main() {
	err := cmd.Execute()
	if err != nil {
		fmt.Println("Error starting up", err)
	}
}

@dllz dllz closed this as completed Mar 20, 2020
@dllz dllz reopened this Mar 20, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants