Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(raft): Implement Raft-based Consistent Hash State Management #636

Merged
merged 26 commits into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d8cd37e
feat: Add Raft consensus for consistent hashing
sinadarbouy Nov 7, 2024
ff2be24
refactor: update consistent hash implementation with block-based prox…
sinadarbouy Nov 24, 2024
e35be54
refactor: remove proxy ID and related functionality
sinadarbouy Nov 25, 2024
61c2ac8
feat(raft): Add Raft integration tests and consistent hash improvements
sinadarbouy Nov 25, 2024
3ed59e5
feat(raft): add configurable directory and improve test stability
sinadarbouy Nov 26, 2024
2ab8041
feat(config): add default Raft configuration values
sinadarbouy Nov 28, 2024
809530f
refactor(raft): improve error handling and code organization
sinadarbouy Nov 28, 2024
071e84c
Add temporary directory for Raft in Test_pluginScaffoldCmd
sinadarbouy Nov 28, 2024
c5d2dbe
feat(config): add JSON parsing for raft peers env variable
sinadarbouy Dec 2, 2024
0c7ce5c
Add GRPC to raft
sinadarbouy Dec 6, 2024
fb36926
feat: add Docker Compose configuration for Raft cluster setup
sinadarbouy Dec 8, 2024
8ba90d2
refactor(raft): improve error handling and code clarity
sinadarbouy Dec 8, 2024
9e15e11
Add unit tests for Raft RPC server and client
sinadarbouy Dec 8, 2024
d3042bc
Update Raft configuration in gatewayd.yaml
sinadarbouy Dec 8, 2024
bd458bb
Convert RaftPeer slice to string for environment variable compatibility
sinadarbouy Dec 9, 2024
72230cd
Update checksum in gatewayd_plugins.yaml
sinadarbouy Dec 9, 2024
09e31e5
Refactor Raft configuration to use `IsBootstrap` flag
sinadarbouy Dec 9, 2024
f395f0c
Increase the sleep time to pass the test case on the local machine.
sinadarbouy Dec 9, 2024
2068618
fix: resolve lint issues in rpc_test.go
sinadarbouy Dec 9, 2024
01c2234
feat: Improve code readability with comments and updates
sinadarbouy Dec 9, 2024
5d0c65f
Improve Redis container setup and async test handling
sinadarbouy Dec 9, 2024
61945e3
Handle Fatal Error on Raft Node Initialization Failure
sinadarbouy Dec 13, 2024
ef3745b
Update test configuration in gatewayd.yaml
sinadarbouy Dec 13, 2024
d6f286b
Update comment to accurately describe Raft configuration constants
sinadarbouy Dec 13, 2024
586efd7
Simplify leader check in monitorLeadership function
sinadarbouy Dec 13, 2024
f6aba9f
Fix: Gracefully handle ErrRaftShutdown during Node shutdown
sinadarbouy Dec 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ linters-settings:
- "github.com/testcontainers/testcontainers-go"
- "github.com/stretchr/testify/require"
- "github.com/docker/go-connections/nat"
- "github.com/hashicorp/raft"
test:
files:
- $test
Expand All @@ -92,6 +93,7 @@ linters-settings:
- "github.com/redis/go-redis/v9"
- "github.com/docker/go-connections/nat"
- "github.com/codingsince1985/checksum"
- "github.com/hashicorp/raft"
tagalign:
align: false
sort: false
Expand Down
5 changes: 4 additions & 1 deletion act/act_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ func createTestRedis(t *testing.T) string {
req := testcontainers.ContainerRequest{
Image: "redis:6",
ExposedPorts: []string{"6379/tcp"},
WaitingFor: wait.ForLog("Ready to accept connections"),
WaitingFor: wait.ForAll(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

wait.ForLog("Ready to accept connections"),
wait.ForListeningPort("6379/tcp"),
),
}
redisContainer, err := testcontainers.GenericContainer(
ctx, testcontainers.GenericContainerRequest{
Expand Down
10 changes: 6 additions & 4 deletions act/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,10 +747,12 @@ func Test_Run_Async_Redis(t *testing.T) {
consumer, err := sdkAct.NewConsumer(hclogger, rdb, 5, "test-async-chan")
require.NoError(t, err)

require.NoError(t, consumer.Subscribe(context.Background(), func(ctx context.Context, task []byte) error {
err := actRegistry.runAsyncActionFn(ctx, task)
waitGroup.Done()
return err
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

require.NoError(t, consumer.Subscribe(ctx, func(ctx context.Context, task []byte) error {
defer waitGroup.Done()
return actRegistry.runAsyncActionFn(ctx, task)
}))

outputs := actRegistry.Apply([]sdkAct.Signal{
Expand Down
3 changes: 3 additions & 0 deletions cmd/plugin_scaffold_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ func Test_pluginScaffoldCmd(t *testing.T) {
postgresAddress2 := postgresHostIP2 + ":" + postgresMappedPort2.Port()
t.Setenv("GATEWAYD_CLIENTS_TEST_WRITE_ADDRESS", postgresAddress2)

raftTempDir := t.TempDir()
t.Setenv("GATEWAYD_RAFT_DIRECTORY", raftTempDir)

globalTestConfigFile := filepath.Join("testdata", "gatewayd.yaml")
plugin.IsPluginTemplateEmbedded()
pluginTestScaffoldInputFile := "./testdata/scaffold_input.yaml"
Expand Down
13 changes: 13 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/gatewayd-io/gatewayd/network"
"github.com/gatewayd-io/gatewayd/plugin"
"github.com/gatewayd-io/gatewayd/pool"
"github.com/gatewayd-io/gatewayd/raft"
"github.com/gatewayd-io/gatewayd/tracing"
usage "github.com/gatewayd-io/gatewayd/usagereport/v1"
"github.com/getsentry/sentry-go"
Expand Down Expand Up @@ -910,6 +911,17 @@ var runCmd = &cobra.Command{

span.End()

_, span = otel.Tracer(config.TracerName).Start(runCtx, "Create Raft Node")
defer span.End()

raftNode, originalErr := raft.NewRaftNode(logger, conf.Global.Raft)
if originalErr != nil {
mostafa marked this conversation as resolved.
Show resolved Hide resolved
logger.Error().Err(originalErr).Msg("Failed to start raft node")
span.RecordError(originalErr)
pluginRegistry.Shutdown()
os.Exit(gerr.FailedToStartRaftNode)
}

_, span = otel.Tracer(config.TracerName).Start(runCtx, "Create servers")
// Create and initialize servers.
for name, cfg := range conf.Global.Servers {
Expand Down Expand Up @@ -946,6 +958,7 @@ var runCmd = &cobra.Command{
LoadbalancerStrategyName: cfg.LoadBalancer.Strategy,
LoadbalancerRules: cfg.LoadBalancer.LoadBalancingRules,
LoadbalancerConsistentHash: cfg.LoadBalancer.ConsistentHash,
RaftNode: raftNode,
},
)

Expand Down
20 changes: 20 additions & 0 deletions cmd/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ func Test_runCmd(t *testing.T) {
postgresAddress := postgresHostIP + ":" + postgresMappedPort.Port()
t.Setenv("GATEWAYD_CLIENTS_DEFAULT_WRITES_ADDRESS", postgresAddress)

tempDir := t.TempDir()
t.Setenv("GATEWAYD_RAFT_DIRECTORY", tempDir)
t.Setenv("GATEWAYD_RAFT_ADDRESS", "127.0.0.1:0")
t.Setenv("GATEWAYD_RAFT_GRPCADDRESS", "127.0.0.1:0")

globalTestConfigFile := "./test_global_runCmd.yaml"
pluginTestConfigFile := "./test_plugins_runCmd.yaml"
// Create a test plugins config file.
Expand Down Expand Up @@ -87,6 +92,11 @@ func Test_runCmdWithTLS(t *testing.T) {
postgresAddress := postgresHostIP + ":" + postgresMappedPort.Port()
t.Setenv("GATEWAYD_CLIENTS_DEFAULT_WRITES_ADDRESS", postgresAddress)

tempDir := t.TempDir()
t.Setenv("GATEWAYD_RAFT_DIRECTORY", tempDir)
t.Setenv("GATEWAYD_RAFT_ADDRESS", "127.0.0.1:0")
t.Setenv("GATEWAYD_RAFT_GRPCADDRESS", "127.0.0.1:0")

globalTLSTestConfigFile := "./testdata/gatewayd_tls.yaml"
pluginTestConfigFile := "./test_plugins_runCmdWithTLS.yaml"
// Create a test plugins config file.
Expand Down Expand Up @@ -150,6 +160,11 @@ func Test_runCmdWithMultiTenancy(t *testing.T) {
postgresAddress2 := postgresHostIP2 + ":" + postgresMappedPort2.Port()
t.Setenv("GATEWAYD_CLIENTS_TEST_WRITE_ADDRESS", postgresAddress2)

tempDir := t.TempDir()
t.Setenv("GATEWAYD_RAFT_DIRECTORY", tempDir)
t.Setenv("GATEWAYD_RAFT_ADDRESS", "127.0.0.1:0")
t.Setenv("GATEWAYD_RAFT_GRPCADDRESS", "127.0.0.1:0")

globalTestConfigFile := "./testdata/gatewayd.yaml"
pluginTestConfigFile := "./test_plugins_runCmdWithMultiTenancy.yaml"
// Create a test plugins config file.
Expand Down Expand Up @@ -211,6 +226,11 @@ func Test_runCmdWithCachePlugin(t *testing.T) {
postgresAddress := postgresHostIP + ":" + postgresMappedPort.Port()
t.Setenv("GATEWAYD_CLIENTS_DEFAULT_WRITES_ADDRESS", postgresAddress)

tempDir := t.TempDir()
t.Setenv("GATEWAYD_RAFT_DIRECTORY", tempDir)
t.Setenv("GATEWAYD_RAFT_ADDRESS", "127.0.0.1:0")
t.Setenv("GATEWAYD_RAFT_GRPCADDRESS", "127.0.0.1:0")

globalTestConfigFile := "./test_global_runCmdWithCachePlugin.yaml"
pluginTestConfigFile := "./test_plugins_runCmdWithCachePlugin.yaml"
// TODO: Remove this once these global variables are removed from cmd/run.go.
Expand Down
6 changes: 6 additions & 0 deletions cmd/testdata/gatewayd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,9 @@ servers:

api:
enabled: True

raft:
address: 127.0.0.1:2222
nodeID: node1
isBootstrap: true
peers: {}
82 changes: 50 additions & 32 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ func (c *Config) LoadDefaults(ctx context.Context) *gerr.GatewayDError {
GRPCNetwork: DefaultGRPCAPINetwork,
GRPCAddress: DefaultGRPCAPIAddress,
},
Raft: Raft{
Address: DefaultRaftAddress,
NodeID: DefaultRaftNodeID,
IsBootstrap: DefaultRaftIsBootstrap,
Directory: DefaultRaftDirectory,
GRPCAddress: DefaultRaftGRPCAddress,
},
}

//nolint:nestif
Expand All @@ -201,7 +208,7 @@ func (c *Config) LoadDefaults(ctx context.Context) *gerr.GatewayDError {
return gerr.ErrConfigParseError.Wrap(err)
}

if configObject == "api" {
if configObject == "api" || configObject == "raft" {
// Handle API configuration separately
// TODO: Add support for multiple API config groups.
continue
Expand Down Expand Up @@ -309,7 +316,7 @@ func (c *Config) LoadDefaults(ctx context.Context) *gerr.GatewayDError {
func (c *Config) LoadGlobalEnvVars(ctx context.Context) *gerr.GatewayDError {
_, span := otel.Tracer(TracerName).Start(ctx, "Load global environment variables")

if err := c.GlobalKoanf.Load(loadEnvVars(), nil); err != nil {
if err := c.GlobalKoanf.Load(loadEnvVarsWithTransform(), nil); err != nil {
span.RecordError(err)
span.End()
return gerr.ErrConfigParseError.Wrap(
Expand All @@ -326,7 +333,7 @@ func (c *Config) LoadGlobalEnvVars(ctx context.Context) *gerr.GatewayDError {
func (c *Config) LoadPluginEnvVars(ctx context.Context) *gerr.GatewayDError {
_, span := otel.Tracer(TracerName).Start(ctx, "Load plugin environment variables")

if err := c.PluginKoanf.Load(loadEnvVars(), nil); err != nil {
if err := c.PluginKoanf.Load(loadEnvVarsWithTransform(), nil); err != nil {
span.RecordError(err)
span.End()
return gerr.ErrConfigParseError.Wrap(
Expand All @@ -338,41 +345,52 @@ func (c *Config) LoadPluginEnvVars(ctx context.Context) *gerr.GatewayDError {
return nil
}

func loadEnvVars() *env.Env {
return env.Provider(EnvPrefix, ".", transformEnvVariable)
}

// transformEnvVariable transforms the environment variable name to a format based on JSON tags.
func transformEnvVariable(envVar string) string {
structs := []any{
&API{},
&Logger{},
&Pool{},
&Proxy{},
&Server{},
&Metrics{},
&PluginConfig{},
}
tagMapping := make(map[string]string)
generateTagMapping(structs, tagMapping)
func loadEnvVarsWithTransform() *env.Env {
// Use ProviderWithValue to transform both key and value
return env.ProviderWithValue(EnvPrefix, ".", func(envKey string, value string) (string, interface{}) {
// Transform the key
key := strings.ToLower(strings.TrimPrefix(envKey, EnvPrefix))

structs := []any{
&API{},
&Logger{},
&Pool{},
&Proxy{},
&Server{},
&Metrics{},
&PluginConfig{},
&Raft{},
}
tagMapping := make(map[string]string)
generateTagMapping(structs, tagMapping)

lowerEnvVar := strings.ToLower(strings.TrimPrefix(envVar, EnvPrefix))
parts := strings.Split(lowerEnvVar, "_")
parts := strings.Split(key, "_")

var transformedParts strings.Builder
var transformedParts strings.Builder

for i, part := range parts {
if i > 0 {
transformedParts.WriteString(".")
for i, part := range parts {
if i > 0 {
transformedParts.WriteString(".")
}
if mappedValue, exists := tagMapping[part]; exists {
transformedParts.WriteString(mappedValue)
} else {
transformedParts.WriteString(part)
}
}
if mappedValue, exists := tagMapping[part]; exists {
transformedParts.WriteString(mappedValue)
} else {
transformedParts.WriteString(part)

// Check if the key is "peers" and transform the value using JSON unmarshal
if transformedParts.String() == "raft.peers" {
var raftPeers []RaftPeer
if err := json.Unmarshal([]byte(value), &raftPeers); err != nil {
return transformedParts.String(), fmt.Errorf("failed to unmarshal peers: %w", err)
}
return transformedParts.String(), raftPeers
}
}

return transformedParts.String()
// Return the key and value as is if no transformation is needed
return transformedParts.String(), value
})
}

// LoadGlobalConfigFile loads the plugin configuration file.
Expand Down
7 changes: 7 additions & 0 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ const (
DefaultActionRedisEnabled = false
DefaultRedisAddress = "localhost:6379"
DefaultRedisChannel = "gatewayd-actions"

// Raft constants.
DefaultRaftAddress = "127.0.0.1:2223"
DefaultRaftNodeID = "node1"
DefaultRaftIsBootstrap = true
DefaultRaftDirectory = "raft"
DefaultRaftGRPCAddress = "127.0.0.1:50051"
)

// Load balancing strategies.
Expand Down
16 changes: 16 additions & 0 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,21 @@ type API struct {
GRPCNetwork string `json:"grpcNetwork" jsonschema:"enum=tcp,enum=udp,enum=unix"`
}

type Raft struct {
Address string `json:"address"`
NodeID string `json:"nodeId"`
IsBootstrap bool `json:"isBootstrap"`
Peers []RaftPeer `json:"peers"`
Directory string `json:"directory" jsonschema:"default=raft"`
GRPCAddress string `json:"grpcAddress"`
}

type RaftPeer struct {
ID string `json:"id"`
Address string `json:"address"`
GRPCAddress string `json:"grpcAddress"`
}

type GlobalConfig struct {
API API `json:"api"`
Loggers map[string]*Logger `json:"loggers"`
Expand All @@ -146,4 +161,5 @@ type GlobalConfig struct {
Proxies map[string]map[string]*Proxy `json:"proxies"`
Servers map[string]*Server `json:"servers"`
Metrics map[string]*Metrics `json:"metrics"`
Raft Raft `json:"raft"`
}
Loading
Loading