diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go index 45a5c173..66f09d1a 100644 --- a/e2pg/e2pg.go +++ b/e2pg/e2pg.go @@ -1054,7 +1054,7 @@ func loadTasks(ctx context.Context, pgp *pgxpool.Pool, conf Config) ([]*Task, er WithSourceConfig(sc), WithPG(pgp), WithRange(sc.Start, sc.Stop), - WithConcurrency(1, 100), + WithConcurrency(max(1, sc.Concurrenty), max(1, sc.BatchSize)), WithDestinations(dests[sc.Name]...), )) if len(destsBF[sc.Name]) == 0 { @@ -1065,7 +1065,7 @@ func loadTasks(ctx context.Context, pgp *pgxpool.Pool, conf Config) ([]*Task, er WithSourceConfig(sc), WithPG(pgp), WithRange(startBF[sc.Name], 0), - WithConcurrency(10, 100), + WithConcurrency(max(1, sc.Concurrenty), max(1, sc.BatchSize)), WithDestinations(destsBF[sc.Name]...), )) } @@ -1146,11 +1146,13 @@ func SourceConfigs(ctx context.Context, pgp *pgxpool.Pool) ([]SourceConfig, erro } type SourceConfig struct { - Name string `json:"name"` - ChainID uint64 `json:"chain_id"` - URL string `json:"url"` - Start uint64 `json:"start"` - Stop uint64 `json:"stop"` + Name string `json:"name"` + ChainID uint64 `json:"chain_id"` + URL string `json:"url"` + Start uint64 `json:"start"` + Stop uint64 `json:"stop"` + Concurrenty int `json:"concurrency"` + BatchSize int `json:"batch_size"` } type Compiled struct {