Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added read, pause, resume and delete for queue #15

Merged
merged 12 commits into from
Feb 1, 2022
Merged
126 changes: 84 additions & 42 deletions internal/datastore/mongo_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,89 +2,131 @@ package datastore

import (
"context"
"fmt"
"log"
"os"
"os"

"github.com/SimplQ/simplQ-golang/internal/models"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

// A Structure with Collections frequently used and a pointer to the client
type MongoDB struct {
Client *mongo.Client
Queue *mongo.Collection
Token *mongo.Collection
Client *mongo.Client
Queue *mongo.Collection
Token *mongo.Collection
}

func NewMongoDB() *MongoDB {
// Use local development mongodb instance if env variable not set
uri := "mongodb://root:example@localhost:27017/?maxPoolSize=20&w=majority"
if val, ok := os.LookupEnv("MONGO_URI"); ok {
uri = val
// Use local development mongodb instance if env variable not set
uri := "mongodb://root:example@localhost:27017/?maxPoolSize=20&w=majority"

if val, ok := os.LookupEnv("MONGO_URI"); ok {
uri = val
}

log.Println("Connecting to MongoDB...")
log.Println("Connecting to MongoDB...")

client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))

client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))

if err != nil {
log.Fatal(err)
}
if err != nil {
log.Fatal(err)
}

new_mongodb := MongoDB {
client,
client.Database("simplq").Collection("queue"),
client.Database("simplq").Collection("token"),
}
new_mongodb := MongoDB{
client,
client.Database("simplq").Collection("queue"),
client.Database("simplq").Collection("token"),
}

log.Println("Successfully connected to MongoDB!")
log.Println("Successfully connected to MongoDB!")

return &new_mongodb
return &new_mongodb
}

func (mongodb MongoDB) CreateQueue(queue models.Queue) models.QueueId {
// Set id to empty so its generated by mongoDB
queue.Id = ""
// Set id to empty so its generated by mongoDB
queue.Id = ""

result, err := mongodb.Queue.InsertOne(context.TODO(), queue)
result, err := mongodb.Queue.InsertOne(context.TODO(), queue)

if err != nil {
log.Fatal(err)
}
stringId := result.InsertedID.(primitive.ObjectID).Hex()
if err != nil {
log.Fatal(err)
}

stringId := result.InsertedID.(primitive.ObjectID).Hex()

return models.QueueId(stringId)
return models.QueueId(stringId)
}

