Skip to content

Commit

Permalink
set-up-index-tracking-for-backfill-index
Browse files Browse the repository at this point in the history
  • Loading branch information
JasonPowr committed Nov 20, 2024
1 parent 56ea4b5 commit ca1c83e
Showing 1 changed file with 61 additions and 2 deletions.
63 changes: 61 additions & 2 deletions cmd/backfill-index/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"log"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -90,6 +91,10 @@ const (
providerMySQL
)

const (
redisLastProcessedIndexKey = "last_processed_index"
)

type indexClient interface {
idempotentAddToIndex(ctx context.Context, key, value string) error
}
Expand Down Expand Up @@ -127,6 +132,7 @@ var (
redisPassword = flag.String("redis-password", "", "Password for Redis authentication")
redisEnableTLS = flag.Bool("redis-enable-tls", false, "Enable TLS for Redis client")
redisInsecureSkipVerify = flag.Bool("redis-insecure-skip-verify", false, "Whether to skip TLS verification for Redis client or not")
enableRedisIndexResume = flag.Bool("enable-redis-index-resume", false, "Enable resuming from the last processed index stored in Redis. When enabled, the '--start' flag becomes optional for the Redis provider.")
mysqlDSN = flag.String("mysql-dsn", "", "MySQL Data Source Name")
startIndex = flag.Int("start", -1, "First index to backfill")
endIndex = flag.Int("end", -1, "Last index to backfill")
Expand Down Expand Up @@ -158,7 +164,7 @@ func main() {
if *mysqlDSN != "" {
provider = providerMySQL
}
if *redisHostname != "" || *redisPort != "" || *redisPassword != "" {
if *redisHostname != "" || *redisPort != "" || *redisPassword != "" || enableRedisIndexResume != nil {
provider = providerRedis
}
if provider == providerUnset {
Expand All @@ -172,7 +178,10 @@ func main() {
log.Fatal("Redis port must be set")
}
}
if *startIndex == -1 {
if *enableRedisIndexResume && *startIndex != -1 {
log.Fatal("--enable-redis-index-resume and --start cannot be set simultaneously")
}
if *startIndex == -1 && !*enableRedisIndexResume {
log.Fatal("start must be set to >=0")
}
if *endIndex == -1 {
Expand Down Expand Up @@ -276,6 +285,24 @@ func populate(indexClient indexClient, rekorClient *rekorclient.Rekor) (err erro
}
}()

var lastFilled int
if *enableRedisIndexResume && !*dryRun {
redisClient, ok := indexClient.(*redisClient)
if !ok {
log.Fatal("enableRedisIndexResume is only supported with Redis backend")
}
lastFilled, err = redisClient.getLastFilledIndex(ctx)
if err != nil {
log.Fatalf("Failed to retrieve last filled index: %v", err)
}
if lastFilled == -1 {
log.Printf("%s not found, starting from index 0", redisLastProcessedIndexKey)
*startIndex = 0
} else {
*startIndex = lastFilled + 1 // Start from the next index
}
}

for i := *startIndex; i <= *endIndex; i++ {
index := i // capture loop variable for closure
group.Go(func() error {
Expand Down Expand Up @@ -345,6 +372,19 @@ func populate(indexClient indexClient, rekorClient *rekorclient.Rekor) (err erro
return nil
})
}

if *enableRedisIndexResume && !*dryRun {
redisClient, ok := indexClient.(*redisClient)
if !ok {
log.Fatal("enableRedisIndexResume is only supported with Redis backend")
}
if err := redisClient.setLastFilledIndex(ctx, *endIndex); err != nil {
log.Printf("Failed to set last filled index: %v", err)
} else {
fmt.Printf("Last filled index updated to %d\n", *endIndex)
}
}

err = group.Wait()
if err != nil {
return fmt.Errorf("error running backfill: %v", err)
Expand Down Expand Up @@ -393,3 +433,22 @@ func (c *mysqlClient) idempotentAddToIndex(ctx context.Context, key, value strin
_, err := c.client.NamedExecContext(ctx, mysqlWriteStmt, map[string]any{"key": key, "uuid": value})
return err
}

func (c *redisClient) getLastFilledIndex(ctx context.Context) (int, error) {
val, err := c.client.Get(ctx, redisLastProcessedIndexKey).Result()
if err != nil {
if err == redis.Nil {
return -1, nil // No index has been filled yet
}
return 0, fmt.Errorf("failed to get last filled index from Redis: %w", err)
}
index, err := strconv.Atoi(val)
if err != nil {
return 0, fmt.Errorf("invalid last filled index value in Redis: %w", err)
}
return index, nil
}

func (c *redisClient) setLastFilledIndex(ctx context.Context, index int) error {
return c.client.Set(ctx, redisLastProcessedIndexKey, index, 0).Err()
}

0 comments on commit ca1c83e

Please sign in to comment.