From 4a9d6841086abc5890483d1a4653882cabf11e06 Mon Sep 17 00:00:00 2001 From: Tine Date: Tue, 13 Feb 2024 21:52:10 +0100 Subject: [PATCH] feat: refactor temporal and make it work on fly --- cmd/server/main.go | 2 +- cmd/temporal/main.go | 124 ++++++---------------------------- example.env | 3 +- fly.toml | 6 +- go.mod | 5 +- go.sum | 4 -- internal/config.go | 19 ++++-- internal/database.go | 2 +- internal/handlers/temporal.go | 2 +- pkg/temporal/config.go | 7 +- pkg/temporal/server.go | 78 +++++++++++++++++++++ pkg/temporal/ui.go | 29 ++++++++ tools/generate/main.go | 2 +- 13 files changed, 161 insertions(+), 122 deletions(-) create mode 100644 pkg/temporal/server.go create mode 100644 pkg/temporal/ui.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 12ac5fd..2629fa0 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -16,7 +16,7 @@ func main() { r := mux.NewRouter() - db, query, err := internal.ConnectToDatabase(config.SQLITE_DB_PATH) + db, query, err := internal.ConnectToDatabase(config.ZDRAVKO_DATABASE_PATH) if err != nil { log.Fatal(err) } diff --git a/cmd/temporal/main.go b/cmd/temporal/main.go index 7861d14..0e88695 100644 --- a/cmd/temporal/main.go +++ b/cmd/temporal/main.go @@ -1,132 +1,52 @@ package main import ( - "context" "log" - "os" - "path/filepath" - - "go.temporal.io/server/common/config" - "go.temporal.io/server/schema/sqlite" - "go.temporal.io/server/temporal" + "code.tjo.space/mentos1386/zdravko/internal" t "code.tjo.space/mentos1386/zdravko/pkg/temporal" - - "go.temporal.io/server/common/authorization" - tlog "go.temporal.io/server/common/log" - - uiserver "github.com/temporalio/ui-server/v2/server" - uiconfig "github.com/temporalio/ui-server/v2/server/config" - uiserveroptions "github.com/temporalio/ui-server/v2/server/server_options" ) -func backendServer() { - cfg := t.NewConfig() - - logger := tlog.NewZapLogger(tlog.BuildZapLogger(tlog.Config{ - Stdout: true, - Level: "info", - OutputFile: "", - })) - - sqlConfig := cfg.Persistence.DataStores[t.PersistenceStoreName].SQL - - // Apply migrations if file does not already exist - if _, err := os.Stat(sqlConfig.DatabaseName); os.IsNotExist(err) { - // Check if any of the parent dirs are missing - dir := filepath.Dir(sqlConfig.DatabaseName) - if _, err := os.Stat(dir); err != nil { - log.Fatal(err) - } +func backendServer(config *internal.Config) { + serverConfig := t.NewServerConfig(config) - if err := sqlite.SetupSchema(sqlConfig); err != nil { - log.Fatal(err) - } - } - - // Pre-create namespaces - var namespaces []*sqlite.NamespaceConfig - for _, ns := range []string{"default"} { - namespaces = append(namespaces, sqlite.NewNamespaceConfig(cfg.ClusterMetadata.CurrentClusterName, ns, false)) - } - if err := sqlite.CreateNamespaces(sqlConfig, namespaces...); err != nil { - log.Fatal(err) + server, err := t.NewServer(serverConfig) + if err != nil { + log.Fatalf("Unable to create server: %v", err) } - authorizer, err := authorization.GetAuthorizerFromConfig(&cfg.Global.Authorization) + err = server.Start() if err != nil { - log.Fatal(err) + log.Fatalf("Unable to start server: %v", err) } - claimMapper, err := authorization.GetClaimMapperFromConfig(&cfg.Global.Authorization, logger) + err = server.Stop() if err != nil { - log.Fatal(err) + log.Fatalf("Unable to stop server: %v", err) } +} - ctx := context.Background() - interruptChan := make(chan interface{}, 1) - go func() { - if doneChan := ctx.Done(); doneChan != nil { - s := <-doneChan - interruptChan <- s - } else { - s := <-temporal.InterruptCh() - interruptChan <- s - } - }() +func frontendServer(config *internal.Config) { + uiConfig := t.NewUiConfig(config) - temporal, err := temporal.NewServer( - temporal.WithConfig(cfg), - temporal.ForServices(temporal.DefaultServices), - temporal.WithLogger(logger), - temporal.WithAuthorizer(authorizer), - temporal.WithClaimMapper(func(cfg *config.Config) authorization.ClaimMapper { - return claimMapper - }), - temporal.InterruptOn(interruptChan), - ) + uiServer, err := t.NewUiServer(uiConfig) if err != nil { - log.Fatal(err) + log.Fatalf("Unable to create UI server: %v", err) } - log.Println("Starting temporal server") - if err := temporal.Start(); err != nil { - panic(err) - } - err = temporal.Stop() + err = uiServer.Start() if err != nil { - panic(err) - } - -} - -func frontendServer() { - cfg := &uiconfig.Config{ - Host: "0.0.0.0", - Port: 8223, - TemporalGRPCAddress: "localhost:7233", - EnableUI: true, - PublicPath: "/temporal", - Codec: uiconfig.Codec{ - Endpoint: "", - }, - CORS: uiconfig.CORS{ - CookieInsecure: true, - }, + log.Fatalf("Unable to start UI server: %v", err) } - server := uiserver.NewServer(uiserveroptions.WithConfigProvider(cfg)) - - log.Println("Starting temporal ui server") - if err := server.Start(); err != nil { - panic(err) - } - server.Stop() + uiServer.Stop() } func main() { + config := internal.NewConfig() + go func() { - frontendServer() + frontendServer(config) }() - backendServer() + backendServer(config) } diff --git a/example.env b/example.env index 3b59ffb..a4e161e 100644 --- a/example.env +++ b/example.env @@ -3,7 +3,8 @@ PORT=8000 ROOT_URL=http://localhost:8000 # SQLite -SQLITE_DB_PATH=zdravko.db +ZDRAVKO_DATABASE_PATH=zdravko.db +TEMPORAL_DATABASE_PATH=temporal.db # Session SESSION_SECRET=your_secret diff --git a/fly.toml b/fly.toml index f87ae35..43f88cd 100644 --- a/fly.toml +++ b/fly.toml @@ -10,17 +10,19 @@ primary_region = 'waw' [env] PORT = '8080' ROOT_URL = 'https://zdravko.fly.dev' - SQLITE_DB_PATH = 'zdravko.db' # Other are defined in secrets - OAUTH2_SCOPES = 'openid,profile,email' OAUTH2_ENDPOINT_TOKEN_URL = 'https://id.tjo.space/application/o/token/' OAUTH2_ENDPOINT_AUTH_URL = 'https://id.tjo.space/application/o/authorize/' OAUTH2_ENDPOINT_USER_INFO_URL = 'https://id.tjo.space/application/o/userinfo/' OAUTH2_ENDPOINT_LOGOUT_URL = 'https://id.tjo.space/application/o/zdravko-development/end-session/' + TEMPORAL_UI_HOST = 'temporal.process.zdravko.internal:8223' + TEMPORAL_SERVER_HOST = 'temporal.process.zdravko.internal:7233' + [processes] server = "server" #worker = "worker" + temporal = "temporal" [http_service] processes = ["server"] diff --git a/go.mod b/go.mod index 4747aa4..45d82d3 100644 --- a/go.mod +++ b/go.mod @@ -3,13 +3,13 @@ module code.tjo.space/mentos1386/zdravko go 1.21.6 require ( - github.com/glebarez/sqlite v1.10.0 github.com/gorilla/mux v1.8.1 github.com/gorilla/sessions v1.2.2 github.com/temporalio/ui-server/v2 v2.23.0 go.temporal.io/sdk v1.26.0-rc.2 go.temporal.io/server v1.22.4 golang.org/x/oauth2 v0.17.0 + gorm.io/driver/sqlite v1.5.0 gorm.io/gen v0.3.25 gorm.io/gorm v1.25.7 gorm.io/plugin/dbresolver v1.5.0 @@ -39,7 +39,6 @@ require ( github.com/emirpasic/gods v1.18.1 // indirect github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/glebarez/go-sqlite v1.22.0 // indirect github.com/go-logr/logr v1.3.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-sql-driver/mysql v1.7.1 // indirect @@ -75,6 +74,7 @@ require ( github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-sqlite3 v1.14.16 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect github.com/olivere/elastic/v7 v7.0.32 // indirect @@ -131,6 +131,7 @@ require ( golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.17.0 // indirect + golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/api v0.155.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20240125205218-1f4bbc51befe // indirect diff --git a/go.sum b/go.sum index 50f9f5f..f0825de 100644 --- a/go.sum +++ b/go.sum @@ -85,10 +85,6 @@ github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/glebarez/go-sqlite v1.22.0 h1:uAcMJhaA6r3LHMTFgP0SifzgXg46yJkgxqyuyec+ruQ= -github.com/glebarez/go-sqlite v1.22.0/go.mod h1:PlBIdHe0+aUEFn+r2/uthrWq4FxbzugL0L8Li6yQJbc= -github.com/glebarez/sqlite v1.10.0 h1:u4gt8y7OND/cCei/NMHmfbLxF6xP2wgKcT/BJf2pYkc= -github.com/glebarez/sqlite v1.10.0/go.mod h1:IJ+lfSOmiekhQsFTJRx/lHtGYmCdtAiTaf5wI9u5uHA= github.com/go-faker/faker/v4 v4.2.0 h1:dGebOupKwssrODV51E0zbMrv5e2gO9VWSLNC1WDCpWg= github.com/go-faker/faker/v4 v4.2.0/go.mod h1:F/bBy8GH9NxOxMInug5Gx4WYeG6fHJZ8Ol/dhcpRub4= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= diff --git a/internal/config.go b/internal/config.go index 162864c..33ee858 100644 --- a/internal/config.go +++ b/internal/config.go @@ -9,8 +9,6 @@ type Config struct { PORT string ROOT_URL string // Needed for oauth2 redirect - SQLITE_DB_PATH string - SESSION_SECRET string OAUTH2_CLIENT_ID string @@ -20,6 +18,13 @@ type Config struct { OAUTH2_ENDPOINT_AUTH_URL string OAUTH2_ENDPOINT_USER_INFO_URL string OAUTH2_ENDPOINT_LOGOUT_URL string + + ZDRAVKO_DATABASE_PATH string + + TEMPORAL_DATABASE_PATH string + TEMPORAL_LISTEN_ADDRESS string + TEMPORAL_UI_HOST string + TEMPORAL_SERVER_HOST string } func getEnv(key, fallback string) string { @@ -41,15 +46,21 @@ func NewConfig() *Config { PORT: getEnv("PORT", "8000"), ROOT_URL: getEnvRequired("ROOT_URL"), - SQLITE_DB_PATH: getEnv("SQLITE_DB_PATH", "zdravko.db"), SESSION_SECRET: getEnvRequired("SESSION_SECRET"), OAUTH2_CLIENT_ID: getEnvRequired("OAUTH2_CLIENT_ID"), OAUTH2_CLIENT_SECRET: getEnvRequired("OAUTH2_CLIENT_SECRET"), - OAUTH2_SCOPES: strings.Split(getEnvRequired("OAUTH2_SCOPES"), ","), + OAUTH2_SCOPES: strings.Split(getEnv("OAUTH2_SCOPES", "openid,profile,email"), ","), OAUTH2_ENDPOINT_TOKEN_URL: getEnvRequired("OAUTH2_ENDPOINT_TOKEN_URL"), OAUTH2_ENDPOINT_AUTH_URL: getEnvRequired("OAUTH2_ENDPOINT_AUTH_URL"), OAUTH2_ENDPOINT_USER_INFO_URL: getEnvRequired("OAUTH2_ENDPOINT_USER_INFO_URL"), OAUTH2_ENDPOINT_LOGOUT_URL: getEnvRequired("OAUTH2_ENDPOINT_LOGOUT_URL"), + + ZDRAVKO_DATABASE_PATH: getEnv("ZDRAVKO_DATABASE_PATH", "zdravko.db"), + + TEMPORAL_DATABASE_PATH: getEnv("TEMPORAL_DATABASE_PATH", "temporal.db"), + TEMPORAL_LISTEN_ADDRESS: getEnv("TEMPORAL_LISTEN_ADDRESS", "0.0.0.0"), + TEMPORAL_UI_HOST: getEnv("TEMPORAL_UI_HOST", "localhost:8223"), + TEMPORAL_SERVER_HOST: getEnv("TEMPORAL_SERVER_HOST", "localhost:7233"), } } diff --git a/internal/database.go b/internal/database.go index c858554..e0627b5 100644 --- a/internal/database.go +++ b/internal/database.go @@ -3,7 +3,7 @@ package internal import ( "code.tjo.space/mentos1386/zdravko/internal/models" "code.tjo.space/mentos1386/zdravko/internal/models/query" - "github.com/glebarez/sqlite" + "gorm.io/driver/sqlite" "gorm.io/gorm" ) diff --git a/internal/handlers/temporal.go b/internal/handlers/temporal.go index 081b800..5da4938 100644 --- a/internal/handlers/temporal.go +++ b/internal/handlers/temporal.go @@ -10,7 +10,7 @@ var customTransport = http.DefaultTransport func (h *BaseHandler) Temporal(w http.ResponseWriter, r *http.Request, user *AuthenticatedUser) { // Create a new HTTP request with the same method, URL, and body as the original request targetURL := r.URL - targetURL.Host = "localhost:8223" + targetURL.Host = h.config.TEMPORAL_UI_HOST targetURL.Scheme = "http" proxyReq, err := http.NewRequest(r.Method, targetURL.String(), r.Body) diff --git a/pkg/temporal/config.go b/pkg/temporal/config.go index b0b629c..b15bb08 100644 --- a/pkg/temporal/config.go +++ b/pkg/temporal/config.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "code.tjo.space/mentos1386/zdravko/internal" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/config" "go.temporal.io/server/common/persistence/sql/sqlplugin/sqlite" @@ -19,7 +20,7 @@ const HistoryPort = 7234 const MatchingPort = 7235 const WorkerPort = 7236 -func NewConfig() *config.Config { +func NewServerConfig(cfg *internal.Config) *config.Config { return &config.Config{ Persistence: config.Persistence{ DataStores: map[string]config.DataStore{ @@ -28,7 +29,7 @@ func NewConfig() *config.Config { ConnectAttributes: map[string]string{ "mode": "rwc", }, - DatabaseName: "temporal.db", + DatabaseName: cfg.TEMPORAL_DATABASE_PATH, }, }, }, @@ -49,7 +50,7 @@ func NewConfig() *config.Config { GRPCPort: FrontendPort, MembershipPort: FrontendPort + 100, BindOnLocalHost: false, - BindOnIP: "0.0.0.0", + BindOnIP: cfg.TEMPORAL_LISTEN_ADDRESS, }, }, "history": { diff --git a/pkg/temporal/server.go b/pkg/temporal/server.go new file mode 100644 index 0000000..aec59de --- /dev/null +++ b/pkg/temporal/server.go @@ -0,0 +1,78 @@ +package temporal + +import ( + "context" + "os" + "path/filepath" + + "go.temporal.io/server/common/authorization" + "go.temporal.io/server/common/config" + "go.temporal.io/server/common/log" + "go.temporal.io/server/schema/sqlite" + t "go.temporal.io/server/temporal" +) + +func NewServer(cfg *config.Config) (t.Server, error) { + logger := log.NewZapLogger(log.BuildZapLogger(log.Config{ + Stdout: true, + Level: "info", + OutputFile: "", + })) + + sqlConfig := cfg.Persistence.DataStores[PersistenceStoreName].SQL + + // Apply migrations if file does not already exist + if _, err := os.Stat(sqlConfig.DatabaseName); os.IsNotExist(err) { + // Check if any of the parent dirs are missing + dir := filepath.Dir(sqlConfig.DatabaseName) + if _, err := os.Stat(dir); err != nil { + return nil, err + } + + if err := sqlite.SetupSchema(sqlConfig); err != nil { + return nil, err + } + } + + // Pre-create namespaces + var namespaces []*sqlite.NamespaceConfig + for _, ns := range []string{"default"} { + namespaces = append(namespaces, sqlite.NewNamespaceConfig(cfg.ClusterMetadata.CurrentClusterName, ns, false)) + } + if err := sqlite.CreateNamespaces(sqlConfig, namespaces...); err != nil { + return nil, err + } + + authorizer, err := authorization.GetAuthorizerFromConfig(&cfg.Global.Authorization) + if err != nil { + return nil, err + } + + claimMapper, err := authorization.GetClaimMapperFromConfig(&cfg.Global.Authorization, logger) + if err != nil { + return nil, err + } + + ctx := context.Background() + interruptChan := make(chan interface{}, 1) + go func() { + if doneChan := ctx.Done(); doneChan != nil { + s := <-doneChan + interruptChan <- s + } else { + s := <-t.InterruptCh() + interruptChan <- s + } + }() + + return t.NewServer( + t.WithConfig(cfg), + t.ForServices(t.DefaultServices), + t.WithLogger(logger), + t.WithAuthorizer(authorizer), + t.WithClaimMapper(func(cfg *config.Config) authorization.ClaimMapper { + return claimMapper + }), + t.InterruptOn(interruptChan), + ) +} diff --git a/pkg/temporal/ui.go b/pkg/temporal/ui.go new file mode 100644 index 0000000..938e8f3 --- /dev/null +++ b/pkg/temporal/ui.go @@ -0,0 +1,29 @@ +package temporal + +import ( + "code.tjo.space/mentos1386/zdravko/internal" + "github.com/temporalio/ui-server/v2/server" + "github.com/temporalio/ui-server/v2/server/config" + "github.com/temporalio/ui-server/v2/server/server_options" +) + +func NewUiConfig(cfg *internal.Config) *config.Config { + return &config.Config{ + Host: cfg.TEMPORAL_LISTEN_ADDRESS, + Port: 8223, + TemporalGRPCAddress: cfg.TEMPORAL_SERVER_HOST, + EnableUI: true, + PublicPath: "/temporal", + Codec: config.Codec{ + Endpoint: "", + }, + CORS: config.CORS{ + CookieInsecure: true, + }, + } +} + +func NewUiServer(cfg *config.Config) (*server.Server, error) { + s := server.NewServer(server_options.WithConfigProvider(cfg)) + return s, nil +} diff --git a/tools/generate/main.go b/tools/generate/main.go index 35b0868..e1b3b9e 100644 --- a/tools/generate/main.go +++ b/tools/generate/main.go @@ -16,7 +16,7 @@ func main() { FieldNullable: true, }) - db, _, _ := internal.ConnectToDatabase(config.SQLITE_DB_PATH) + db, _, _ := internal.ConnectToDatabase(config.ZDRAVKO_DATABASE_PATH) // Use the above `*gorm.DB` instance to initialize the generator, // which is required to generate structs from db when using `GenerateModel/GenerateModelAs`