func (mongodb MongoDB) ReadQueue(models.QueueId) models.Queue {
panic("Not implemented")
func (mongodb MongoDB) ReadQueue(id models.QueueId) models.Queue {
result := models.Queue{}

// convert id string to ObjectId
queueId, _ := primitive.ObjectIDFromHex(string(id))
maaverik marked this conversation as resolved.
Show resolved Hide resolved

err := mongodb.Queue.FindOne(context.TODO(), bson.M{"_id": queueId}).Decode(&result)

if err != nil {
log.Fatal(err)
maaverik marked this conversation as resolved.
Show resolved Hide resolved
}

return result
}

func (mongodb MongoDB) PauseQueue(models.QueueId) {
panic("Not implemented")
func (mongodb MongoDB) PauseQueue(id models.QueueId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code for this function and resume function is identical.

Let's have one setIsPaused(id models.QueueId, isPaused boolean) private function and then call it with true/false in PauseQueue/ResumeQueue

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added setIsPaused()

queueId, _ := primitive.ObjectIDFromHex(string(id))
result, err := mongodb.Queue.UpdateOne(
context.TODO(),
bson.M{"_id": queueId},
bson.D{
{"$set", bson.D{{"isPaused", true}}},
},
)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Paused %v Documents!\n", result.ModifiedCount)
maaverik marked this conversation as resolved.
Show resolved Hide resolved
}

func (mongodb MongoDB) ResumeQueue(models.QueueId) {
panic("Not implemented")
func (mongodb MongoDB) ResumeQueue(id models.QueueId) {
queueId, _ := primitive.ObjectIDFromHex(string(id))
result, err := mongodb.Queue.UpdateOne(
context.TODO(),
bson.M{"_id": queueId},
bson.D{
{"$set", bson.D{{"isPaused", false}}},
},
)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Resumed %v Documents!\n", result.ModifiedCount)
}

func (mongodb MongoDB) DeleteQueue(models.QueueId) {
panic("Not implemented")
func (mongodb MongoDB) DeleteQueue(id models.QueueId) {
queueId, _ := primitive.ObjectIDFromHex(string(id))
result, err := mongodb.Queue.DeleteOne(
context.TODO(),
bson.M{"_id": queueId})
if err != nil {
log.Fatal(err)
}
fmt.Printf("Deleted %v Documents!\n", result.DeletedCount)
}

func (mongodb MongoDB) AddTokenToQueue(models.QueueId, models.Token) {
panic("Not implemented")
panic("Not implemented")
}

func (mongodb MongoDB) ReadToken(models.TokenId) {
panic("Not implemented")
panic("Not implemented")
}

func (mongodb MongoDB) RemoveToken(models.TokenId) {
panic("Not implemented")
panic("Not implemented")
}
85 changes: 67 additions & 18 deletions internal/handler/queue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package handler

import (
"context"
"encoding/json"
"fmt"
"log"
Expand All @@ -9,35 +10,83 @@ import (

"github.com/SimplQ/simplQ-golang/internal/datastore"
"github.com/SimplQ/simplQ-golang/internal/models"
"github.com/go-chi/chi/v5"
)

func GetQueue(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "GET Queue not implemented")
id := r.Context().Value("id").(string)
if id == "" {
w.WriteHeader(http.StatusNotFound)
return
}
q := datastore.Store.ReadQueue(models.QueueId(id))
json.NewEncoder(w).Encode(q)
fmt.Fprintf(w, "Get queue")
Copy link
Contributor

@daltonfury42 daltonfury42 Jan 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does line 24 do? Why do we need both lines 23 and 24?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. Line 24 is not needed. I will remove it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

func CreateQueue(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
decoder := json.NewDecoder(r.Body)

var q models.Queue
err := decoder.Decode(&q)
var q models.Queue
err := decoder.Decode(&q)

if err != nil {
panic(err)
}
if err != nil {
panic(err)
}

// Initialize values
// Only consider queue name from the body of the request
q.CreationTime = time.Now()
q.IsDeleted = false
q.IsPaused = false
q.Tokens = make([]models.Token, 0)
// Initialize values
// Only consider queue name from the body of the request
q.CreationTime = time.Now()
q.IsDeleted = false
q.IsPaused = false
q.Tokens = make([]models.Token, 0)

log.Print("Create Queue: ")
log.Println(q)
log.Print("Create Queue: ")
log.Println(q)

insertedId := datastore.Store.CreateQueue(q)
insertedId := datastore.Store.CreateQueue(q)

log.Printf("Inserted %s", insertedId)
log.Printf("Inserted %s", insertedId)

fmt.Fprintf(w, "Post queue")
fmt.Fprintf(w, "Post queue")
}

func PauseQueue(w http.ResponseWriter, r *http.Request) {
id := r.Context().Value("id").(string)
if id == "" {
w.WriteHeader(http.StatusNotFound)
return
}
log.Println("URL path param 'id' is: " + string(id))
datastore.Store.PauseQueue(models.QueueId(id))
fmt.Fprintf(w, "Queue paused")
}

func ResumeQueue(w http.ResponseWriter, r *http.Request) {
id := r.Context().Value("id").(string)
if id == "" {
w.WriteHeader(http.StatusNotFound)
return
}
log.Println("URL path param 'id' is: " + string(id))
datastore.Store.ResumeQueue(models.QueueId(id))
fmt.Fprintf(w, "Queue resumed")
}

func DeleteQueue(w http.ResponseWriter, r *http.Request) {
id := r.Context().Value("id").(string)
if id == "" {
w.WriteHeader(http.StatusNotFound)
return
}
log.Println("URL path param 'id' is: " + string(id))
datastore.Store.DeleteQueue(models.QueueId(id))
fmt.Fprintf(w, "Delete queue")
}

func QueueCtx(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), "id", chi.URLParam(r, "id"))
next.ServeHTTP(w, r.WithContext(ctx))
})
}
21 changes: 18 additions & 3 deletions internal/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,23 @@ func InitalizeRoutes() chi.Router {
// Routes for "queue" resource
r.Route("/queue", func(r chi.Router) {
// POST /articles
r.Post("/", handler.CreateQueue)
r.Post("/", handler.CreateQueue)

// Subrouters
r.Route("/{id}", func(r chi.Router) {
r.Use(handler.QueueCtx)
r.Get("/", handler.GetQueue) // GET /queue/123
r.Delete("/", handler.DeleteQueue) // DELETE /queue/123
})
r.Route("/pause/{id}", func(r chi.Router) {
maaverik marked this conversation as resolved.
Show resolved Hide resolved
r.Use(handler.QueueCtx)
r.Put("/", handler.PauseQueue) // PUT /queue/pause/123

})
r.Route("/resume/{id}", func(r chi.Router) {
r.Use(handler.QueueCtx)
r.Put("/", handler.ResumeQueue) // PUT /queue/resume/123
})
})

return r;
return r
}