Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Mathieu <[email protected]>
  • Loading branch information
mathieu-brl committed Dec 5, 2024
1 parent b921bb3 commit 16f0030
Show file tree
Hide file tree
Showing 21 changed files with 673 additions and 36 deletions.
4 changes: 4 additions & 0 deletions microsoft_consumer/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
LOG_LEVEL=debug
RMQ_URL=amqp://guest:guest@localhost:5000/
GOOGLE_CREDENTIAL=your-google-credential
CLIENT_ID=your-client-id
6 changes: 6 additions & 0 deletions microsoft_consumer/consts/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package consts

const EnvFile = ".env"
const EnvFileDirectory = "."

const MessageQueue = "message_queue"
58 changes: 58 additions & 0 deletions microsoft_consumer/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
module consumer

go 1.23.3

require (
github.com/lakhinsu/rabbitmq-go-example/consumer v0.0.0-20220116173101-cd008c3ff7d7
github.com/rs/zerolog v1.33.0
github.com/spf13/viper v1.19.0
)

require (
cloud.google.com/go/auth v0.11.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect
cloud.google.com/go/compute/metadata v0.5.2 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/googleapis/gax-go/v2 v2.14.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/streadway/amqp v1.0.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.57.0 // indirect
go.opentelemetry.io/otel v1.32.0 // indirect
go.opentelemetry.io/otel/metric v1.32.0 // indirect
go.opentelemetry.io/otel/trace v1.32.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/crypto v0.29.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/oauth2 v0.24.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/text v0.20.0 // indirect
google.golang.org/api v0.209.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241118233622-e639e219e697 // indirect
google.golang.org/grpc v1.68.0 // indirect
google.golang.org/protobuf v1.35.2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
240 changes: 240 additions & 0 deletions microsoft_consumer/go.sum

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions microsoft_consumer/handlers/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package handlers

import (
"consumer/service"
"consumer/utils"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/rs/zerolog/log"
"strings"
)

func HandleMessage(queue string, msg amqp.Delivery, err error) {
if err != nil {
log.Err(err).Msg("Error occurred in RMQ consumer")
}
log.Info().Msgf("Message received on '%s' queue: %s", queue, string(msg.Body))

if strings.HasPrefix(string(msg.Body), "send email:") {
parts := strings.SplitN(string(msg.Body), ":", 2)
if len(parts) == 2 {
email := strings.TrimSpace(parts[1])
err := service.SendEmailTo(utils.Token, email)
if err != nil {
log.Err(err).Msg("Error sending email")
} else {
log.Info().Msgf("Email sent to %s", email)
}
} else {
log.Error().Msg("Invalid message format")
}
}
}
33 changes: 33 additions & 0 deletions microsoft_consumer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"consumer/consts"
"consumer/handlers"
"consumer/utils"
"github.com/rs/zerolog/log"
"net/http"
)

func main() {
http.HandleFunc("/auth/google/login", utils.GetAuthURL)
http.HandleFunc("/auth/google/callback", utils.HandleGoogleCallback)
log.Info().Msg("Server started successfully on port http://localhost:8042")
log.Info().Msg("Login with Google at http://localhost:8042/auth/google/login")
log.Info().Msg("Callback URL: http://localhost:8042/auth/google/callback")
connectionString := utils.GetEnvVar("RMQ_URL")

messageQueue := utils.RMQConsumer{
Queue: consts.MessageQueue,
ConnectionString: connectionString,
MsgHandler: handlers.HandleMessage,
}
forever := make(chan bool)

go messageQueue.Consume()
err := http.ListenAndServe(":8042", nil)
if err != nil {
log.Fatal().Err(err).Msg("Server failed to start")
}
<-forever

}
29 changes: 29 additions & 0 deletions microsoft_consumer/service/email.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package service

import (
"consumer/utils"
"encoding/base64"
"fmt"
"golang.org/x/oauth2"
"google.golang.org/api/gmail/v1"
)

func SendEmailTo(token *oauth2.Token, toEmail string) error {
service, err := utils.InitGoogleAPI(token, "gmail")
if err != nil {
return err
}

gmailService := service.(*gmail.Service)
message := []byte(fmt.Sprintf("To: %s\r\nSubject: Test Email\r\n\r\nThis is a test email sent from AREA!", toEmail))
msg := gmail.Message{
Raw: base64.URLEncoding.EncodeToString(message),
}

_, err = gmailService.Users.Messages.Send("me", &msg).Do()
if err != nil {
return err
}

return nil
}
59 changes: 59 additions & 0 deletions microsoft_consumer/utils/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package utils

import (
amqp "github.com/rabbitmq/amqp091-go"
"github.com/rs/zerolog/log"
)

