Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: queue pr part 1 #808

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3817c33
refactor: queue pr 1
devhaozi Jan 5, 2025
9ef22f0
chore: update mocks
devhaozi Jan 5, 2025
3fe9d36
feat: add job test
devhaozi Jan 5, 2025
043a207
feat: fix some test
devhaozi Jan 5, 2025
7c40510
chore: update mocks
devhaozi Jan 5, 2025
6aa01be
feat: fix some test
devhaozi Jan 5, 2025
a97b123
feat: fix some test
devhaozi Jan 5, 2025
63f2edf
feat: fix some test
devhaozi Jan 5, 2025
ace5f8a
feat: fix some test
devhaozi Jan 5, 2025
70578c6
feat: fix some test
devhaozi Jan 5, 2025
f4e70fc
feat: fix some test
devhaozi Jan 5, 2025
8ad5afb
chore: update mocks
devhaozi Jan 5, 2025
ee9dbee
feat: fix some test
devhaozi Jan 5, 2025
de53511
Merge remote-tracking branch 'origin/haozi/queue1' into haozi/queue1
devhaozi Jan 5, 2025
5ed60a4
feat: fix some test
devhaozi Jan 5, 2025
22efc89
feat: fix some test
devhaozi Jan 5, 2025
c41668d
feat: fix some test
devhaozi Jan 5, 2025
eeba900
feat: update
devhaozi Jan 6, 2025
f097ecc
chore: update mocks
devhaozi Jan 6, 2025
1c2cb05
feat: update tests
devhaozi Jan 6, 2025
a5fdf4c
fix: lint
devhaozi Jan 6, 2025
83e2a5d
feat: optimize worker
devhaozi Jan 6, 2025
9b86fa3
feat: optimize
devhaozi Jan 8, 2025
8f2b6ee
feat: optimize tests
devhaozi Jan 8, 2025
8934d91
feat: optimize tests
devhaozi Jan 8, 2025
95d0c4c
chore: update mocks
devhaozi Jan 8, 2025
28a9e1d
Merge branch 'master' into haozi/queue1
devhaozi Jan 8, 2025
08b552a
feat: optimize tests
devhaozi Jan 8, 2025
1761b02
feat: optimize tests
devhaozi Jan 8, 2025
9c3155b
Merge branch 'master' into haozi/queue1
devhaozi Jan 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions contracts/queue/queue.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package queue

type Queue interface {
// All get all jobs
All() []Job
// Chain creates a chain of jobs to be processed one by one, passing
Chain(jobs []Jobs) Task
// GetJob get job by signature
GetJob(signature string) (Job, error)
// GetJobs get all jobs
GetJobs() []Job
// Job add a job to queue
Job(job Job, args []any) Task
// Register register jobs
Expand Down
94 changes: 47 additions & 47 deletions mocks/queue/Queue.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions queue/application.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package queue

import (
configcontract "github.com/goravel/framework/contracts/config"
contractsconfig "github.com/goravel/framework/contracts/config"
"github.com/goravel/framework/contracts/queue"
)

Expand All @@ -10,24 +10,25 @@
job queue.JobRepository
}

