-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
new mysql storage backend for workflow engine (#33)
- Loading branch information
1 parent
fe46327
commit 3f97c32
Showing
28 changed files
with
2,362 additions
and
60 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package mysql | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/micromdm/nanocmd/engine/storage" | ||
"github.com/micromdm/nanocmd/workflow" | ||
) | ||
|
||
// RetrieveEventSubscriptions retrieves event subscriptions by names. | ||
// See the storage interface type for further docs. | ||
func (s *MySQLStorage) RetrieveEventSubscriptions(ctx context.Context, names []string) (map[string]*storage.EventSubscription, error) { | ||
events, err := s.q.GetEventsByNames(ctx, names) | ||
if err != nil { | ||
return nil, fmt.Errorf("get events by name: %w", err) | ||
} | ||
retEvents := make(map[string]*storage.EventSubscription) | ||
for _, event := range events { | ||
retEvents[event.EventName] = &storage.EventSubscription{ | ||
Event: event.EventType, | ||
Workflow: event.WorkflowName, | ||
Context: event.Context.String, | ||
} | ||
} | ||
return retEvents, nil | ||
} | ||
|
||
// RetrieveEventSubscriptionsByEvent retrieves event subscriptions by event flag. | ||
// See the storage interface type for further docs. | ||
func (s *MySQLStorage) RetrieveEventSubscriptionsByEvent(ctx context.Context, f workflow.EventFlag) ([]*storage.EventSubscription, error) { | ||
events, err := s.q.GetEventsByType(ctx, f.String()) | ||
if err != nil { | ||
return nil, fmt.Errorf("get events by type: %w", err) | ||
} | ||
var retEvents []*storage.EventSubscription | ||
for _, event := range events { | ||
retEvents = append(retEvents, &storage.EventSubscription{ | ||
Event: event.EventType, | ||
Workflow: event.WorkflowName, | ||
Context: event.Context.String, | ||
}) | ||
} | ||
return retEvents, nil | ||
} | ||
|
||
// StoreEventSubscription stores an event subscription. | ||
// See the storage interface type for further docs. | ||
func (s *MySQLStorage) StoreEventSubscription(ctx context.Context, name string, es *storage.EventSubscription) error { | ||
_, err := s.db.ExecContext( | ||
ctx, | ||
` | ||
INSERT INTO wf_events | ||
(event_name, event_type, workflow_name, context) | ||
VALUES | ||
(?, ?, ?, ?) AS new | ||
ON DUPLICATE KEY | ||
UPDATE | ||
workflow_name = new.workflow_name, | ||
event_type = new.event_type, | ||
context = new.context;`, | ||
name, | ||
es.Event, | ||
es.Workflow, | ||
sqlNullString(es.Context), | ||
) | ||
return err | ||
} | ||
|
||
// DeleteEventSubscription removes an event subscription. | ||
// See the storage interface type for further docs. | ||
func (s *MySQLStorage) DeleteEventSubscription(ctx context.Context, name string) error { | ||
return s.q.RemoveEvent(ctx, name) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
package mysql | ||
|
||
//go:generate sqlc generate |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
package mysql | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"fmt" | ||
"math/rand" | ||
"sync" | ||
"time" | ||
|
||
"github.com/micromdm/nanocmd/engine/storage/mysql/sqlc" | ||
) | ||
|
||
// MySQLStorage implements a storage.AllStorage using MySQL. | ||
type MySQLStorage struct { | ||
db *sql.DB | ||
q *sqlc.Queries | ||
|
||
randMu sync.Mutex | ||
rand *rand.Rand | ||
} | ||
|
||
type config struct { | ||
driver string | ||
dsn string | ||
db *sql.DB | ||
} | ||
|
||
// Option allows configuring a MySQLStorage. | ||
type Option func(*config) | ||
|
||
// WithDSN sets the storage MySQL data source name. | ||
func WithDSN(dsn string) Option { | ||
return func(c *config) { | ||
c.dsn = dsn | ||
} | ||
} | ||
|
||
// WithDriver sets a custom MySQL driver for the storage. | ||
// Default driver is "mysql" but is ignored if WithDB is used. | ||
func WithDriver(driver string) Option { | ||
return func(c *config) { | ||
c.driver = driver | ||
} | ||
} | ||
|
||
// WithDB sets a custom MySQL *sql.DB to the storage. | ||
// If set, driver passed via WithDriver is ignored. | ||
func WithDB(db *sql.DB) Option { | ||
return func(c *config) { | ||
c.db = db | ||
} | ||
} | ||
|
||
// New creates and returns a new MySQL. | ||
func New(opts ...Option) (*MySQLStorage, error) { | ||
cfg := &config{driver: "mysql"} | ||
for _, opt := range opts { | ||
opt(cfg) | ||
} | ||
var err error | ||
if cfg.db == nil { | ||
cfg.db, err = sql.Open(cfg.driver, cfg.dsn) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
if err = cfg.db.Ping(); err != nil { | ||
return nil, err | ||
} | ||
return &MySQLStorage{ | ||
db: cfg.db, | ||
q: sqlc.New(cfg.db), | ||
rand: rand.New(rand.NewSource(time.Now().UnixNano())), | ||
}, nil | ||
} | ||
|
||
// sqlNullString sets Valid to true of the return value of s is not empty. | ||
func sqlNullString(s string) sql.NullString { | ||
return sql.NullString{String: s, Valid: s != ""} | ||
} | ||
|
||
// sqlNullTime sets Valid to true of the return value of t is not zero. | ||
func sqlNullTime(t time.Time) sql.NullTime { | ||
return sql.NullTime{Valid: !t.IsZero(), Time: t} | ||
} | ||
|
||
// txcb executes SQL within transactions when wrapped in tx(). | ||
type txcb func(ctx context.Context, tx *sql.Tx, qtx *sqlc.Queries) error | ||
|
||
// tx wraps g in transactions using db. | ||
// If g returns an err the transaction will be rolled back; otherwise committed. | ||
func tx(ctx context.Context, db *sql.DB, q *sqlc.Queries, g txcb) error { | ||
tx, err := db.BeginTx(ctx, nil) | ||
if err != nil { | ||
return fmt.Errorf("tx begin: %w", err) | ||
} | ||
if err = g(ctx, tx, q.WithTx(tx)); err != nil { | ||
if rbErr := tx.Rollback(); rbErr != nil { | ||
return fmt.Errorf("tx rollback: %w; while trying to handle error: %v", rbErr, err) | ||
} | ||
return fmt.Errorf("tx rolled back: %w", err) | ||
} | ||
if err = tx.Commit(); err != nil { | ||
return fmt.Errorf("tx commit: %w", err) | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package mysql | ||
|
||
import ( | ||
"os" | ||
"testing" | ||
|
||
"github.com/micromdm/nanocmd/engine/storage" | ||
"github.com/micromdm/nanocmd/engine/storage/test" | ||
|
||
_ "github.com/go-sql-driver/mysql" | ||
) | ||
|
||
func TestMySQLStorage(t *testing.T) { | ||
testDSN := os.Getenv("NANOCMD_MYSQL_STORAGE_TEST_DSN") | ||
if testDSN == "" { | ||
t.Skip("NANOCMD_MYSQL_STORAGE_TEST_DSN not set") | ||
} | ||
|
||
s, err := New(WithDSN(testDSN)) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
test.TestEngineStorage(t, func() storage.AllStorage { return s }) | ||
} |
Oops, something went wrong.