type RMQConsumer struct {
Queue string
ConnectionString string
MsgHandler func(queue string, msg amqp.Delivery, err error)
}

func (x RMQConsumer) OnError(err error, msg string) {
if err != nil {
x.MsgHandler(x.Queue, amqp.Delivery{}, err)
}
}

func (x RMQConsumer) Consume() {
conn, err := amqp.Dial(x.ConnectionString)
x.OnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
x.OnError(err, "Failed to open a channel")
defer ch.Close()

q, err := ch.QueueDeclare(
x.Queue, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
x.OnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
x.OnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
for d := range msgs {
x.MsgHandler(x.Queue, d, nil)
}
}()
log.Info().Msgf("Started listening for messages on '%s' queue", x.Queue)
<-forever
}
107 changes: 107 additions & 0 deletions microsoft_consumer/utils/googleAuth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package utils

import (
"context"
"encoding/json"
"fmt"
"github.com/lakhinsu/rabbitmq-go-example/consumer/utils"
"github.com/rs/zerolog/log"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/gmail/v1"
"google.golang.org/api/option"
"google.golang.org/api/people/v1"
"net/http"
"time"
)

var (
Token *oauth2.Token
refreshToken string
)

var config = &oauth2.Config{
RedirectURL: "http://localhost:8042/auth/google/callback",
ClientID: utils.GetEnvVar("CLIENT_ID"),
ClientSecret: utils.GetEnvVar("GOOGLE_CREDENTIAL"),
Scopes: []string{"https://www.googleapis.com/auth/userinfo.profile", "https://www.googleapis.com/auth/userinfo.email", "https://www.googleapis.com/auth/gmail.send"},
Endpoint: google.Endpoint,
}

func GetAuthURL(w http.ResponseWriter, r *http.Request) {
url := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
http.Redirect(w, r, url, http.StatusTemporaryRedirect)
}

func RefreshToken() error {
ctx := context.Background()
token := &oauth2.Token{RefreshToken: refreshToken}
newToken, err := config.TokenSource(ctx, token).Token()
if err != nil {
return err
}
Token = newToken
return nil
}

func ExchangeCodeForToken(code string) (*oauth2.Token, error) {
ctx := context.Background()
token, err := config.Exchange(ctx, code)
if err != nil {
return nil, err
}
Token = token
refreshToken = token.RefreshToken
return token, nil
}

func InitGoogleAPI(token *oauth2.Token, apiName string) (interface{}, error) {
if Token.Expiry.Before(time.Now()) {
if err := RefreshToken(); err != nil {
return nil, err
}
}
ctx := context.Background()
httpClient := config.Client(ctx, token)

switch apiName {
case "gmail":
service, err := gmail.NewService(ctx, option.WithHTTPClient(httpClient))
if err != nil {
return nil, err
}
return service, nil
case "people":
service, err := people.NewService(ctx, option.WithHTTPClient(httpClient))
if err != nil {
return nil, err
}
return service, nil
default:
return nil, fmt.Errorf("unsupported API: %s", apiName)
}
}

func HandleGoogleCallback(w http.ResponseWriter, r *http.Request) {
code := r.FormValue("code")
_, err := ExchangeCodeForToken(code)
if err != nil {
http.Error(w, "Failed to exchange token", http.StatusInternalServerError)
return
}

response, err := http.Get("https://www.googleapis.com/oauth2/v2/userinfo?access_token=" + Token.AccessToken)
if err != nil {
http.Error(w, "Failed to get user info", http.StatusInternalServerError)
return
}
defer response.Body.Close()

var userInfo map[string]interface{}
err = json.NewDecoder(response.Body).Decode(&userInfo)
if err != nil {
http.Error(w, "Failed to decode user info", http.StatusInternalServerError)
return
}
log.Log().Msgf("User info: %+v", userInfo)
}
27 changes: 27 additions & 0 deletions microsoft_consumer/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package utils

import (
"consumer/consts"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
)

func init() {
viper.SetConfigFile(consts.EnvFile)
viper.AddConfigPath(consts.EnvFileDirectory)
err := viper.ReadInConfig()
if err != nil {
log.Debug().Err(err).
Msg("Error occurred while reading env file, might fallback to OS env config")
}
viper.AutomaticEnv()
}

func GetEnvVar(name string) string {
if !viper.IsSet(name) {
log.Debug().Msgf("Environment variable %s is not set", name)
return ""
}
value := viper.GetString(name)
return value
}
11 changes: 0 additions & 11 deletions server/cmd/consumer/main.go

This file was deleted.

Loading

0 comments on commit 16f0030

Please sign in to comment.