From 9cc59462a80aa11d4320f395088eb1ea5865ab8b Mon Sep 17 00:00:00 2001 From: minhnhathoang Date: Thu, 2 Jan 2025 06:06:34 +0700 Subject: [PATCH] refactor: add interceptors for http.RoundTripper and custom graphql.Client --- .../algebra/integral/pool_list_updater.go | 15 +- .../algebra/integral/pool_tracker.go | 15 +- .../algebra/v1/pool_list_updater.go | 15 +- .../algebra/v1/pool_tracker.go | 15 +- .../ambient/pool_list_updater.go | 15 +- .../ambient/pool_list_updater_test.go | 6 +- .../balancer-v1/pools_list_updater.go | 14 +- .../composable-stable/pools_list_updater.go | 10 +- .../balancer-v2/shared/pools_list_updater.go | 25 +- .../balancer-v2/stable/pools_list_updater.go | 10 +- .../weighted/pools_list_updater.go | 10 +- .../dodo/classical/pools_list_updater.go | 9 +- .../dodo/dpp/pools_list_updater.go | 9 +- .../dodo/dsp/pools_list_updater.go | 9 +- .../dodo/dvm/pools_list_updater.go | 9 +- .../dodo/shared/pools_list_updater.go | 21 +- .../gyroscope/2clp/pools_list_updater.go | 11 +- .../gyroscope/3clp/pools_list_updater.go | 11 +- .../gyroscope/eclp/pools_list_updater.go | 11 +- .../gyroscope/shared/pools_list_updater.go | 15 +- pkg/source/algebrav1/pool_list_updater.go | 15 +- pkg/source/algebrav1/pool_tracker.go | 15 +- .../pools_list_updater.go | 15 +- pkg/source/balancer/pools_list_updater.go | 15 +- pkg/source/dodo/pools_list_updater.go | 15 +- pkg/source/elastic/pool_tracker.go | 15 +- pkg/source/elastic/pools_list_updater.go | 15 +- pkg/source/liquiditybookv20/pool_tracker.go | 15 +- pkg/source/liquiditybookv21/pool_tracker.go | 30 +- pkg/source/maverickv1/pool_list_updater.go | 14 +- pkg/source/nuriv2/pool_tracker.go | 15 +- pkg/source/nuriv2/pools_list_updater.go | 15 +- pkg/source/pancakev3/pool_tracker.go | 15 +- pkg/source/pancakev3/pools_list_updater.go | 15 +- pkg/source/platypus/pools_list_updater.go | 15 +- pkg/source/ramsesv2/pool_tracker.go | 15 +- pkg/source/ramsesv2/pools_list_updater.go | 15 +- pkg/source/slipstream/pool_tracker.go | 15 +- pkg/source/slipstream/pools_list_updater.go | 15 +- pkg/source/solidly-v3/pool_tracker.go | 15 +- pkg/source/solidly-v3/pools_list_updater.go | 15 +- pkg/source/uniswapv3/pool_tracker.go | 15 +- pkg/source/uniswapv3/pools_list_updater.go | 15 +- pkg/source/wombat/pool_tracker.go | 11 +- pkg/source/wombat/pools_list_updater.go | 15 +- pkg/util/graphql/graphql.go | 346 ++++++++++++++++++ pkg/util/graphql/interceptor.go | 26 ++ pkg/util/graphql/options.go | 6 - pkg/util/graphql/timeout.go | 40 +- pkg/util/http/interceptor.go | 30 ++ 50 files changed, 793 insertions(+), 295 deletions(-) create mode 100644 pkg/util/graphql/graphql.go create mode 100644 pkg/util/graphql/interceptor.go delete mode 100644 pkg/util/graphql/options.go create mode 100644 pkg/util/http/interceptor.go diff --git a/pkg/liquidity-source/algebra/integral/pool_list_updater.go b/pkg/liquidity-source/algebra/integral/pool_list_updater.go index d16a1f205..a2fceb6fe 100644 --- a/pkg/liquidity-source/algebra/integral/pool_list_updater.go +++ b/pkg/liquidity-source/algebra/integral/pool_list_updater.go @@ -3,7 +3,8 @@ package integral import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strconv" @@ -16,14 +17,14 @@ import ( type PoolsListUpdater struct { config *Config - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -35,13 +36,13 @@ func NewPoolsListUpdater( func (d *PoolsListUpdater) getPoolsList(ctx context.Context, lastCreatedAtTimestamp *big.Int, lastPoolIds []string, first, skip int) ([]SubgraphPool, error) { allowSubgraphError := d.config.AllowSubgraphError - req := mutableclient.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, lastPoolIds, first, skip)) + req := graphqlpkg.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, lastPoolIds, first, skip)) var response struct { Pools []SubgraphPool `json:"pools"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { if allowSubgraphError && len(response.Pools) > 0 { return response.Pools, nil } diff --git a/pkg/liquidity-source/algebra/integral/pool_tracker.go b/pkg/liquidity-source/algebra/integral/pool_tracker.go index 1a5217b77..79ce944df 100644 --- a/pkg/liquidity-source/algebra/integral/pool_tracker.go +++ b/pkg/liquidity-source/algebra/integral/pool_tracker.go @@ -2,7 +2,8 @@ package integral import ( "context" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "time" @@ -23,15 +24,15 @@ import ( type PoolTracker struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolTracker( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) (*PoolTracker, error) { return &PoolTracker{ config: cfg, @@ -568,14 +569,14 @@ func (d *PoolTracker) getPoolTicks(ctx context.Context, poolAddress string) ([]T var ticks []TickResp for { - req := mutableclient.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, skip)) + req := graphqlpkg.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, skip)) var resp struct { Pool *SubgraphPoolTicks `json:"pool"` Meta *valueobject.SubgraphMeta `json:"_meta"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &resp); err != nil { + if err := d.graphqlClient.Run(ctx, req, &resp); err != nil { if allowSubgraphError { if resp.Pool == nil { l.WithFields(logger.Fields{ diff --git a/pkg/liquidity-source/algebra/v1/pool_list_updater.go b/pkg/liquidity-source/algebra/v1/pool_list_updater.go index fd2207a12..dca4975f6 100644 --- a/pkg/liquidity-source/algebra/v1/pool_list_updater.go +++ b/pkg/liquidity-source/algebra/v1/pool_list_updater.go @@ -3,7 +3,8 @@ package algebrav1 import ( "context" "fmt" - mutableclient "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strconv" @@ -15,14 +16,14 @@ import ( type PoolsListUpdater struct { config *Config - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -34,13 +35,13 @@ func NewPoolsListUpdater( func (d *PoolsListUpdater) getPoolsList(ctx context.Context, lastCreatedAtTimestamp *big.Int, lastPoolIds []string, first, skip int) ([]SubgraphPool, error) { allowSubgraphError := d.config.AllowSubgraphError - req := mutableclient.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, lastPoolIds, first, skip)) + req := graphqlpkg.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, lastPoolIds, first, skip)) var response struct { Pools []SubgraphPool `json:"pools"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { if allowSubgraphError && len(response.Pools) > 0 { return response.Pools, nil } diff --git a/pkg/liquidity-source/algebra/v1/pool_tracker.go b/pkg/liquidity-source/algebra/v1/pool_tracker.go index 9c8b1a309..c1cddaec1 100644 --- a/pkg/liquidity-source/algebra/v1/pool_tracker.go +++ b/pkg/liquidity-source/algebra/v1/pool_tracker.go @@ -2,7 +2,8 @@ package algebrav1 import ( "context" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "time" @@ -23,15 +24,15 @@ import ( type PoolTracker struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolTracker( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) (*PoolTracker, error) { return &PoolTracker{ config: cfg, @@ -619,14 +620,14 @@ func (d *PoolTracker) getPoolTicks(ctx context.Context, poolAddress string) ([]T var ticks []TickResp for { - req := mutableclient.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, skip)) + req := graphqlpkg.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, skip)) var resp struct { Pool *SubgraphPoolTicks `json:"pool"` Meta *valueobject.SubgraphMeta `json:"_meta"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &resp); err != nil { + if err := d.graphqlClient.Run(ctx, req, &resp); err != nil { if allowSubgraphError { if resp.Pool == nil { l.WithFields(logger.Fields{ diff --git a/pkg/liquidity-source/ambient/pool_list_updater.go b/pkg/liquidity-source/ambient/pool_list_updater.go index f8eb64d33..beee261c4 100644 --- a/pkg/liquidity-source/ambient/pool_list_updater.go +++ b/pkg/liquidity-source/ambient/pool_list_updater.go @@ -3,7 +3,8 @@ package ambient import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "strings" "time" @@ -18,15 +19,15 @@ import ( type PoolListUpdater struct { cfg Config poolDatastore IPoolDatastore - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg Config, poolDatastore IPoolDatastore, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) (*PoolListUpdater, error) { if err := cfg.Validate(); err != nil { return nil, fmt.Errorf("invalid config: %w", err) @@ -116,7 +117,7 @@ func (u *PoolListUpdater) fetchSubgraph(ctx context.Context, lastCreateTime uint limit = u.cfg.SubgraphLimit } var ( - req = mutableclient.NewRequest(fmt.Sprintf(`{ + req = graphqlpkg.NewRequest(fmt.Sprintf(`{ pools( where: { timeCreate_gt: %d, @@ -136,7 +137,7 @@ func (u *PoolListUpdater) fetchSubgraph(ctx context.Context, lastCreateTime uint resp SubgraphPoolsResponse ) - if err := u.graphqlClient.Run(ctx, u.graphqlClientCfg, req, &resp); err != nil { + if err := u.graphqlClient.Run(ctx, req, &resp); err != nil { return nil, err } diff --git a/pkg/liquidity-source/ambient/pool_list_updater_test.go b/pkg/liquidity-source/ambient/pool_list_updater_test.go index 37689c94c..2674368ae 100644 --- a/pkg/liquidity-source/ambient/pool_list_updater_test.go +++ b/pkg/liquidity-source/ambient/pool_list_updater_test.go @@ -3,7 +3,7 @@ package ambient_test import ( "context" "fmt" - mutableclient "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" "math/big" "os" "testing" @@ -52,11 +52,11 @@ func TestPoolListUpdater(t *testing.T) { MulticallContractAddress: multicallAddress, } - graphqlClientCfg = &mutableclient.Config{ + graphqlClientCfg = &graphqlpkg.Config{ Url: config.SubgraphAPI, Timeout: config.SubgraphRequestTimeout.Duration, } - graphqlClient = mutableclient.New(*graphqlClientCfg) + graphqlClient = graphqlpkg.New(*graphqlClientCfg) ) { diff --git a/pkg/liquidity-source/balancer-v1/pools_list_updater.go b/pkg/liquidity-source/balancer-v1/pools_list_updater.go index ab804bfba..aa3a7601b 100644 --- a/pkg/liquidity-source/balancer-v1/pools_list_updater.go +++ b/pkg/liquidity-source/balancer-v1/pools_list_updater.go @@ -3,7 +3,7 @@ package balancerv1 import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" "time" "github.com/KyberNetwork/ethrpc" @@ -17,8 +17,8 @@ type ( PoolsListUpdater struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } PoolsListUpdaterMetadata struct { @@ -39,8 +39,8 @@ type ( func NewPoolsListUpdater( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -145,11 +145,11 @@ func (u *PoolsListUpdater) initPools(_ context.Context, subgraphPools []FetchPoo func (u *PoolsListUpdater) fetchPoolsFromSubgraph(ctx context.Context, lastCreateTime int) ([]FetchPoolsResponsePool, error) { var ( - req = mutableclient.NewRequest(newFetchPoolIDsQuery(lastCreateTime, u.config.NewPoolLimit)) + req = graphqlpkg.NewRequest(newFetchPoolIDsQuery(lastCreateTime, u.config.NewPoolLimit)) resp FetchPoolsResponse ) - if err := u.graphqlClient.Run(ctx, u.graphqlClientCfg, req, &resp); err != nil { + if err := u.graphqlClient.Run(ctx, req, &resp); err != nil { return nil, err } diff --git a/pkg/liquidity-source/balancer-v2/composable-stable/pools_list_updater.go b/pkg/liquidity-source/balancer-v2/composable-stable/pools_list_updater.go index 95d5fad21..849805009 100644 --- a/pkg/liquidity-source/balancer-v2/composable-stable/pools_list_updater.go +++ b/pkg/liquidity-source/balancer-v2/composable-stable/pools_list_updater.go @@ -2,6 +2,8 @@ package composablestable import ( "context" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strings" "time" @@ -23,14 +25,18 @@ type PoolsListUpdater struct { sharedUpdater *shared.PoolsListUpdater } -func NewPoolsListUpdater(config *Config, ethrpcClient *ethrpc.Client) *PoolsListUpdater { +func NewPoolsListUpdater(config *Config, + ethrpcClient *ethrpc.Client, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, +) *PoolsListUpdater { sharedUpdater := shared.NewPoolsListUpdater(&shared.Config{ DexID: config.DexID, SubgraphAPI: config.SubgraphAPI, SubgraphHeaders: config.SubgraphHeaders, NewPoolLimit: config.NewPoolLimit, PoolTypes: []string{poolTypeComposableStable}, - }) + }, graphqlClient, graphqlClientCfg) return &PoolsListUpdater{ config: config, diff --git a/pkg/liquidity-source/balancer-v2/shared/pools_list_updater.go b/pkg/liquidity-source/balancer-v2/shared/pools_list_updater.go index c75de9126..68a1d7d4e 100644 --- a/pkg/liquidity-source/balancer-v2/shared/pools_list_updater.go +++ b/pkg/liquidity-source/balancer-v2/shared/pools_list_updater.go @@ -7,15 +7,15 @@ import ( "time" "github.com/goccy/go-json" - "github.com/machinebox/graphql" graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" ) type ( PoolsListUpdater struct { - config *Config - graphqlClient *graphql.Client + config *Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } Config struct { @@ -33,16 +33,15 @@ type ( const graphQLRequestTimeout = 20 * time.Second -func NewPoolsListUpdater(config *Config) *PoolsListUpdater { - graphqlClient := graphqlpkg.New(graphqlpkg.Config{ - Url: config.SubgraphAPI, - Header: config.SubgraphHeaders, - Timeout: graphQLRequestTimeout, - }) - +func NewPoolsListUpdater( + config *Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, +) *PoolsListUpdater { return &PoolsListUpdater{ - config: config, - graphqlClient: graphqlClient, + config: config, + graphqlClient: graphqlClient, + graphqlClientCfg: graphqlClientCfg, } } @@ -87,7 +86,7 @@ func (u *PoolsListUpdater) querySubgraph(ctx context.Context, lastCreateTime *bi u.config.NewPoolLimit, 0, ) - req := graphql.NewRequest(query) + req := graphqlpkg.NewRequest(query) if err := u.graphqlClient.Run(ctx, req, &response); err != nil { return nil, nil, err diff --git a/pkg/liquidity-source/balancer-v2/stable/pools_list_updater.go b/pkg/liquidity-source/balancer-v2/stable/pools_list_updater.go index 87d66d24e..24c714365 100644 --- a/pkg/liquidity-source/balancer-v2/stable/pools_list_updater.go +++ b/pkg/liquidity-source/balancer-v2/stable/pools_list_updater.go @@ -2,6 +2,7 @@ package stable import ( "context" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" "math/big" "strings" "time" @@ -23,14 +24,19 @@ type PoolsListUpdater struct { sharedUpdater *shared.PoolsListUpdater } -func NewPoolsListUpdater(config *Config, ethrpcClient *ethrpc.Client) *PoolsListUpdater { +func NewPoolsListUpdater( + config *Config, + ethrpcClient *ethrpc.Client, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, +) *PoolsListUpdater { sharedUpdater := shared.NewPoolsListUpdater(&shared.Config{ DexID: config.DexID, SubgraphAPI: config.SubgraphAPI, SubgraphHeaders: config.SubgraphHeaders, NewPoolLimit: config.NewPoolLimit, PoolTypes: []string{poolTypeStable, poolTypeMetaStable}, - }) + }, graphqlClient, graphqlClientCfg) return &PoolsListUpdater{ config: config, diff --git a/pkg/liquidity-source/balancer-v2/weighted/pools_list_updater.go b/pkg/liquidity-source/balancer-v2/weighted/pools_list_updater.go index 368d28188..b613aa6c1 100644 --- a/pkg/liquidity-source/balancer-v2/weighted/pools_list_updater.go +++ b/pkg/liquidity-source/balancer-v2/weighted/pools_list_updater.go @@ -3,6 +3,7 @@ package weighted import ( "context" "errors" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" "math/big" "strings" "time" @@ -27,14 +28,19 @@ type PoolsListUpdater struct { sharedUpdater *shared.PoolsListUpdater } -func NewPoolsListUpdater(config *Config, ethrpcClient *ethrpc.Client) *PoolsListUpdater { +func NewPoolsListUpdater( + config *Config, + ethrpcClient *ethrpc.Client, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, +) *PoolsListUpdater { sharedUpdater := shared.NewPoolsListUpdater(&shared.Config{ DexID: config.DexID, SubgraphAPI: config.SubgraphAPI, SubgraphHeaders: config.SubgraphHeaders, NewPoolLimit: config.NewPoolLimit, PoolTypes: []string{poolTypeWeighted}, - }) + }, graphqlClient, graphqlClientCfg) return &PoolsListUpdater{ config: *config, diff --git a/pkg/liquidity-source/dodo/classical/pools_list_updater.go b/pkg/liquidity-source/dodo/classical/pools_list_updater.go index ef9883e87..d69759eda 100644 --- a/pkg/liquidity-source/dodo/classical/pools_list_updater.go +++ b/pkg/liquidity-source/dodo/classical/pools_list_updater.go @@ -2,6 +2,7 @@ package classical import ( "context" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" "github.com/KyberNetwork/blockchain-toolkit/integer" "github.com/KyberNetwork/logger" @@ -16,8 +17,12 @@ type PoolsListUpdater struct { sharedUpdater *shared.PoolsListUpdater } -func NewPoolsListUpdater(config *shared.Config) *PoolsListUpdater { - sharedUpdater := shared.NewPoolsListUpdater(config) +func NewPoolsListUpdater( + config *shared.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, +) *PoolsListUpdater { + sharedUpdater := shared.NewPoolsListUpdater(config, graphqlClient, graphqlClientCfg) return &PoolsListUpdater{ config: *config, diff --git a/pkg/liquidity-source/dodo/dpp/pools_list_updater.go b/pkg/liquidity-source/dodo/dpp/pools_list_updater.go index 4606d3d4d..9851cb925 100644 --- a/pkg/liquidity-source/dodo/dpp/pools_list_updater.go +++ b/pkg/liquidity-source/dodo/dpp/pools_list_updater.go @@ -2,6 +2,7 @@ package dpp import ( "context" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" "github.com/KyberNetwork/blockchain-toolkit/integer" "github.com/KyberNetwork/logger" @@ -16,8 +17,12 @@ type PoolsListUpdater struct { sharedUpdater *shared.PoolsListUpdater } -func NewPoolsListUpdater(config *shared.Config) *PoolsListUpdater { - sharedUpdater := shared.NewPoolsListUpdater(config) +func NewPoolsListUpdater( + config *shared.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, +) *PoolsListUpdater { + sharedUpdater := shared.NewPoolsListUpdater(config, graphqlClient, graphqlClientCfg) return &PoolsListUpdater{ config: *config, diff --git a/pkg/liquidity-source/dodo/dsp/pools_list_updater.go b/pkg/liquidity-source/dodo/dsp/pools_list_updater.go index d65df3d96..674ca8c20 100644 --- a/pkg/liquidity-source/dodo/dsp/pools_list_updater.go +++ b/pkg/liquidity-source/dodo/dsp/pools_list_updater.go @@ -2,6 +2,7 @@ package dsp import ( "context" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" "github.com/KyberNetwork/blockchain-toolkit/integer" "github.com/KyberNetwork/logger" @@ -16,8 +17,12 @@ type PoolsListUpdater struct { sharedUpdater *shared.PoolsListUpdater } -func NewPoolsListUpdater(config *shared.Config) *PoolsListUpdater { - sharedUpdater := shared.NewPoolsListUpdater(config) +func NewPoolsListUpdater( + config *shared.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, +) *PoolsListUpdater { + sharedUpdater := shared.NewPoolsListUpdater(config, graphqlClient, graphqlClientCfg) return &PoolsListUpdater{ config: *config, diff --git a/pkg/liquidity-source/dodo/dvm/pools_list_updater.go b/pkg/liquidity-source/dodo/dvm/pools_list_updater.go index 823d335d7..889d07265 100644 --- a/pkg/liquidity-source/dodo/dvm/pools_list_updater.go +++ b/pkg/liquidity-source/dodo/dvm/pools_list_updater.go @@ -2,6 +2,7 @@ package dvm import ( "context" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" "github.com/KyberNetwork/blockchain-toolkit/integer" "github.com/KyberNetwork/logger" @@ -16,8 +17,12 @@ type PoolsListUpdater struct { sharedUpdater *shared.PoolsListUpdater } -func NewPoolsListUpdater(config *shared.Config) *PoolsListUpdater { - sharedUpdater := shared.NewPoolsListUpdater(config) +func NewPoolsListUpdater( + config *shared.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, +) *PoolsListUpdater { + sharedUpdater := shared.NewPoolsListUpdater(config, graphqlClient, graphqlClientCfg) return &PoolsListUpdater{ config: *config, diff --git a/pkg/liquidity-source/dodo/shared/pools_list_updater.go b/pkg/liquidity-source/dodo/shared/pools_list_updater.go index a3cbce405..5303dd1db 100644 --- a/pkg/liquidity-source/dodo/shared/pools_list_updater.go +++ b/pkg/liquidity-source/dodo/shared/pools_list_updater.go @@ -8,29 +8,26 @@ import ( "github.com/KyberNetwork/logger" "github.com/goccy/go-json" - "github.com/machinebox/graphql" "github.com/KyberNetwork/kyberswap-dex-lib/pkg/entity" graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" ) type PoolsListUpdater struct { - config *Config - graphqlClient *graphql.Client + config *Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { - graphqlClient := graphqlpkg.New(graphqlpkg.Config{ - Url: cfg.SubgraphAPI, - Header: cfg.SubgraphHeaders, - Timeout: defaultGraphQLRequestTimeout, - }) - return &PoolsListUpdater{ - config: cfg, - graphqlClient: graphqlClient, + config: cfg, + graphqlClient: graphqlClient, + graphqlClientCfg: graphqlClientCfg, } } @@ -169,7 +166,7 @@ func (d *PoolsListUpdater) getPoolsList( lastCreateTime *big.Int, ) ([]SubgraphPool, error) { // 'CLASSICAL', 'DVM', 'DSP', 'DPP' pools - req := graphql.NewRequest(fmt.Sprintf(`{ + req := graphqlpkg.NewRequest(fmt.Sprintf(`{ pairs( first: %v, skip: %v, diff --git a/pkg/liquidity-source/gyroscope/2clp/pools_list_updater.go b/pkg/liquidity-source/gyroscope/2clp/pools_list_updater.go index 468305a64..54deef1a9 100644 --- a/pkg/liquidity-source/gyroscope/2clp/pools_list_updater.go +++ b/pkg/liquidity-source/gyroscope/2clp/pools_list_updater.go @@ -2,7 +2,8 @@ package gyro2clp import ( "context" - mutableclient "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strings" "time" @@ -22,15 +23,15 @@ type PoolsListUpdater struct { config Config ethrpcClient *ethrpc.Client sharedUpdater *shared.PoolsListUpdater - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( config *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { sharedUpdater := shared.NewPoolsListUpdater(&shared.Config{ DexID: config.DexID, diff --git a/pkg/liquidity-source/gyroscope/3clp/pools_list_updater.go b/pkg/liquidity-source/gyroscope/3clp/pools_list_updater.go index 6d166dc97..1802cafe4 100644 --- a/pkg/liquidity-source/gyroscope/3clp/pools_list_updater.go +++ b/pkg/liquidity-source/gyroscope/3clp/pools_list_updater.go @@ -2,7 +2,8 @@ package gyro3clp import ( "context" - mutableclient "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strings" "time" @@ -22,15 +23,15 @@ type PoolsListUpdater struct { config Config ethrpcClient *ethrpc.Client sharedUpdater *shared.PoolsListUpdater - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( config *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { sharedUpdater := shared.NewPoolsListUpdater(&shared.Config{ DexID: config.DexID, diff --git a/pkg/liquidity-source/gyroscope/eclp/pools_list_updater.go b/pkg/liquidity-source/gyroscope/eclp/pools_list_updater.go index 145581a31..9dd42dd3f 100644 --- a/pkg/liquidity-source/gyroscope/eclp/pools_list_updater.go +++ b/pkg/liquidity-source/gyroscope/eclp/pools_list_updater.go @@ -2,7 +2,8 @@ package gyroeclp import ( "context" - mutableclient "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "strings" "time" @@ -19,15 +20,15 @@ type PoolsListUpdater struct { config Config ethrpcClient *ethrpc.Client sharedUpdater *shared.PoolsListUpdater - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( config *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { sharedUpdater := shared.NewPoolsListUpdater(&shared.Config{ DexID: config.DexID, diff --git a/pkg/liquidity-source/gyroscope/shared/pools_list_updater.go b/pkg/liquidity-source/gyroscope/shared/pools_list_updater.go index d725ca700..31865e8f1 100644 --- a/pkg/liquidity-source/gyroscope/shared/pools_list_updater.go +++ b/pkg/liquidity-source/gyroscope/shared/pools_list_updater.go @@ -2,7 +2,8 @@ package shared import ( "context" - mutableclient "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "net/http" "time" @@ -13,8 +14,8 @@ import ( type ( PoolsListUpdater struct { config *Config - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } Config struct { @@ -34,8 +35,8 @@ const graphQLRequestTimeout = 20 * time.Second func NewPoolsListUpdater( config *Config, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: config, @@ -85,9 +86,9 @@ func (u *PoolsListUpdater) querySubgraph(ctx context.Context, lastCreateTime *bi u.config.NewPoolLimit, 0, ) - req := mutableclient.NewRequest(query) + req := graphqlpkg.NewRequest(query) - if err := u.graphqlClient.Run(ctx, u.graphqlClientCfg, req, &response); err != nil { + if err := u.graphqlClient.Run(ctx, req, &response); err != nil { return nil, nil, err } diff --git a/pkg/source/algebrav1/pool_list_updater.go b/pkg/source/algebrav1/pool_list_updater.go index 24b01c175..989725957 100644 --- a/pkg/source/algebrav1/pool_list_updater.go +++ b/pkg/source/algebrav1/pool_list_updater.go @@ -3,7 +3,8 @@ package algebrav1 import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strconv" "time" @@ -15,14 +16,14 @@ import ( type PoolsListUpdater struct { config *Config - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -34,13 +35,13 @@ func NewPoolsListUpdater( func (d *PoolsListUpdater) getPoolsList(ctx context.Context, lastCreatedAtTimestamp *big.Int, lastPoolIds []string, first, skip int) ([]SubgraphPool, error) { allowSubgraphError := d.config.AllowSubgraphError - req := mutableclient.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, lastPoolIds, first, skip)) + req := graphqlpkg.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, lastPoolIds, first, skip)) var response struct { Pools []SubgraphPool `json:"pools"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { if allowSubgraphError && len(response.Pools) > 0 { return response.Pools, nil } diff --git a/pkg/source/algebrav1/pool_tracker.go b/pkg/source/algebrav1/pool_tracker.go index 9c8b1a309..c1cddaec1 100644 --- a/pkg/source/algebrav1/pool_tracker.go +++ b/pkg/source/algebrav1/pool_tracker.go @@ -2,7 +2,8 @@ package algebrav1 import ( "context" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "time" @@ -23,15 +24,15 @@ import ( type PoolTracker struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolTracker( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) (*PoolTracker, error) { return &PoolTracker{ config: cfg, @@ -619,14 +620,14 @@ func (d *PoolTracker) getPoolTicks(ctx context.Context, poolAddress string) ([]T var ticks []TickResp for { - req := mutableclient.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, skip)) + req := graphqlpkg.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, skip)) var resp struct { Pool *SubgraphPoolTicks `json:"pool"` Meta *valueobject.SubgraphMeta `json:"_meta"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &resp); err != nil { + if err := d.graphqlClient.Run(ctx, req, &resp); err != nil { if allowSubgraphError { if resp.Pool == nil { l.WithFields(logger.Fields{ diff --git a/pkg/source/balancer-composable-stable/pools_list_updater.go b/pkg/source/balancer-composable-stable/pools_list_updater.go index ab42a56cf..92e872b8c 100644 --- a/pkg/source/balancer-composable-stable/pools_list_updater.go +++ b/pkg/source/balancer-composable-stable/pools_list_updater.go @@ -3,7 +3,8 @@ package balancercomposablestable import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strconv" "strings" @@ -20,15 +21,15 @@ import ( type PoolsListUpdater struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -198,7 +199,7 @@ func (d *PoolsListUpdater) getPoolsListByType( lastCreateTime = zeroBI } - req := mutableclient.NewRequest(fmt.Sprintf(`{ + req := graphqlpkg.NewRequest(fmt.Sprintf(`{ pools( where : { poolType: "%v", @@ -229,7 +230,7 @@ func (d *PoolsListUpdater) getPoolsListByType( Pairs []*SubgraphPool `json:"pools"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { logger.WithFields(logger.Fields{ "type": poolType, "error": err, diff --git a/pkg/source/balancer/pools_list_updater.go b/pkg/source/balancer/pools_list_updater.go index 315d25357..73ffba4c9 100644 --- a/pkg/source/balancer/pools_list_updater.go +++ b/pkg/source/balancer/pools_list_updater.go @@ -3,7 +3,8 @@ package balancer import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strconv" "strings" @@ -20,15 +21,15 @@ import ( type PoolsListUpdater struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -200,7 +201,7 @@ func (d *PoolsListUpdater) getPoolsListByType( lastCreateTime = zeroBI } - req := mutableclient.NewRequest(fmt.Sprintf(`{ + req := graphqlpkg.NewRequest(fmt.Sprintf(`{ pools( where : { poolType: "%v", @@ -231,7 +232,7 @@ func (d *PoolsListUpdater) getPoolsListByType( Pairs []*SubgraphPool `json:"pools"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { logger.WithFields(logger.Fields{ "type": poolType, "error": err, diff --git a/pkg/source/dodo/pools_list_updater.go b/pkg/source/dodo/pools_list_updater.go index 992756903..2c80000d5 100644 --- a/pkg/source/dodo/pools_list_updater.go +++ b/pkg/source/dodo/pools_list_updater.go @@ -4,7 +4,8 @@ import ( "context" "errors" "fmt" - mutableclient "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strconv" @@ -15,14 +16,14 @@ import ( type PoolsListUpdater struct { config *Config - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -212,7 +213,7 @@ func (d *PoolsListUpdater) getPoolsList( lastCreateTime *big.Int, ) ([]SubgraphPool, error) { // 'CLASSICAL', 'DVM', 'DSP', 'DPP' pools - req := mutableclient.NewRequest(fmt.Sprintf(`{ + req := graphqlpkg.NewRequest(fmt.Sprintf(`{ pairs( first: %v, skip: %v, @@ -257,7 +258,7 @@ func (d *PoolsListUpdater) getPoolsList( var response struct { Pairs []SubgraphPool `json:"pairs"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { logger.Errorf("failed to query subgraph, err %v", err) return nil, err } diff --git a/pkg/source/elastic/pool_tracker.go b/pkg/source/elastic/pool_tracker.go index 9946c506d..bdd1b68c4 100644 --- a/pkg/source/elastic/pool_tracker.go +++ b/pkg/source/elastic/pool_tracker.go @@ -3,7 +3,8 @@ package elastic import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "time" "github.com/KyberNetwork/ethrpc" @@ -20,15 +21,15 @@ import ( type PoolTracker struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolTracker( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) (*PoolTracker, error) { return &PoolTracker{ config: cfg, @@ -171,7 +172,7 @@ func (d *PoolTracker) getPoolTicks(ctx context.Context, poolAddress string) ([]T var ticks []TickResp for { - req := mutableclient.NewRequest( + req := graphqlpkg.NewRequest( fmt.Sprintf(`{ pool(id: "%v") { id @@ -190,7 +191,7 @@ func (d *PoolTracker) getPoolTicks(ctx context.Context, poolAddress string) ([]T Meta *valueobject.SubgraphMeta `json:"_meta"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &resp); err != nil { + if err := d.graphqlClient.Run(ctx, req, &resp); err != nil { logger.Errorf("failed to query subgraph for pool: %v, err: %v", poolAddress, err) return nil, err } diff --git a/pkg/source/elastic/pools_list_updater.go b/pkg/source/elastic/pools_list_updater.go index 1b062449c..508d903bb 100644 --- a/pkg/source/elastic/pools_list_updater.go +++ b/pkg/source/elastic/pools_list_updater.go @@ -3,7 +3,8 @@ package elastic import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strconv" "time" @@ -16,14 +17,14 @@ import ( type PoolsListUpdater struct { config *Config - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -33,7 +34,7 @@ func NewPoolsListUpdater( } func (d *PoolsListUpdater) getPoolsList(ctx context.Context, lastCreatedAtTimestamp *big.Int, first, skip int) ([]SubgraphPool, error) { - req := mutableclient.NewRequest(fmt.Sprintf(`{ + req := graphqlpkg.NewRequest(fmt.Sprintf(`{ pools(where : {createdAtTimestamp_gte: %v}, first: %v, skip: %v, orderBy: createdAtTimestamp, orderDirection: asc) { id liquidity @@ -61,7 +62,7 @@ func (d *PoolsListUpdater) getPoolsList(ctx context.Context, lastCreatedAtTimest Pools []SubgraphPool `json:"pools"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { logger.Errorf("failed to query subgraph, err: %v", err) return nil, err } diff --git a/pkg/source/liquiditybookv20/pool_tracker.go b/pkg/source/liquiditybookv20/pool_tracker.go index 250af8533..09d820a80 100644 --- a/pkg/source/liquiditybookv20/pool_tracker.go +++ b/pkg/source/liquiditybookv20/pool_tracker.go @@ -2,7 +2,8 @@ package liquiditybookv20 import ( "context" - mutableclient "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "sort" "strconv" @@ -24,15 +25,15 @@ import ( type PoolTracker struct { cfg *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolTracker( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) (*PoolTracker, error) { return &PoolTracker{ cfg: cfg, @@ -205,7 +206,7 @@ func (d *PoolTracker) querySubgraph(ctx context.Context, p entity.Pool) (*queryS // query var ( query = buildQueryGetBins(p.Address, binIDGT) - req = mutableclient.NewRequest(query) + req = graphqlpkg.NewRequest(query) resp struct { Pair *lbpairSubgraphResp `json:"lbpair"` @@ -213,7 +214,7 @@ func (d *PoolTracker) querySubgraph(ctx context.Context, p entity.Pool) (*queryS } ) - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &resp); err != nil { + if err := d.graphqlClient.Run(ctx, req, &resp); err != nil { if !d.cfg.AllowSubgraphError { logger.WithFields(logger.Fields{ "poolAddress": p.Address, diff --git a/pkg/source/liquiditybookv21/pool_tracker.go b/pkg/source/liquiditybookv21/pool_tracker.go index 6005ec1c5..dfc308510 100644 --- a/pkg/source/liquiditybookv21/pool_tracker.go +++ b/pkg/source/liquiditybookv21/pool_tracker.go @@ -2,7 +2,6 @@ package liquiditybookv21 import ( "context" - "github.com/machinebox/graphql" "math/big" "sort" "strconv" @@ -22,22 +21,23 @@ import ( ) type PoolTracker struct { - cfg *Config - ethrpcClient *ethrpc.Client - graphqlClient *graphql.Client + cfg *Config + ethrpcClient *ethrpc.Client + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } -func NewPoolTracker(cfg *Config, ethrpcClient *ethrpc.Client) (*PoolTracker, error) { - graphqlClient := graphqlpkg.New(graphqlpkg.Config{ - Url: cfg.SubgraphAPI, - Header: cfg.SubgraphHeaders, - Timeout: graphQLRequestTimeout, - }) - +func NewPoolTracker( + cfg *Config, + ethrpcClient *ethrpc.Client, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, +) (*PoolTracker, error) { return &PoolTracker{ - cfg: cfg, - ethrpcClient: ethrpcClient, - graphqlClient: graphqlClient, + cfg: cfg, + ethrpcClient: ethrpcClient, + graphqlClient: graphqlClient, + graphqlClientCfg: graphqlClientCfg, }, nil } @@ -234,7 +234,7 @@ func (d *PoolTracker) querySubgraph(ctx context.Context, p entity.Pool) (*queryS // query var ( query = buildQueryGetBins(p.Address, binIDGT) - req = graphql.NewRequest(query) + req = graphqlpkg.NewRequest(query) resp struct { Pair *lbpairSubgraphResp `json:"lbpair"` diff --git a/pkg/source/maverickv1/pool_list_updater.go b/pkg/source/maverickv1/pool_list_updater.go index 3bb01b083..b0b4842d4 100644 --- a/pkg/source/maverickv1/pool_list_updater.go +++ b/pkg/source/maverickv1/pool_list_updater.go @@ -3,7 +3,6 @@ package maverickv1 import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" "strconv" "time" @@ -11,6 +10,7 @@ import ( "github.com/KyberNetwork/kyberswap-dex-lib/pkg/entity" "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util" "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/bignumber" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" "github.com/KyberNetwork/logger" "github.com/goccy/go-json" ) @@ -18,15 +18,15 @@ import ( type PoolListUpdater struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolListUpdater( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolListUpdater { return &PoolListUpdater{ config: cfg, @@ -154,7 +154,7 @@ func (d *PoolListUpdater) querySubgraph( first int, skip int, ) ([]*SubgraphPool, error) { - req := mutableclient.NewRequest(fmt.Sprintf(`{ + req := graphqlpkg.NewRequest(fmt.Sprintf(`{ pools( where : { timestamp_gte: %v, @@ -184,7 +184,7 @@ func (d *PoolListUpdater) querySubgraph( var response struct { Pools []*SubgraphPool `json:"pools"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { logger.WithFields(logger.Fields{ "type": DexTypeMaverickV1, "error": err, diff --git a/pkg/source/nuriv2/pool_tracker.go b/pkg/source/nuriv2/pool_tracker.go index 727fb3af3..a2c589719 100644 --- a/pkg/source/nuriv2/pool_tracker.go +++ b/pkg/source/nuriv2/pool_tracker.go @@ -2,7 +2,8 @@ package nuriv2 import ( "context" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "time" @@ -20,15 +21,15 @@ import ( type PoolTracker struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolTracker( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) (*PoolTracker, error) { return &PoolTracker{ config: cfg, @@ -228,14 +229,14 @@ func (d *PoolTracker) getPoolTicks(ctx context.Context, poolAddress string) ([]T var ticks []TickResp for { - req := mutableclient.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, lastTickIdx)) + req := graphqlpkg.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, lastTickIdx)) var resp struct { Ticks []TickResp `json:"ticks"` Meta *valueobject.SubgraphMeta `json:"_meta"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &resp); err != nil { + if err := d.graphqlClient.Run(ctx, req, &resp); err != nil { // Workaround at the moment to live with the error subgraph on Arbitrum if allowSubgraphError { if resp.Ticks == nil { diff --git a/pkg/source/nuriv2/pools_list_updater.go b/pkg/source/nuriv2/pools_list_updater.go index fd7c21309..692f18169 100644 --- a/pkg/source/nuriv2/pools_list_updater.go +++ b/pkg/source/nuriv2/pools_list_updater.go @@ -3,7 +3,8 @@ package nuriv2 import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strconv" @@ -21,15 +22,15 @@ import ( type PoolsListUpdater struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -42,13 +43,13 @@ func NewPoolsListUpdater( func (d *PoolsListUpdater) getPoolsList(ctx context.Context, lastCreatedAtTimestamp *big.Int, first, skip int) ([]SubgraphPool, error) { allowSubgraphError := d.config.IsAllowSubgraphError() - req := mutableclient.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, first, skip)) + req := graphqlpkg.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, first, skip)) var response struct { Pools []SubgraphPool `json:"pools"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { logger.WithFields(logger.Fields{ "error": err, }).Errorf("failed to query subgraph") diff --git a/pkg/source/pancakev3/pool_tracker.go b/pkg/source/pancakev3/pool_tracker.go index 31d83f2b1..24c8ea637 100644 --- a/pkg/source/pancakev3/pool_tracker.go +++ b/pkg/source/pancakev3/pool_tracker.go @@ -2,7 +2,8 @@ package pancakev3 import ( "context" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "time" @@ -21,15 +22,15 @@ import ( type PoolTracker struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolTracker( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) (*PoolTracker, error) { return &PoolTracker{ config: cfg, @@ -247,14 +248,14 @@ func (d *PoolTracker) getPoolTicks(ctx context.Context, poolAddress string) ([]T var ticks []TickResp for { - req := mutableclient.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, lastTickIdx)) + req := graphqlpkg.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, lastTickIdx)) var resp struct { Ticks []TickResp `json:"ticks"` Meta *valueobject.SubgraphMeta `json:"_meta"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &resp); err != nil { + if err := d.graphqlClient.Run(ctx, req, &resp); err != nil { // Workaround at the moment to live with the error subgraph on Arbitrum if allowSubgraphError && resp.Ticks == nil { l.WithFields(logger.Fields{ diff --git a/pkg/source/pancakev3/pools_list_updater.go b/pkg/source/pancakev3/pools_list_updater.go index 00cec1fad..9fb3e2335 100644 --- a/pkg/source/pancakev3/pools_list_updater.go +++ b/pkg/source/pancakev3/pools_list_updater.go @@ -3,7 +3,8 @@ package pancakev3 import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strconv" @@ -21,15 +22,15 @@ import ( type PoolsListUpdater struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -42,13 +43,13 @@ func NewPoolsListUpdater( func (d *PoolsListUpdater) getPoolsList(ctx context.Context, lastCreatedAtTimestamp *big.Int, first, skip int) ([]SubgraphPool, error) { allowSubgraphError := d.config.IsAllowSubgraphError() - req := mutableclient.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, first, skip)) + req := graphqlpkg.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, first, skip)) var response struct { Pools []SubgraphPool `json:"pools"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { // Workaround at the moment to live with the error subgraph on Arbitrum if allowSubgraphError && len(response.Pools) > 0 { return response.Pools, nil diff --git a/pkg/source/platypus/pools_list_updater.go b/pkg/source/platypus/pools_list_updater.go index 47b2fa543..93c717fe2 100644 --- a/pkg/source/platypus/pools_list_updater.go +++ b/pkg/source/platypus/pools_list_updater.go @@ -3,7 +3,8 @@ package platypus import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strings" "time" @@ -20,15 +21,15 @@ import ( type PoolsListUpdater struct { config *Config ethClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, ethClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -99,7 +100,7 @@ func (p *PoolsListUpdater) getPoolAddresses( ctx context.Context, lastUpdate string, ) ([]SubgraphPool, error) { - req := mutableclient.NewRequest(fmt.Sprintf(`{ + req := graphqlpkg.NewRequest(fmt.Sprintf(`{ pools ( where: { lastUpdate_gte: "%s" @@ -115,7 +116,7 @@ func (p *PoolsListUpdater) getPoolAddresses( var response struct { Pools []SubgraphPool `json:"pools"` } - if err := p.graphqlClient.Run(ctx, p.graphqlClientCfg, req, &response); err != nil { + if err := p.graphqlClient.Run(ctx, req, &response); err != nil { logger.WithFields(logger.Fields{ "lastUpdate": lastUpdate, "error": err, diff --git a/pkg/source/ramsesv2/pool_tracker.go b/pkg/source/ramsesv2/pool_tracker.go index 0367e7e60..110708922 100644 --- a/pkg/source/ramsesv2/pool_tracker.go +++ b/pkg/source/ramsesv2/pool_tracker.go @@ -2,7 +2,8 @@ package ramsesv2 import ( "context" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "time" @@ -21,15 +22,15 @@ import ( type PoolTracker struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolTracker( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) (*PoolTracker, error) { return &PoolTracker{ @@ -240,14 +241,14 @@ func (d *PoolTracker) getPoolTicks(ctx context.Context, poolAddress string) ([]T var ticks []TickResp for { - req := mutableclient.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, lastTickIdx)) + req := graphqlpkg.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, lastTickIdx)) var resp struct { Ticks []TickResp `json:"ticks"` Meta *valueobject.SubgraphMeta `json:"_meta"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &resp); err != nil { + if err := d.graphqlClient.Run(ctx, req, &resp); err != nil { // Workaround at the moment to live with the error subgraph on Arbitrum if allowSubgraphError { if resp.Ticks == nil { diff --git a/pkg/source/ramsesv2/pools_list_updater.go b/pkg/source/ramsesv2/pools_list_updater.go index 9fae337e8..15e433904 100644 --- a/pkg/source/ramsesv2/pools_list_updater.go +++ b/pkg/source/ramsesv2/pools_list_updater.go @@ -3,7 +3,8 @@ package ramsesv2 import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strconv" @@ -21,15 +22,15 @@ import ( type PoolsListUpdater struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -42,13 +43,13 @@ func NewPoolsListUpdater( func (d *PoolsListUpdater) getPoolsList(ctx context.Context, lastCreatedAtTimestamp *big.Int, first, skip int) ([]SubgraphPool, error) { allowSubgraphError := d.config.IsAllowSubgraphError() - req := mutableclient.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, first, skip)) + req := graphqlpkg.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, first, skip)) var response struct { Pools []SubgraphPool `json:"pools"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { logger.WithFields(logger.Fields{ "error": err, }).Errorf("failed to query subgraph") diff --git a/pkg/source/slipstream/pool_tracker.go b/pkg/source/slipstream/pool_tracker.go index 4fe2c6327..21c0eff38 100644 --- a/pkg/source/slipstream/pool_tracker.go +++ b/pkg/source/slipstream/pool_tracker.go @@ -2,7 +2,8 @@ package slipstream import ( "context" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "time" @@ -19,15 +20,15 @@ import ( type PoolTracker struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolTracker( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) (*PoolTracker, error) { return &PoolTracker{ config: cfg, @@ -245,13 +246,13 @@ func (d *PoolTracker) getPoolTicks(ctx context.Context, poolAddress string) ([]T var ticks []TickResp for { - req := mutableclient.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, lastTickIdx)) + req := graphqlpkg.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, lastTickIdx)) var resp struct { Ticks []TickResp `json:"ticks"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &resp); err != nil { + if err := d.graphqlClient.Run(ctx, req, &resp); err != nil { // Workaround at the moment to live with the error subgraph on Arbitrum if allowSubgraphError { if resp.Ticks == nil { diff --git a/pkg/source/slipstream/pools_list_updater.go b/pkg/source/slipstream/pools_list_updater.go index 7920404fa..7306e8312 100644 --- a/pkg/source/slipstream/pools_list_updater.go +++ b/pkg/source/slipstream/pools_list_updater.go @@ -3,7 +3,8 @@ package slipstream import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strconv" @@ -21,15 +22,15 @@ import ( type PoolsListUpdater struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -42,13 +43,13 @@ func NewPoolsListUpdater( func (d *PoolsListUpdater) getPoolsList(ctx context.Context, lastCreatedAtTimestamp *big.Int, first, skip int) ([]SubgraphPool, error) { allowSubgraphError := d.config.IsAllowSubgraphError() - req := mutableclient.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, first, skip)) + req := graphqlpkg.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, first, skip)) var response struct { Pools []SubgraphPool `json:"pools"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { // Workaround at the moment to live with the error subgraph on Arbitrum if allowSubgraphError && len(response.Pools) > 0 { return response.Pools, nil diff --git a/pkg/source/solidly-v3/pool_tracker.go b/pkg/source/solidly-v3/pool_tracker.go index 7c7b4cb1d..bef0a78d0 100644 --- a/pkg/source/solidly-v3/pool_tracker.go +++ b/pkg/source/solidly-v3/pool_tracker.go @@ -2,7 +2,8 @@ package solidlyv3 import ( "context" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "time" @@ -20,15 +21,15 @@ import ( type PoolTracker struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolTracker( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) (*PoolTracker, error) { return &PoolTracker{ config: cfg, @@ -219,14 +220,14 @@ func (d *PoolTracker) getPoolTicks(ctx context.Context, poolAddress string) ([]T var ticks []TickResp for { - req := mutableclient.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, lastTickIdx)) + req := graphqlpkg.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, lastTickIdx)) var resp struct { Pool *SubgraphPoolTicks `json:"pool"` Meta *valueobject.SubgraphMeta `json:"_meta"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &resp); err != nil { + if err := d.graphqlClient.Run(ctx, req, &resp); err != nil { // Workaround at the moment to live with the error subgraph on Arbitrum if allowSubgraphError { if resp.Pool == nil { diff --git a/pkg/source/solidly-v3/pools_list_updater.go b/pkg/source/solidly-v3/pools_list_updater.go index f01431e96..03c2a945c 100644 --- a/pkg/source/solidly-v3/pools_list_updater.go +++ b/pkg/source/solidly-v3/pools_list_updater.go @@ -3,7 +3,8 @@ package solidlyv3 import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strconv" @@ -21,15 +22,15 @@ import ( type PoolsListUpdater struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -42,13 +43,13 @@ func NewPoolsListUpdater( func (d *PoolsListUpdater) getPoolsList(ctx context.Context, lastCreatedAtTimestamp *big.Int, first, skip int) ([]SubgraphPool, error) { allowSubgraphError := d.config.IsAllowSubgraphError() - req := mutableclient.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, first, skip)) + req := graphqlpkg.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, first, skip)) var response struct { Pools []SubgraphPool `json:"pools"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { logger.WithFields(logger.Fields{ "error": err, }).Errorf("failed to query subgraph") diff --git a/pkg/source/uniswapv3/pool_tracker.go b/pkg/source/uniswapv3/pool_tracker.go index 28d2d3993..f5c5bac85 100644 --- a/pkg/source/uniswapv3/pool_tracker.go +++ b/pkg/source/uniswapv3/pool_tracker.go @@ -3,7 +3,8 @@ package uniswapv3 import ( "context" "errors" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "time" @@ -22,15 +23,15 @@ import ( type PoolTracker struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolTracker( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) (*PoolTracker, error) { initializedCfg, err := initializeConfig(cfg) if err != nil { @@ -285,13 +286,13 @@ func (d *PoolTracker) getPoolTicks(ctx context.Context, poolAddress string) ([]T var ticks []TickResp for { - req := mutableclient.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, lastTickIdx)) + req := graphqlpkg.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, lastTickIdx)) var resp struct { Ticks []TickResp `json:"ticks"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &resp); err != nil { + if err := d.graphqlClient.Run(ctx, req, &resp); err != nil { // Workaround at the moment to live with the error subgraph on Arbitrum if allowSubgraphError { if resp.Ticks == nil { diff --git a/pkg/source/uniswapv3/pools_list_updater.go b/pkg/source/uniswapv3/pools_list_updater.go index ee5540520..e635571a6 100644 --- a/pkg/source/uniswapv3/pools_list_updater.go +++ b/pkg/source/uniswapv3/pools_list_updater.go @@ -3,7 +3,8 @@ package uniswapv3 import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strconv" @@ -20,15 +21,15 @@ import ( type PoolsListUpdater struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -41,13 +42,13 @@ func NewPoolsListUpdater( func (d *PoolsListUpdater) getPoolsList(ctx context.Context, lastCreatedAtTimestamp *big.Int, first, skip int) ([]SubgraphPool, error) { allowSubgraphError := d.config.IsAllowSubgraphError() - req := mutableclient.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, first, skip)) + req := graphqlpkg.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, first, skip)) var response struct { Pools []SubgraphPool `json:"pools"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { // Workaround at the moment to live with the error subgraph on Arbitrum if allowSubgraphError && len(response.Pools) > 0 { return response.Pools, nil diff --git a/pkg/source/wombat/pool_tracker.go b/pkg/source/wombat/pool_tracker.go index 9008ae83e..40fadb34f 100644 --- a/pkg/source/wombat/pool_tracker.go +++ b/pkg/source/wombat/pool_tracker.go @@ -2,7 +2,8 @@ package wombat import ( "context" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "time" @@ -20,15 +21,15 @@ import ( type PoolTracker struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolTracker( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolTracker { return &PoolTracker{ config: cfg, diff --git a/pkg/source/wombat/pools_list_updater.go b/pkg/source/wombat/pools_list_updater.go index f828c1715..d87d9bc83 100644 --- a/pkg/source/wombat/pools_list_updater.go +++ b/pkg/source/wombat/pools_list_updater.go @@ -3,7 +3,8 @@ package wombat import ( "context" "fmt" - "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable" + graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql" + "math/big" "strconv" "time" @@ -18,15 +19,15 @@ import ( type PoolsListUpdater struct { config *Config ethrpcClient *ethrpc.Client - graphqlClient *mutableclient.MutableClient - graphqlClientCfg *mutableclient.Config + graphqlClient *graphqlpkg.Client + graphqlClientCfg *graphqlpkg.Config } func NewPoolsListUpdater( cfg *Config, ethrpcClient *ethrpc.Client, - graphqlClient *mutableclient.MutableClient, - graphqlClientCfg *mutableclient.Config, + graphqlClient *graphqlpkg.Client, + graphqlClientCfg *graphqlpkg.Config, ) *PoolsListUpdater { return &PoolsListUpdater{ config: cfg, @@ -141,7 +142,7 @@ func (d *PoolsListUpdater) querySubgraph( ctx context.Context, lastCreateTime uint64, ) ([]*SubgraphPool, error) { - req := mutableclient.NewRequest(fmt.Sprintf(`{ + req := graphqlpkg.NewRequest(fmt.Sprintf(`{ pools( orderBy: createdTimestamp orderDirection: asc @@ -163,7 +164,7 @@ func (d *PoolsListUpdater) querySubgraph( var response struct { Pools []*SubgraphPool `json:"pools"` } - if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil { + if err := d.graphqlClient.Run(ctx, req, &response); err != nil { logger.WithFields(logger.Fields{ "type": DexTypeWombat, "error": err, diff --git a/pkg/util/graphql/graphql.go b/pkg/util/graphql/graphql.go new file mode 100644 index 000000000..94d1816e4 --- /dev/null +++ b/pkg/util/graphql/graphql.go @@ -0,0 +1,346 @@ +// Package graphql provides a low level GraphQL client. +// machinebox/graphql https://github.com/machinebox/graphql/blob/3a92531802258604bd12793465c2e28bc4b2fc85/graphql.go +// +// // create a client (safe to share across requests) +// client := graphql.NewClient("https://machinebox.io/graphql") +// +// // make a request +// req := graphqlpkg.NewRequest(` +// query ($key: String!) { +// items (id:$key) { +// field1 +// field2 +// field3 +// } +// } +// `) +// +// // set any variables +// req.Var("key", "value") +// +// // run it and capture the response +// var respData ResponseStruct +// if err := client.Run(ctx, req, &respData); err != nil { +// log.Fatal(err) +// } +// +// # Specify client +// +// To specify your own http.Client, use the WithHTTPClient option: +// +// httpclient := &http.Client{} +// client := graphql.NewClient("https://machinebox.io/graphql", graphql.WithHTTPClient(httpclient)) +package graphql + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + + "github.com/pkg/errors" +) + +type IClient interface { + Invoke(ctx context.Context, req *Request, resp interface{}) error +} + +// Client is a client for interacting with a GraphQL API. +type Client struct { + endpoint string + httpClient *http.Client + useMultipartForm bool + + interceptors []Interceptor + + // closeReq will close the request body immediately allowing for reuse of client + closeReq bool + + // Log is called with various debug information. + // To log to standard out, use: + // client.Log = func(s string) { log.Println(s) } + Log func(s string) +} + +// NewClient makes a new Client capable of making GraphQL requests. +func NewClient(endpoint string, opts ...ClientOption) *Client { + c := &Client{ + endpoint: endpoint, + Log: func(string) {}, + } + for _, optionFunc := range opts { + optionFunc(c) + } + if c.httpClient == nil { + c.httpClient = http.DefaultClient + } + return c +} + +func (c *Client) logf(format string, args ...interface{}) { + c.Log(fmt.Sprintf(format, args...)) +} + +// Run executes the query and unmarshals the response from the data field +// into the response object. +// Pass in a nil response object to skip response parsing. +// If the request fails or the server returns an error, the first error +// will be returned. +func (c *Client) Run(ctx context.Context, req *Request, resp interface{}) error { + return ComposeInvokeInterceptor(c, c.interceptors...).Invoke(ctx, req, resp) +} + +func (c *Client) Invoke(ctx context.Context, req *Request, resp interface{}) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if len(req.files) > 0 && !c.useMultipartForm { + return errors.New("cannot send files with PostFields option") + } + var err error + if c.useMultipartForm { + err = c.runWithPostFields(ctx, req, resp) + } else { + err = c.runWithJSON(ctx, req, resp) + } + return err +} + +func (c *Client) runWithJSON(ctx context.Context, req *Request, resp interface{}) error { + var requestBody bytes.Buffer + requestBodyObj := struct { + Query string `json:"query"` + Variables map[string]interface{} `json:"variables"` + }{ + Query: req.q, + Variables: req.vars, + } + if err := json.NewEncoder(&requestBody).Encode(requestBodyObj); err != nil { + return errors.Wrap(err, "encode body") + } + c.logf(">> variables: %v", req.vars) + c.logf(">> query: %s", req.q) + gr := &graphResponse{ + Data: resp, + } + r, err := http.NewRequest(http.MethodPost, c.endpoint, &requestBody) + if err != nil { + return err + } + r.Close = c.closeReq + r.Header.Set("Content-Type", "application/json; charset=utf-8") + r.Header.Set("Accept", "application/json; charset=utf-8") + for key, values := range req.Header { + for _, value := range values { + r.Header.Add(key, value) + } + } + c.logf(">> headers: %v", r.Header) + r = r.WithContext(ctx) + res, err := c.httpClient.Do(r) + if err != nil { + return err + } + defer res.Body.Close() + var buf bytes.Buffer + if _, err := io.Copy(&buf, res.Body); err != nil { + return errors.Wrap(err, "reading body") + } + c.logf("<< %s", buf.String()) + if err := json.NewDecoder(&buf).Decode(&gr); err != nil { + if res.StatusCode != http.StatusOK { + return fmt.Errorf("graphql: server returned a non-200 status code: %v", res.StatusCode) + } + return errors.Wrap(err, "decoding response") + } + if len(gr.Errors) > 0 { + // return first error + return gr.Errors[0] + } + return nil +} + +func (c *Client) runWithPostFields(ctx context.Context, req *Request, resp interface{}) error { + var requestBody bytes.Buffer + writer := multipart.NewWriter(&requestBody) + if err := writer.WriteField("query", req.q); err != nil { + return errors.Wrap(err, "write query field") + } + var variablesBuf bytes.Buffer + if len(req.vars) > 0 { + variablesField, err := writer.CreateFormField("variables") + if err != nil { + return errors.Wrap(err, "create variables field") + } + if err := json.NewEncoder(io.MultiWriter(variablesField, &variablesBuf)).Encode(req.vars); err != nil { + return errors.Wrap(err, "encode variables") + } + } + for i := range req.files { + part, err := writer.CreateFormFile(req.files[i].Field, req.files[i].Name) + if err != nil { + return errors.Wrap(err, "create form file") + } + if _, err := io.Copy(part, req.files[i].R); err != nil { + return errors.Wrap(err, "preparing file") + } + } + if err := writer.Close(); err != nil { + return errors.Wrap(err, "close writer") + } + c.logf(">> variables: %s", variablesBuf.String()) + c.logf(">> files: %d", len(req.files)) + c.logf(">> query: %s", req.q) + gr := &graphResponse{ + Data: resp, + } + r, err := http.NewRequest(http.MethodPost, c.endpoint, &requestBody) + if err != nil { + return err + } + r.Close = c.closeReq + r.Header.Set("Content-Type", writer.FormDataContentType()) + r.Header.Set("Accept", "application/json; charset=utf-8") + for key, values := range req.Header { + for _, value := range values { + r.Header.Add(key, value) + } + } + c.logf(">> headers: %v", r.Header) + r = r.WithContext(ctx) + res, err := c.httpClient.Do(r) + if err != nil { + return err + } + defer res.Body.Close() + var buf bytes.Buffer + if _, err := io.Copy(&buf, res.Body); err != nil { + return errors.Wrap(err, "reading body") + } + c.logf("<< %s", buf.String()) + if err := json.NewDecoder(&buf).Decode(&gr); err != nil { + if res.StatusCode != http.StatusOK { + return fmt.Errorf("graphql: server returned a non-200 status code: %v", res.StatusCode) + } + return errors.Wrap(err, "decoding response") + } + if len(gr.Errors) > 0 { + // return first error + return gr.Errors[0] + } + return nil +} + +func WithRunInterceptors(interceptors ...Interceptor) ClientOption { + return func(client *Client) { + client.interceptors = append(client.interceptors, interceptors...) + } +} + +// WithHTTPClient specifies the underlying http.Client to use when +// making requests. +// +// NewClient(endpoint, WithHTTPClient(specificHTTPClient)) +func WithHTTPClient(httpclient *http.Client) ClientOption { + return func(client *Client) { + client.httpClient = httpclient + } +} + +// UseMultipartForm uses multipart/form-data and activates support for +// files. +func UseMultipartForm() ClientOption { + return func(client *Client) { + client.useMultipartForm = true + } +} + +// ImmediatelyCloseReqBody will close the req body immediately after each request body is ready +func ImmediatelyCloseReqBody() ClientOption { + return func(client *Client) { + client.closeReq = true + } +} + +// ClientOption are functions that are passed into NewClient to +// modify the behaviour of the Client. +type ClientOption func(*Client) + +type graphErr struct { + Message string +} + +func (e graphErr) Error() string { + return "graphql: " + e.Message +} + +type graphResponse struct { + Data interface{} + Errors []graphErr +} + +// Request is a GraphQL request. +type Request struct { + q string + vars map[string]interface{} + files []File + + // Header represent any request headers that will be set + // when the request is made. + Header http.Header +} + +// NewRequest makes a new Request with the specified string. +func NewRequest(q string) *Request { + req := &Request{ + q: q, + Header: make(map[string][]string), + } + return req +} + +// Var sets a variable. +func (req *Request) Var(key string, value interface{}) { + if req.vars == nil { + req.vars = make(map[string]interface{}) + } + req.vars[key] = value +} + +// Vars gets the variables for this Request. +func (req *Request) Vars() map[string]interface{} { + return req.vars +} + +// Files gets the files in this request. +func (req *Request) Files() []File { + return req.files +} + +// Query gets the query string of this request. +func (req *Request) Query() string { + return req.q +} + +// File sets a file to upload. +// Files are only supported with a Client that was created with +// the UseMultipartForm option. +func (req *Request) File(fieldname, filename string, r io.Reader) { + req.files = append(req.files, File{ + Field: fieldname, + Name: filename, + R: r, + }) +} + +// File represents a file to upload. +type File struct { + Field string + Name string + R io.Reader +} diff --git a/pkg/util/graphql/interceptor.go b/pkg/util/graphql/interceptor.go new file mode 100644 index 000000000..f514f695e --- /dev/null +++ b/pkg/util/graphql/interceptor.go @@ -0,0 +1,26 @@ +package graphql + +import ( + "context" +) + +type ClientFunc func(ctx context.Context, req *Request, resp interface{}) error + +func (fn ClientFunc) Invoke(ctx context.Context, req *Request, resp interface{}) error { + return fn(ctx, req, resp) +} + +type Interceptor interface { + Next(fn IClient) IClient +} + +type InterceptorFunc func(client IClient) IClient + +func (fn InterceptorFunc) Next(client IClient) IClient { return fn(client) } + +func ComposeInvokeInterceptor(client IClient, interceptors ...Interceptor) IClient { + for _, interceptor := range interceptors { + client = interceptor.Next(client) + } + return client +} diff --git a/pkg/util/graphql/options.go b/pkg/util/graphql/options.go deleted file mode 100644 index 5cb55ee98..000000000 --- a/pkg/util/graphql/options.go +++ /dev/null @@ -1,6 +0,0 @@ -package graphql - -type CallOption interface { -} - -type EmptyCallOption struct{} diff --git a/pkg/util/graphql/timeout.go b/pkg/util/graphql/timeout.go index 82946e8ac..67161c389 100644 --- a/pkg/util/graphql/timeout.go +++ b/pkg/util/graphql/timeout.go @@ -1,10 +1,10 @@ package graphql import ( + httppkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/http" + "github.com/machinebox/graphql" "net/http" "time" - - "github.com/machinebox/graphql" ) const ( @@ -48,3 +48,39 @@ func NewWithTimeout(url string, timeout time.Duration) *graphql.Client { Timeout: timeout, }) } + +func NewGraphQLClient(cfg Config, interceptors ...any) *Client { + if cfg.Timeout == 0 { + cfg.Timeout = DefaultGraphQLRequestTimeout + } + + httpInterceptors, graphqlInterceptors := filterInterceptors(interceptors) + + httpClient := &http.Client{ + Timeout: cfg.Timeout, + Transport: &TransportWithDefaultHeaders{ + Transport: http.DefaultTransport, + Headers: cfg.Header, + }, + } + httpClient.Transport = httppkg.ComposeInterceptor(httpClient.Transport, httpInterceptors...) + + return NewClient( + cfg.Url, + WithHTTPClient(httpClient), WithRunInterceptors(graphqlInterceptors...), + ) +} + +func filterInterceptors(interceptors []interface{}) ([]httppkg.Interceptor, []Interceptor) { + httpInterceptors := make([]httppkg.Interceptor, 0) + graphqlInterceptors := make([]Interceptor, 0) + for _, interceptor := range interceptors { + switch i := interceptor.(type) { + case httppkg.Interceptor: + httpInterceptors = append(httpInterceptors, i) + case Interceptor: + graphqlInterceptors = append(graphqlInterceptors, i) + } + } + return httpInterceptors, graphqlInterceptors +} diff --git a/pkg/util/http/interceptor.go b/pkg/util/http/interceptor.go new file mode 100644 index 000000000..98c827416 --- /dev/null +++ b/pkg/util/http/interceptor.go @@ -0,0 +1,30 @@ +package http + +import ( + "net/http" +) + +// RoundTripperFunc implement http.RoundTripper for convenient usage. +type RoundTripperFunc func(*http.Request) (*http.Response, error) + +func (fn RoundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return fn(r) +} + +// Interceptor is interceptor that can do more work before/after a request +type Interceptor interface { + Next(fn http.RoundTripper) http.RoundTripper +} + +// InterceptorFunc implement Interceptor for convenient usage. +type InterceptorFunc func(rt http.RoundTripper) http.RoundTripper + +func (fn InterceptorFunc) Next(rt http.RoundTripper) http.RoundTripper { return fn(rt) } + +// ComposeInterceptor compose interceptors to given http.RoundTripper +func ComposeInterceptor(rt http.RoundTripper, interceptors ...Interceptor) http.RoundTripper { + for _, interceptor := range interceptors { + rt = interceptor.Next(rt) + } + return rt +}