Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: queue pr part 1 #808

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3817c33
refactor: queue pr 1
devhaozi Jan 5, 2025
9ef22f0
chore: update mocks
devhaozi Jan 5, 2025
3fe9d36
feat: add job test
devhaozi Jan 5, 2025
043a207
feat: fix some test
devhaozi Jan 5, 2025
7c40510
chore: update mocks
devhaozi Jan 5, 2025
6aa01be
feat: fix some test
devhaozi Jan 5, 2025
a97b123
feat: fix some test
devhaozi Jan 5, 2025
63f2edf
feat: fix some test
devhaozi Jan 5, 2025
ace5f8a
feat: fix some test
devhaozi Jan 5, 2025
70578c6
feat: fix some test
devhaozi Jan 5, 2025
f4e70fc
feat: fix some test
devhaozi Jan 5, 2025
8ad5afb
chore: update mocks
devhaozi Jan 5, 2025
ee9dbee
feat: fix some test
devhaozi Jan 5, 2025
de53511
Merge remote-tracking branch 'origin/haozi/queue1' into haozi/queue1
devhaozi Jan 5, 2025
5ed60a4
feat: fix some test
devhaozi Jan 5, 2025
22efc89
feat: fix some test
devhaozi Jan 5, 2025
c41668d
feat: fix some test
devhaozi Jan 5, 2025
eeba900
feat: update
devhaozi Jan 6, 2025
f097ecc
chore: update mocks
devhaozi Jan 6, 2025
1c2cb05
feat: update tests
devhaozi Jan 6, 2025
a5fdf4c
fix: lint
devhaozi Jan 6, 2025
83e2a5d
feat: optimize worker
devhaozi Jan 6, 2025
9b86fa3
feat: optimize
devhaozi Jan 8, 2025
8f2b6ee
feat: optimize tests
devhaozi Jan 8, 2025
8934d91
feat: optimize tests
devhaozi Jan 8, 2025
95d0c4c
chore: update mocks
devhaozi Jan 8, 2025
28a9e1d
Merge branch 'master' into haozi/queue1
devhaozi Jan 8, 2025
08b552a
feat: optimize tests
devhaozi Jan 8, 2025
1761b02
feat: optimize tests
devhaozi Jan 8, 2025
9c3155b
Merge branch 'master' into haozi/queue1
devhaozi Jan 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions contracts/queue/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package queue

import "github.com/goravel/framework/contracts/database/orm"

type Config interface {
Debug() bool
DefaultConnection() string
Driver(connection string) string
FailedJobsQuery() orm.Query
Queue(connection, queue string) string
Redis(queueConnection string) (dsn string, database int, queue string) // TODO: Will be removed in v1.17
Size(connection string) int
Via(connection string) any
}
23 changes: 23 additions & 0 deletions contracts/queue/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package queue

import "time"

const DriverSync string = "sync"
const DriverAsync string = "async"
const DriverMachinery string = "machinery" // TODO: Will be removed in v1.17
const DriverCustom string = "custom"

type Driver interface {
// Bulk pushes a slice of jobs onto the queue.
Bulk(jobs []Jobs, queue string) error
// Connection returns the connection name for the driver.
Connection() string
// Driver returns the driver name for the driver.
Driver() string
// Later pushes the job onto the queue after a delay.
Later(delay time.Time, job Job, args []any, queue string) error
// Pop pops the next job off of the queue.
Pop(queue string) (Job, []any, error)
// Push pushes the job onto the queue.
Push(job Job, args []any, queue string) error
}
14 changes: 12 additions & 2 deletions contracts/queue/job.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
package queue

import "time"

type Job interface {
// Signature set the unique signature of the job.
Signature() string
// Handle executes the job.
Handle(args ...any) error
}

type JobRepository interface {
All() []Job
Call(signature string, args []any) error
Get(signature string) (Job, error)
Register(jobs []Job)
}

type Jobs struct {
Job Job
Args []Arg
Job Job
Args []any
Delay time.Duration
}
23 changes: 11 additions & 12 deletions contracts/queue/queue.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package queue

type Queue interface {
Worker(args ...Args) Worker
// Register register jobs
Register(jobs []Job)
// GetJobs get all jobs
GetJobs() []Job
// Job add a job to queue
Job(job Job, args []Arg) Task
// All get all jobs
All() []Job
devhaozi marked this conversation as resolved.
Show resolved Hide resolved
// Chain creates a chain of jobs to be processed one by one, passing
Chain(jobs []Jobs) Task
// GetJob get job by signature
GetJob(signature string) (Job, error)
// Job add a job to queue
Job(job Job, args []any) Task
// Register register jobs
Register(jobs []Job)
// Worker create a queue worker
Worker(payloads ...Args) Worker
}

type Worker interface {
Run() error
Shutdown() error
}

