Skip to content

Commit

Permalink
refactor: add interceptors for http.RoundTripper and custom graphql.C…
Browse files Browse the repository at this point in the history
…lient
  • Loading branch information
minhnhathoang committed Jan 1, 2025
1 parent 5230708 commit 9cc5946
Show file tree
Hide file tree
Showing 50 changed files with 793 additions and 295 deletions.
15 changes: 8 additions & 7 deletions pkg/liquidity-source/algebra/integral/pool_list_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package integral
import (
"context"
"fmt"
"github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable"
graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql"

"math/big"
"strconv"

Expand All @@ -16,14 +17,14 @@ import (

type PoolsListUpdater struct {
config *Config
graphqlClient *mutableclient.MutableClient
graphqlClientCfg *mutableclient.Config
graphqlClient *graphqlpkg.Client
graphqlClientCfg *graphqlpkg.Config
}

func NewPoolsListUpdater(
cfg *Config,
graphqlClient *mutableclient.MutableClient,
graphqlClientCfg *mutableclient.Config,
graphqlClient *graphqlpkg.Client,
graphqlClientCfg *graphqlpkg.Config,
) *PoolsListUpdater {
return &PoolsListUpdater{
config: cfg,
Expand All @@ -35,13 +36,13 @@ func NewPoolsListUpdater(
func (d *PoolsListUpdater) getPoolsList(ctx context.Context, lastCreatedAtTimestamp *big.Int, lastPoolIds []string, first, skip int) ([]SubgraphPool, error) {
allowSubgraphError := d.config.AllowSubgraphError

req := mutableclient.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, lastPoolIds, first, skip))
req := graphqlpkg.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, lastPoolIds, first, skip))

var response struct {
Pools []SubgraphPool `json:"pools"`
}

if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil {
if err := d.graphqlClient.Run(ctx, req, &response); err != nil {
if allowSubgraphError && len(response.Pools) > 0 {
return response.Pools, nil
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/liquidity-source/algebra/integral/pool_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package integral

import (
"context"
"github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable"
graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql"

"math/big"
"time"

Expand All @@ -23,15 +24,15 @@ import (
type PoolTracker struct {
config *Config
ethrpcClient *ethrpc.Client
graphqlClient *mutableclient.MutableClient
graphqlClientCfg *mutableclient.Config
graphqlClient *graphqlpkg.Client
graphqlClientCfg *graphqlpkg.Config
}

func NewPoolTracker(
cfg *Config,
ethrpcClient *ethrpc.Client,
graphqlClient *mutableclient.MutableClient,
graphqlClientCfg *mutableclient.Config,
graphqlClient *graphqlpkg.Client,
graphqlClientCfg *graphqlpkg.Config,
) (*PoolTracker, error) {
return &PoolTracker{
config: cfg,
Expand Down Expand Up @@ -568,14 +569,14 @@ func (d *PoolTracker) getPoolTicks(ctx context.Context, poolAddress string) ([]T
var ticks []TickResp

for {
req := mutableclient.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, skip))
req := graphqlpkg.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, skip))

var resp struct {
Pool *SubgraphPoolTicks `json:"pool"`
Meta *valueobject.SubgraphMeta `json:"_meta"`
}

if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &resp); err != nil {
if err := d.graphqlClient.Run(ctx, req, &resp); err != nil {
if allowSubgraphError {
if resp.Pool == nil {
l.WithFields(logger.Fields{
Expand Down
15 changes: 8 additions & 7 deletions pkg/liquidity-source/algebra/v1/pool_list_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package algebrav1
import (
"context"
"fmt"
mutableclient "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable"
graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql"

"math/big"
"strconv"

Expand All @@ -15,14 +16,14 @@ import (

type PoolsListUpdater struct {
config *Config
graphqlClient *mutableclient.MutableClient
graphqlClientCfg *mutableclient.Config
graphqlClient *graphqlpkg.Client
graphqlClientCfg *graphqlpkg.Config
}

func NewPoolsListUpdater(
cfg *Config,
graphqlClient *mutableclient.MutableClient,
graphqlClientCfg *mutableclient.Config,
graphqlClient *graphqlpkg.Client,
graphqlClientCfg *graphqlpkg.Config,
) *PoolsListUpdater {
return &PoolsListUpdater{
config: cfg,
Expand All @@ -34,13 +35,13 @@ func NewPoolsListUpdater(
func (d *PoolsListUpdater) getPoolsList(ctx context.Context, lastCreatedAtTimestamp *big.Int, lastPoolIds []string, first, skip int) ([]SubgraphPool, error) {
allowSubgraphError := d.config.AllowSubgraphError

req := mutableclient.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, lastPoolIds, first, skip))
req := graphqlpkg.NewRequest(getPoolsListQuery(allowSubgraphError, lastCreatedAtTimestamp, lastPoolIds, first, skip))

var response struct {
Pools []SubgraphPool `json:"pools"`
}

if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &response); err != nil {
if err := d.graphqlClient.Run(ctx, req, &response); err != nil {
if allowSubgraphError && len(response.Pools) > 0 {
return response.Pools, nil
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/liquidity-source/algebra/v1/pool_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package algebrav1

import (
"context"
"github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable"
graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql"

"math/big"
"time"

Expand All @@ -23,15 +24,15 @@ import (
type PoolTracker struct {
config *Config
ethrpcClient *ethrpc.Client
graphqlClient *mutableclient.MutableClient
graphqlClientCfg *mutableclient.Config
graphqlClient *graphqlpkg.Client
graphqlClientCfg *graphqlpkg.Config
}

func NewPoolTracker(
cfg *Config,
ethrpcClient *ethrpc.Client,
graphqlClient *mutableclient.MutableClient,
graphqlClientCfg *mutableclient.Config,
graphqlClient *graphqlpkg.Client,
graphqlClientCfg *graphqlpkg.Config,
) (*PoolTracker, error) {
return &PoolTracker{
config: cfg,
Expand Down Expand Up @@ -619,14 +620,14 @@ func (d *PoolTracker) getPoolTicks(ctx context.Context, poolAddress string) ([]T
var ticks []TickResp

for {
req := mutableclient.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, skip))
req := graphqlpkg.NewRequest(getPoolTicksQuery(allowSubgraphError, poolAddress, skip))

var resp struct {
Pool *SubgraphPoolTicks `json:"pool"`
Meta *valueobject.SubgraphMeta `json:"_meta"`
}

if err := d.graphqlClient.Run(ctx, d.graphqlClientCfg, req, &resp); err != nil {
if err := d.graphqlClient.Run(ctx, req, &resp); err != nil {
if allowSubgraphError {
if resp.Pool == nil {
l.WithFields(logger.Fields{
Expand Down
15 changes: 8 additions & 7 deletions pkg/liquidity-source/ambient/pool_list_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package ambient
import (
"context"
"fmt"
"github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable"
graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql"

"strings"
"time"

Expand All @@ -18,15 +19,15 @@ import (
type PoolListUpdater struct {
cfg Config
poolDatastore IPoolDatastore
graphqlClient *mutableclient.MutableClient
graphqlClientCfg *mutableclient.Config
graphqlClient *graphqlpkg.Client
graphqlClientCfg *graphqlpkg.Config
}

func NewPoolsListUpdater(
cfg Config,
poolDatastore IPoolDatastore,
graphqlClient *mutableclient.MutableClient,
graphqlClientCfg *mutableclient.Config,
graphqlClient *graphqlpkg.Client,
graphqlClientCfg *graphqlpkg.Config,
) (*PoolListUpdater, error) {
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("invalid config: %w", err)
Expand Down Expand Up @@ -116,7 +117,7 @@ func (u *PoolListUpdater) fetchSubgraph(ctx context.Context, lastCreateTime uint
limit = u.cfg.SubgraphLimit
}
var (
req = mutableclient.NewRequest(fmt.Sprintf(`{
req = graphqlpkg.NewRequest(fmt.Sprintf(`{
pools(
where: {
timeCreate_gt: %d,
Expand All @@ -136,7 +137,7 @@ func (u *PoolListUpdater) fetchSubgraph(ctx context.Context, lastCreateTime uint
resp SubgraphPoolsResponse
)

if err := u.graphqlClient.Run(ctx, u.graphqlClientCfg, req, &resp); err != nil {
if err := u.graphqlClient.Run(ctx, req, &resp); err != nil {
return nil, err
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/liquidity-source/ambient/pool_list_updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ambient_test
import (
"context"
"fmt"
mutableclient "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable"
graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql"
"math/big"
"os"
"testing"
Expand Down Expand Up @@ -52,11 +52,11 @@ func TestPoolListUpdater(t *testing.T) {
MulticallContractAddress: multicallAddress,
}

graphqlClientCfg = &mutableclient.Config{
graphqlClientCfg = &graphqlpkg.Config{
Url: config.SubgraphAPI,
Timeout: config.SubgraphRequestTimeout.Duration,
}
graphqlClient = mutableclient.New(*graphqlClientCfg)
graphqlClient = graphqlpkg.New(*graphqlClientCfg)
)

{
Expand Down
14 changes: 7 additions & 7 deletions pkg/liquidity-source/balancer-v1/pools_list_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package balancerv1
import (
"context"
"fmt"
"github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql/mutable"
graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql"
"time"

"github.com/KyberNetwork/ethrpc"
Expand All @@ -17,8 +17,8 @@ type (
PoolsListUpdater struct {
config *Config
ethrpcClient *ethrpc.Client
graphqlClient *mutableclient.MutableClient
graphqlClientCfg *mutableclient.Config
graphqlClient *graphqlpkg.Client
graphqlClientCfg *graphqlpkg.Config
}

PoolsListUpdaterMetadata struct {
Expand All @@ -39,8 +39,8 @@ type (
func NewPoolsListUpdater(
cfg *Config,
ethrpcClient *ethrpc.Client,
graphqlClient *mutableclient.MutableClient,
graphqlClientCfg *mutableclient.Config,
graphqlClient *graphqlpkg.Client,
graphqlClientCfg *graphqlpkg.Config,
) *PoolsListUpdater {
return &PoolsListUpdater{
config: cfg,
Expand Down Expand Up @@ -145,11 +145,11 @@ func (u *PoolsListUpdater) initPools(_ context.Context, subgraphPools []FetchPoo

func (u *PoolsListUpdater) fetchPoolsFromSubgraph(ctx context.Context, lastCreateTime int) ([]FetchPoolsResponsePool, error) {
var (
req = mutableclient.NewRequest(newFetchPoolIDsQuery(lastCreateTime, u.config.NewPoolLimit))
req = graphqlpkg.NewRequest(newFetchPoolIDsQuery(lastCreateTime, u.config.NewPoolLimit))
resp FetchPoolsResponse
)

if err := u.graphqlClient.Run(ctx, u.graphqlClientCfg, req, &resp); err != nil {
if err := u.graphqlClient.Run(ctx, req, &resp); err != nil {
return nil, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package composablestable

import (
"context"
graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql"

"math/big"
"strings"
"time"
Expand All @@ -23,14 +25,18 @@ type PoolsListUpdater struct {
sharedUpdater *shared.PoolsListUpdater
}

func NewPoolsListUpdater(config *Config, ethrpcClient *ethrpc.Client) *PoolsListUpdater {
func NewPoolsListUpdater(config *Config,
ethrpcClient *ethrpc.Client,
graphqlClient *graphqlpkg.Client,
graphqlClientCfg *graphqlpkg.Config,
) *PoolsListUpdater {
sharedUpdater := shared.NewPoolsListUpdater(&shared.Config{
DexID: config.DexID,
SubgraphAPI: config.SubgraphAPI,
SubgraphHeaders: config.SubgraphHeaders,
NewPoolLimit: config.NewPoolLimit,
PoolTypes: []string{poolTypeComposableStable},
})
}, graphqlClient, graphqlClientCfg)

return &PoolsListUpdater{
config: config,
Expand Down
25 changes: 12 additions & 13 deletions pkg/liquidity-source/balancer-v2/shared/pools_list_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"time"

"github.com/goccy/go-json"
"github.com/machinebox/graphql"

graphqlpkg "github.com/KyberNetwork/kyberswap-dex-lib/pkg/util/graphql"
)

type (
PoolsListUpdater struct {
config *Config
graphqlClient *graphql.Client
config *Config
graphqlClient *graphqlpkg.Client
graphqlClientCfg *graphqlpkg.Config
}

Config struct {
Expand All @@ -33,16 +33,15 @@ type (

const graphQLRequestTimeout = 20 * time.Second

func NewPoolsListUpdater(config *Config) *PoolsListUpdater {
graphqlClient := graphqlpkg.New(graphqlpkg.Config{
Url: config.SubgraphAPI,
Header: config.SubgraphHeaders,
Timeout: graphQLRequestTimeout,
})

func NewPoolsListUpdater(
config *Config,
graphqlClient *graphqlpkg.Client,
graphqlClientCfg *graphqlpkg.Config,
) *PoolsListUpdater {
return &PoolsListUpdater{
config: config,
graphqlClient: graphqlClient,
config: config,
graphqlClient: graphqlClient,
graphqlClientCfg: graphqlClientCfg,
}
}

Expand Down Expand Up @@ -87,7 +86,7 @@ func (u *PoolsListUpdater) querySubgraph(ctx context.Context, lastCreateTime *bi
u.config.NewPoolLimit,
0,
)
req := graphql.NewRequest(query)
req := graphqlpkg.NewRequest(query)

if err := u.graphqlClient.Run(ctx, req, &response); err != nil {
return nil, nil, err
Expand Down
Loading

0 comments on commit 9cc5946

Please sign in to comment.