func NewApplication(config configcontract.Config) *Application {
func NewApplication(config contractsconfig.Config) *Application {
return &Application{
config: NewConfig(config),
job: NewJobImpl(),
}
}

func (app *Application) All() []queue.Job {
return app.job.All()
}
func (app *Application) Chain(jobs []queue.Jobs) queue.Task {
return NewChainTask(app.config, jobs)
}

func (app *Application) GetJob(signature string) (queue.Job, error) {
return app.job.Get(signature)

Check warning on line 25 in queue/application.go

View check run for this annotation

Codecov / codecov/patch

queue/application.go#L24-L25

Added lines #L24 - L25 were not covered by tests
}

func (app *Application) GetJobs() []queue.Job {
return app.job.All()

Check warning on line 29 in queue/application.go

View check run for this annotation

Codecov / codecov/patch

queue/application.go#L28-L29

Added lines #L28 - L29 were not covered by tests
}

func (app *Application) Job(job queue.Job, args []any) queue.Task {
return NewTask(app.config, job, args)
}
Expand Down
6 changes: 3 additions & 3 deletions queue/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
import (
"fmt"

configcontract "github.com/goravel/framework/contracts/config"
contractsconfig "github.com/goravel/framework/contracts/config"
"github.com/goravel/framework/contracts/database/orm"
"github.com/goravel/framework/contracts/queue"
)

type Config struct {
devhaozi marked this conversation as resolved.
Show resolved Hide resolved
config configcontract.Config
config contractsconfig.Config
}

func NewConfig(config configcontract.Config) queue.Config {
func NewConfig(config contractsconfig.Config) queue.Config {
return &Config{
config: config,
}
Expand All @@ -28,8 +28,8 @@

func (r *Config) Driver(connection string) string {
if connection == "" {
connection = r.DefaultConnection()
}

Check warning on line 32 in queue/config.go

View check run for this annotation

Codecov / codecov/patch

queue/config.go#L31-L32

Added lines #L31 - L32 were not covered by tests

return r.config.GetString(fmt.Sprintf("queue.connections.%s.driver", connection))
}
Expand Down Expand Up @@ -76,16 +76,16 @@

func (r *Config) Size(connection string) int {
if connection == "" {
connection = r.DefaultConnection()
}

Check warning on line 80 in queue/config.go

View check run for this annotation

Codecov / codecov/patch

queue/config.go#L79-L80

Added lines #L79 - L80 were not covered by tests

return r.config.GetInt(fmt.Sprintf("queue.connections.%s.size", connection), 100)
}

func (r *Config) Via(connection string) any {
if connection == "" {
connection = r.DefaultConnection()
}

Check warning on line 88 in queue/config.go

View check run for this annotation

Codecov / codecov/patch

queue/config.go#L85-L88

Added lines #L85 - L88 were not covered by tests

return r.config.Get(fmt.Sprintf("queue.connections.%s.via", connection))

Check warning on line 90 in queue/config.go

View check run for this annotation

Codecov / codecov/patch

queue/config.go#L90

Added line #L90 was not covered by tests
}
84 changes: 30 additions & 54 deletions queue/driver_async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/goravel/framework/contracts/queue"
mocksconfig "github.com/goravel/framework/mocks/config"
mocksorm "github.com/goravel/framework/mocks/database/orm"
mocksqueue "github.com/goravel/framework/mocks/queue"
)

Expand All @@ -29,31 +28,23 @@ type DriverAsyncTestSuite struct {
}

func TestDriverAsyncTestSuite(t *testing.T) {
mockConfig := mocksconfig.NewConfig(t)
mockQueue := mocksqueue.NewQueue(t)
app := NewApplication(mockConfig)

app.Register([]queue.Job{&TestAsyncJob{}, &TestDelayAsyncJob{}, &TestCustomAsyncJob{}, &TestErrorAsyncJob{}, &TestChainAsyncJob{}})
suite.Run(t, &DriverAsyncTestSuite{
app: app,
mockConfig: mockConfig,
mockQueue: mockQueue,
})
suite.Run(t, new(DriverAsyncTestSuite))
}

func (s *DriverAsyncTestSuite) SetupTest() {
testAsyncJob = 0
s.mockQueue = mocksqueue.NewQueue(s.T())
s.mockConfig = mocksconfig.NewConfig(s.T())
devhaozi marked this conversation as resolved.
Show resolved Hide resolved
s.app = NewApplication(s.mockConfig)
s.app.Register([]queue.Job{&TestAsyncJob{}, &TestDelayAsyncJob{}, &TestCustomAsyncJob{}, &TestErrorAsyncJob{}, &TestChainAsyncJob{}})
}

func (s *DriverAsyncTestSuite) TestDefaultAsyncQueue() {
s.mockConfig.On("GetString", "queue.default").Return("async").Times(4)
s.mockConfig.On("GetString", "app.name").Return("goravel").Times(2)
s.mockConfig.On("GetString", "queue.connections.async.queue", "default").Return("default").Twice()
s.mockConfig.On("GetString", "queue.connections.async.driver").Return("async").Twice()
s.mockConfig.On("GetInt", "queue.connections.async.size", 100).Return(10).Twice()
s.mockConfig.On("GetString", "queue.failed.database").Return("database").Once()
s.mockConfig.On("GetString", "queue.failed.table").Return("failed_jobs").Once()
s.mockConfig.EXPECT().GetString("queue.default").Return("async").Times(3)
s.mockConfig.EXPECT().GetString("app.name").Return("goravel").Times(2)
s.mockConfig.EXPECT().GetString("queue.connections.async.queue", "default").Return("default").Twice()
s.mockConfig.EXPECT().GetString("queue.connections.async.driver").Return("async").Twice()
s.mockConfig.EXPECT().GetInt("queue.connections.async.size", 100).Return(10).Twice()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -71,13 +62,11 @@ func (s *DriverAsyncTestSuite) TestDefaultAsyncQueue() {
}

func (s *DriverAsyncTestSuite) TestDelayAsyncQueue() {
s.mockConfig.On("GetString", "queue.default").Return("async").Times(4)
s.mockConfig.On("GetString", "app.name").Return("goravel").Times(3)
s.mockConfig.On("GetString", "queue.connections.async.queue", "default").Return("default").Once()
s.mockConfig.On("GetString", "queue.connections.async.driver").Return("async").Twice()
s.mockConfig.On("GetInt", "queue.connections.async.size", 100).Return(10).Twice()
s.mockConfig.On("GetString", "queue.failed.database").Return("database").Once()
s.mockConfig.On("GetString", "queue.failed.table").Return("failed_jobs").Once()
s.mockConfig.EXPECT().GetString("queue.default").Return("async").Times(3)
s.mockConfig.EXPECT().GetString("app.name").Return("goravel").Times(3)
s.mockConfig.EXPECT().GetString("queue.connections.async.queue", "default").Return("default").Once()
s.mockConfig.EXPECT().GetString("queue.connections.async.driver").Return("async").Twice()
s.mockConfig.EXPECT().GetInt("queue.connections.async.size", 100).Return(10).Twice()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expand All @@ -99,13 +88,11 @@ func (s *DriverAsyncTestSuite) TestDelayAsyncQueue() {
}

func (s *DriverAsyncTestSuite) TestCustomAsyncQueue() {
s.mockConfig.On("GetString", "queue.default").Return("custom").Times(4)
s.mockConfig.On("GetString", "app.name").Return("goravel").Times(3)
s.mockConfig.On("GetString", "queue.connections.custom.queue", "default").Return("default").Once()
s.mockConfig.On("GetString", "queue.connections.custom.driver").Return("async").Times(2)
s.mockConfig.On("GetInt", "queue.connections.custom.size", 100).Return(10).Twice()
s.mockConfig.On("GetString", "queue.failed.database").Return("database").Once()
s.mockConfig.On("GetString", "queue.failed.table").Return("failed_jobs").Once()
s.mockConfig.EXPECT().GetString("queue.default").Return("custom").Times(3)
s.mockConfig.EXPECT().GetString("app.name").Return("goravel").Times(3)
s.mockConfig.EXPECT().GetString("queue.connections.custom.queue", "default").Return("default").Once()
s.mockConfig.EXPECT().GetString("queue.connections.custom.driver").Return("async").Times(2)
s.mockConfig.EXPECT().GetInt("queue.connections.custom.size", 100).Return(10).Twice()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -127,21 +114,12 @@ func (s *DriverAsyncTestSuite) TestCustomAsyncQueue() {
}

func (s *DriverAsyncTestSuite) TestErrorAsyncQueue() {
s.mockConfig.On("GetString", "queue.default").Return("async").Times(4)
s.mockConfig.On("GetString", "app.name").Return("goravel").Times(3)
s.mockConfig.On("GetString", "queue.connections.async.queue", "default").Return("default").Once()
s.mockConfig.On("GetString", "queue.connections.async.driver").Return("async").Once()
s.mockConfig.On("GetInt", "queue.connections.async.size", 100).Return(10).Once()
s.mockConfig.On("GetString", "queue.connections.redis.driver").Return("").Once()
s.mockConfig.On("GetString", "queue.failed.database").Return("database").Once()
s.mockConfig.On("GetString", "queue.failed.table").Return("failed_jobs").Once()

mockOrm := mocksorm.NewOrm(s.T())
mockQuery := mocksorm.NewQuery(s.T())
mockOrm.EXPECT().Connection("database").Return(mockOrm)
mockOrm.On("Query").Return(mockQuery)
mockQuery.On("Table", "failed_jobs").Return(mockQuery)
OrmFacade = mockOrm
s.mockConfig.EXPECT().GetString("queue.default").Return("async").Times(3)
s.mockConfig.EXPECT().GetString("app.name").Return("goravel").Times(3)
s.mockConfig.EXPECT().GetString("queue.connections.async.queue", "default").Return("default").Once()
s.mockConfig.EXPECT().GetString("queue.connections.async.driver").Return("async").Once()
s.mockConfig.EXPECT().GetInt("queue.connections.async.size", 100).Return(10).Once()
s.mockConfig.EXPECT().GetString("queue.connections.redis.driver").Return("").Twice()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand All @@ -161,13 +139,11 @@ func (s *DriverAsyncTestSuite) TestErrorAsyncQueue() {
}

func (s *DriverAsyncTestSuite) TestChainAsyncQueue() {
s.mockConfig.On("GetString", "queue.default").Return("async").Times(4)
s.mockConfig.On("GetString", "app.name").Return("goravel").Times(3)
s.mockConfig.On("GetString", "queue.connections.async.queue", "default").Return("default").Once()
s.mockConfig.On("GetString", "queue.connections.async.driver").Return("async").Twice()
s.mockConfig.On("GetInt", "queue.connections.async.size", 100).Return(10).Twice()
s.mockConfig.On("GetString", "queue.failed.database").Return("database").Once()
s.mockConfig.On("GetString", "queue.failed.table").Return("failed_jobs").Once()
s.mockConfig.EXPECT().GetString("queue.default").Return("async").Times(3)
s.mockConfig.EXPECT().GetString("app.name").Return("goravel").Times(3)
s.mockConfig.EXPECT().GetString("queue.connections.async.queue", "default").Return("default").Once()
s.mockConfig.EXPECT().GetString("queue.connections.async.driver").Return("async").Twice()
s.mockConfig.EXPECT().GetInt("queue.connections.async.size", 100).Return(10).Twice()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down
13 changes: 7 additions & 6 deletions queue/driver_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,23 @@ func TestDriverSyncTestSuite(t *testing.T) {
}

func (s *DriverSyncTestSuite) SetupTest() {
s.mockConfig.On("GetString", "queue.default").Return("sync").Once()
s.mockConfig.On("GetString", "app.name").Return("goravel").Once()
s.mockConfig.On("GetString", "queue.connections.sync.queue", "default").Return("default").Once()
testSyncJob = 0
testChainSyncJob = 0
}

func (s *DriverSyncTestSuite) TestSyncQueue() {
s.mockConfig.EXPECT().GetString("queue.default").Return("sync").Twice()
s.mockConfig.EXPECT().GetString("app.name").Return("goravel").Once()
s.mockConfig.EXPECT().GetString("queue.connections.sync.queue", "default").Return("default").Once()
Comment on lines +45 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving these lines to SetupTest should be simpler, and these lines below should be moved to SetupTest as well:

mockConfig := mocksconfig.NewConfig(t)
mockQueue := mocksqueue.NewQueue(t)
app := NewApplication(mockConfig)

s.Nil(s.app.Job(&TestSyncJob{}, []any{"TestSyncQueue", 1}).DispatchSync())
s.Equal(1, testSyncJob)
}

func (s *DriverSyncTestSuite) TestChainSyncQueue() {
s.mockConfig.On("GetString", "queue.default").Return("sync").Times(2)
s.mockConfig.On("GetString", "app.name").Return("goravel").Once()
s.mockConfig.On("GetString", "queue.connections.sync.driver").Return("sync").Once()
s.mockConfig.EXPECT().GetString("queue.default").Return("sync").Twice()
s.mockConfig.EXPECT().GetString("app.name").Return("goravel").Twice()
s.mockConfig.EXPECT().GetString("queue.connections.sync.queue", "default").Return("default").Once()
s.mockConfig.EXPECT().GetString("queue.connections.sync.driver").Return("sync").Once()

s.Nil(s.app.Chain([]queue.Jobs{
{
Expand Down
12 changes: 3 additions & 9 deletions queue/worker.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some test cases for this file.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
job queue.JobRepository
queue string
wg sync.WaitGroup
failedWg sync.WaitGroup
}

func NewWorker(config queue.Config, concurrent int, connection string, queue string, job queue.JobRepository) *Worker {
Expand All @@ -43,7 +42,7 @@
return err
}
if driver.Driver() == queue.DriverSync {
return errors.QueueDriverSyncNotNeedRun.Args(r.queue)

Check warning on line 45 in queue/worker.go

View check run for this annotation

Codecov / codecov/patch

queue/worker.go#L45

Added line #L45 was not covered by tests
}

for i := 0; i < r.concurrent; i++ {
Expand All @@ -62,30 +61,26 @@
}

if err = r.job.Call(job.Signature(), args); err != nil {
select {
case r.failedJobChan <- FailedJob{
r.failedJobChan <- FailedJob{
UUID: uuid.New(),
Connection: r.connection,
Queue: r.queue,
Payload: args,
Exception: err.Error(),
FailedAt: carbon.DateTime{Carbon: carbon.Now()},
}:
default:
LogFacade.Error(errors.New("failed to send failed job to channel"))
}
}
}
}()
}

r.failedWg.Add(1)
r.wg.Add(1)
go func() {
defer r.failedWg.Done()
defer r.wg.Done()
for job := range r.failedJobChan {
if err = r.config.FailedJobsQuery().Create(&job); err != nil {
LogFacade.Error(errors.QueueFailedToSaveFailedJob.Args(err))
}

Check warning on line 83 in queue/worker.go

View check run for this annotation

Codecov / codecov/patch

queue/worker.go#L82-L83

Added lines #L82 - L83 were not covered by tests
}
}()

Expand All @@ -96,6 +91,5 @@
r.isShutdown.Store(true)
r.wg.Wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this line be moved to Run?

close(r.failedJobChan)
r.failedWg.Wait()
return nil
}
Loading
Loading