type Args struct {
Expand All @@ -24,8 +28,3 @@ type Args struct {
// Concurrent num
Concurrent int
}

type Arg struct {
Type string
Value any
}
5 changes: 5 additions & 0 deletions errors/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,15 @@ var (
OrmRecordNotFound = New("record not found")
OrmDeletedAtColumnNotFound = New("deleted at column not found")

QueueDriverAsyncNoJobFound = New("no job found in %s queue")
QueueDriverSyncNotNeedRun = New("queue %s driver sync not need run")
QueueDriverNotSupported = New("unknown queue driver: %s")
QueueDriverInvalid = New("%s doesn't implement contracts/queue/driver")
QueueDuplicateJobSignature = New("job signature duplicate: %s, the names of Job and Listener cannot be duplicated")
QueueEmptyJobSignature = New("the Signature of job can't be empty")
QueueEmptyListenerSignature = New("the Signature of listener can't be empty")
QueueJobNotFound = New("job not found: %s")
QueueFailedToSaveFailedJob = New("failed to save failed job: %v")

RouteDefaultDriverNotSet = New("please set default driver")
RouteInvalidDriver = New("init %s route driver fail: route must be implement route.Route or func() (route.Route, error)")
Expand Down
9 changes: 3 additions & 6 deletions event/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,10 @@ func (receiver *Task) Dispatch() error {
return nil
}

func eventArgsToQueueArgs(args []event.Arg) []queuecontract.Arg {
var queueArgs []queuecontract.Arg
func eventArgsToQueueArgs(args []event.Arg) []any {
var queueArgs []any
for _, arg := range args {
queueArgs = append(queueArgs, queuecontract.Arg{
Type: arg.Type,
Value: arg.Value,
})
queueArgs = append(queueArgs, arg.Value)
}

return queueArgs
Expand Down
9 changes: 2 additions & 7 deletions event/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/stretchr/testify/assert"

"github.com/goravel/framework/contracts/event"
queuecontract "github.com/goravel/framework/contracts/queue"
"github.com/goravel/framework/errors"
queuemock "github.com/goravel/framework/mocks/queue"
)
Expand All @@ -32,9 +31,7 @@ func TestDispatch(t *testing.T) {
listener := &TestListener{}
mockTask := &queuemock.Task{}

mockQueue.On("Job", listener, []queuecontract.Arg{
{Type: "string", Value: "test"},
}).Return(mockTask).Once()
mockQueue.On("Job", listener, []any{"test"}).Return(mockTask).Once()
mockTask.On("DispatchSync").Return(nil).Once()

task = NewTask(mockQueue, []event.Arg{
Expand All @@ -51,9 +48,7 @@ func TestDispatch(t *testing.T) {
listener := &TestListenerHandleError{}
mockTask := &queuemock.Task{}

mockQueue.On("Job", listener, []queuecontract.Arg{
{Type: "string", Value: "test"},
}).Return(mockTask).Once()
mockQueue.On("Job", listener, []any{"test"}).Return(mockTask).Once()
mockTask.On("DispatchSync").Return(errors.New("error")).Once()

task = NewTask(mockQueue, []event.Arg{
Expand Down
18 changes: 9 additions & 9 deletions mail/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ func (r *Application) Queue(mailable ...mail.Mailable) error {
r.setUsingMailable(mailable[0])
}

job := r.queue.Job(NewSendMailJob(r.config), []queuecontract.Arg{
{Value: r.subject, Type: "string"},
{Value: r.html, Type: "string"},
{Value: r.from.Address, Type: "string"},
{Value: r.from.Name, Type: "string"},
{Value: r.to, Type: "[]string"},
{Value: r.cc, Type: "[]string"},
{Value: r.bcc, Type: "[]string"},
{Value: r.attachments, Type: "[]string"},
job := r.queue.Job(NewSendMailJob(r.config), []any{
r.subject,
r.html,
r.from.Address,
r.from.Name,
r.to,
r.cc,
r.bcc,
r.attachments,
})

if len(mailable) > 0 {
Expand Down
58 changes: 16 additions & 42 deletions mail/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,51 +7,33 @@ import (
"time"

"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

"github.com/goravel/framework/contracts/mail"
queuecontract "github.com/goravel/framework/contracts/queue"
configmock "github.com/goravel/framework/mocks/config"
logmock "github.com/goravel/framework/mocks/log"
"github.com/goravel/framework/queue"
"github.com/goravel/framework/support/color"
testingdocker "github.com/goravel/framework/support/docker"
"github.com/goravel/framework/support/env"
"github.com/goravel/framework/support/file"
)

var testBcc, testCc, testTo, testFromAddress, testFromName string

type ApplicationTestSuite struct {
suite.Suite
redisPort int
}

func TestApplicationTestSuite(t *testing.T) {
if env.IsWindows() {
t.Skip("Skip test that using Docker")
}

if !file.Exists("../.env") && os.Getenv("MAIL_HOST") == "" {
color.Errorln("No mail tests run, need create .env based on .env.example, then initialize it")
return
}

redisDocker := testingdocker.NewRedis()
assert.Nil(t, redisDocker.Build())

suite.Run(t, &ApplicationTestSuite{
redisPort: redisDocker.Config().Port,
})

assert.Nil(t, redisDocker.Shutdown())
}

func (s *ApplicationTestSuite) SetupTest() {}

func (s *ApplicationTestSuite) TestSendMailBy465Port() {
mockConfig := mockConfig(465, s.redisPort)
mockConfig := mockConfig(465)
app := NewApplication(mockConfig, nil)
s.Nil(app.To([]string{testTo}).
Cc([]string{testCc}).
Expand All @@ -63,7 +45,7 @@ func (s *ApplicationTestSuite) TestSendMailBy465Port() {
}

func (s *ApplicationTestSuite) TestSendMailBy587Port() {
mockConfig := mockConfig(587, s.redisPort)
mockConfig := mockConfig(587)
app := NewApplication(mockConfig, nil)
s.Nil(app.To([]string{testTo}).
Cc([]string{testCc}).
Expand All @@ -75,7 +57,7 @@ func (s *ApplicationTestSuite) TestSendMailBy587Port() {
}

func (s *ApplicationTestSuite) TestSendMailWithFrom() {
mockConfig := mockConfig(587, s.redisPort)
mockConfig := mockConfig(587)
app := NewApplication(mockConfig, nil)
s.Nil(app.From(Address(testFromAddress, testFromName)).
To([]string{testTo}).
Expand All @@ -88,16 +70,15 @@ func (s *ApplicationTestSuite) TestSendMailWithFrom() {
}

func (s *ApplicationTestSuite) TestSendMailWithMailable() {
mockConfig := mockConfig(587, s.redisPort)
mockConfig := mockConfig(587)
app := NewApplication(mockConfig, nil)
s.Nil(app.Send(NewTestMailable()))
}

func (s *ApplicationTestSuite) TestQueueMail() {
mockConfig := mockConfig(587, s.redisPort)
mockLog := &logmock.Log{}
mockConfig := mockConfig(587)

queueFacade := queue.NewApplication(mockConfig, mockLog)
queueFacade := queue.NewApplication(mockConfig)
queueFacade.Register([]queuecontract.Job{
NewSendMailJob(mockConfig),
})
Expand Down Expand Up @@ -125,10 +106,9 @@ func (s *ApplicationTestSuite) TestQueueMail() {
}

func (s *ApplicationTestSuite) TestQueueMailWithConnection() {
mockConfig := mockConfig(587, s.redisPort)
mockLog := &logmock.Log{}
mockConfig := mockConfig(587)

queueFacade := queue.NewApplication(mockConfig, mockLog)
queueFacade := queue.NewApplication(mockConfig)
queueFacade.Register([]queuecontract.Job{
NewSendMailJob(mockConfig),
})
Expand Down Expand Up @@ -159,10 +139,9 @@ func (s *ApplicationTestSuite) TestQueueMailWithConnection() {
}

func (s *ApplicationTestSuite) TestQueueMailWithMailable() {
mockConfig := mockConfig(587, s.redisPort)
mockLog := &logmock.Log{}
mockConfig := mockConfig(587)

queueFacade := queue.NewApplication(mockConfig, mockLog)
queueFacade := queue.NewApplication(mockConfig)
queueFacade.Register([]queuecontract.Job{
NewSendMailJob(mockConfig),
})
Expand All @@ -183,19 +162,14 @@ func (s *ApplicationTestSuite) TestQueueMailWithMailable() {
time.Sleep(3 * time.Second)
}

func mockConfig(mailPort, redisPort int) *configmock.Config {
func mockConfig(mailPort int) *configmock.Config {
mockConfig := &configmock.Config{}
mockConfig.On("GetString", "app.name").Return("goravel")
mockConfig.On("GetBool", "app.debug").Return(false)
mockConfig.On("GetString", "queue.default").Return("redis")
mockConfig.On("GetString", "queue.connections.sync.driver").Return("sync")
mockConfig.On("GetString", "queue.connections.redis.driver").Return("redis")
mockConfig.On("GetString", "queue.connections.redis.connection").Return("default")
mockConfig.On("GetString", "queue.connections.redis.queue", "default").Return("default")
mockConfig.On("GetString", "database.redis.default.host").Return("localhost")
mockConfig.On("GetString", "database.redis.default.password").Return("")
mockConfig.On("GetInt", "database.redis.default.port").Return(redisPort)
mockConfig.On("GetInt", "database.redis.default.database").Return(0)
mockConfig.On("GetString", "queue.default").Return("async")
mockConfig.On("GetString", "queue.connections.async.queue", "default").Return("default")
mockConfig.On("GetString", "queue.connections.async.driver").Return("async")
mockConfig.On("GetString", "queue.failed.database").Return("database")
mockConfig.On("GetString", "queue.failed.table").Return("failed_jobs")

if file.Exists("../.env") {
vip := viper.New()
Expand Down
Loading
Loading