Skip to content

Commit

Permalink
remove tidb dependency
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu committed Nov 19, 2023
1 parent 0837ce5 commit 30a7631
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 52 deletions.
5 changes: 2 additions & 3 deletions cdc/cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/v3/concurrency"
Expand Down Expand Up @@ -61,7 +60,7 @@ type Capture struct {
election *concurrency.Election

pdClient pd.Client
kvStorage tidbkv.Storage
kvStorage tikv.Storage
createEtcdClient createEtcdClientFunc
etcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
Expand All @@ -75,7 +74,7 @@ type Capture struct {
}

// NewCapture returns a new Capture instance
func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, createEtcdClient createEtcdClientFunc) *Capture {
func NewCapture(pdClient pd.Client, kvStorage tikv.Storage, createEtcdClient createEtcdClientFunc) *Capture {

Check warning on line 77 in cdc/cdc/capture/capture.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/capture/capture.go#L77

Added line #L77 was not covered by tests
return &Capture{
pdClient: pdClient,
kvStorage: kvStorage,
Expand Down
17 changes: 9 additions & 8 deletions cdc/cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
log.Warn("failed to get current version from PD", zap.Error(err))
continue
}
currentTimeFromPD := oracle.GetTimeFromTS(version.Ver)
currentTimeFromPD := oracle.GetTimeFromTS(version)
expired := make([]*regionTsInfo, 0)
for w.rtsManager.Len() > 0 {
item := w.rtsManager.Pop()
Expand All @@ -330,7 +330,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
if len(expired) == 0 {
continue
}
maxVersion := oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0)
// maxVersion := oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0)
for _, rts := range expired {
state, ok := w.getRegionState(rts.regionID)
if !ok || state.isStopped() {
Expand Down Expand Up @@ -359,18 +359,19 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
w.rtsManager.Upsert(rts)
continue
}
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock",
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time",

Check warning on line 362 in cdc/cdc/kv/region_worker.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/kv/region_worker.go#L362

Added line #L362 was not covered by tests
zap.Uint64("regionID", rts.regionID),
zap.Stringer("span", state.getRegionSpan()),
zap.Duration("duration", sinceLastResolvedTs),
zap.Duration("lastEvent", sinceLastEvent),
zap.Uint64("resolvedTs", lastResolvedTs),
)
err = w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion)
if err != nil {
log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), zap.Error(err))
continue
}
// Resolve locks for RawKV is not necessary. Add it back after we support TxnKV.
// err = w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion)
// if err != nil {
// log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), zap.Error(err))
// continue
// }
rts.ts.penalty = 0
}
rts.ts.resolvedTs = lastResolvedTs
Expand Down
33 changes: 9 additions & 24 deletions cdc/cdc/kv/store_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,16 @@
package kv

import (
"fmt"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
tidbconfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/driver"
tikvconfig "github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/migration/cdc/cdc/model"
cerror "github.com/tikv/migration/cdc/pkg/errors"
"github.com/tikv/migration/cdc/pkg/flags"
Expand All @@ -38,7 +34,7 @@ import (
// TiKVStorage is the tikv storage interface used by CDC.
type TiKVStorage interface {
tikv.Storage
GetCachedCurrentVersion() (version tidbkv.Version, err error)
GetCachedCurrentVersion() (version uint64, err error)
}

const (
Expand Down Expand Up @@ -80,7 +76,7 @@ func newStorageWithCurVersionCache(storage tikv.Storage, cacheKey string) TiKVSt
}

// GetCachedCurrentVersion gets the cached version of currentVersion, and update the cache if necessary
func (s *StorageWithCurVersionCache) GetCachedCurrentVersion() (version tidbkv.Version, err error) {
func (s *StorageWithCurVersionCache) GetCachedCurrentVersion() (version uint64, err error) {
curVersionCacheMu.Lock()
entry, exists := curVersionCache[s.cacheKey]
curVersionCacheMu.Unlock()
Expand All @@ -104,37 +100,26 @@ func (s *StorageWithCurVersionCache) GetCachedCurrentVersion() (version tidbkv.V
entry.lastUpdated = time.Now()
}

version.Ver = entry.ts
version = entry.ts
return
}

// GetSnapshotMeta returns tidb meta information
// TODO: Simplify the signature of this function
func GetSnapshotMeta(tiStore tidbkv.Storage, ts uint64) (*meta.Meta, error) {
snapshot := tiStore.GetSnapshot(tidbkv.NewVersion(ts))
return meta.NewSnapshotMeta(snapshot), nil
}

// CreateTiStore creates a new tikv storage client
func CreateTiStore(urls string, credential *security.Credential) (kv.Storage, error) {
func CreateTiStore(urls string, credential *security.Credential) (tikv.Storage, error) {

Check warning on line 108 in cdc/cdc/kv/store_op.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/kv/store_op.go#L108

Added line #L108 was not covered by tests
urlv, err := flags.NewURLsValue(urls)
if err != nil {
return nil, errors.Trace(err)
}

// Ignore error if it is already registered.
_ = store.Register("tikv", driver.TiKVDriver{})

if credential.CAPath != "" {
conf := tidbconfig.GetGlobalConfig()
conf := tikvconfig.GetGlobalConfig()

Check warning on line 115 in cdc/cdc/kv/store_op.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/kv/store_op.go#L115

Added line #L115 was not covered by tests
conf.Security.ClusterSSLCA = credential.CAPath
conf.Security.ClusterSSLCert = credential.CertPath
conf.Security.ClusterSSLKey = credential.KeyPath
tidbconfig.StoreGlobalConfig(conf)
tikvconfig.StoreGlobalConfig(conf)

Check warning on line 119 in cdc/cdc/kv/store_op.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/kv/store_op.go#L119

Added line #L119 was not covered by tests
}

tiPath := fmt.Sprintf("tikv://%s?disableGC=true", urlv.HostString())
tiStore, err := store.New(tiPath)
tiStore, err := txnkv.NewClient(urlv.HostList())

Check warning on line 122 in cdc/cdc/kv/store_op.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/kv/store_op.go#L122

Added line #L122 was not covered by tests
if err != nil {
return nil, cerror.WrapError(cerror.ErrNewStore, err)
}
Expand Down
7 changes: 1 addition & 6 deletions cdc/cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/migration/cdc/cdc/kv"
Expand Down Expand Up @@ -68,15 +67,11 @@ func NewPuller(
pdCli pd.Client,
grpcPool kv.GrpcPool,
regionCache *tikv.RegionCache,
kvStorage tidbkv.Storage,
tikvStorage tikv.Storage,
checkpointTs uint64,
spans []regionspan.Span,
enableOldValue bool,
) Puller {
tikvStorage, ok := kvStorage.(tikv.Storage)
if !ok {
log.Panic("can't create puller for non-tikv storage")
}
comparableSpans := make([]regionspan.ComparableSpan, len(spans))
for i := range spans {
comparableSpans[i] = regionspan.ToComparableSpan(spans[i])
Expand Down
13 changes: 9 additions & 4 deletions cdc/cdc/puller/puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/pingcap/check"
"github.com/pingcap/errors"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/migration/cdc/cdc/kv"
Expand Down Expand Up @@ -116,11 +115,17 @@ func (s *pullerSuite) newPullerForTest(
c *check.C,
spans []regionspan.Span,
checkpointTs uint64,
) (*mockInjectedPuller, context.CancelFunc, *sync.WaitGroup, tidbkv.Storage) {
) (*mockInjectedPuller, context.CancelFunc, *sync.WaitGroup, tikv.Storage) {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
store, err := mockstore.NewMockStore()
c.Assert(err, check.IsNil)

tikvStorage, ok := store.(tikv.Storage)
if !ok {
panic("can't create puller for non-tikv storage")
}

enableOldValue := true
backupNewCDCKVClient := kv.NewCDCKVClient
kv.NewCDCKVClient = newMockCDCKVClient
Expand All @@ -132,7 +137,7 @@ func (s *pullerSuite) newPullerForTest(
defer grpcPool.Close()
regionCache := tikv.NewRegionCache(pdCli)
defer regionCache.Close()
plr := NewPuller(ctx, pdCli, grpcPool, regionCache, store, checkpointTs, spans, enableOldValue)
plr := NewPuller(ctx, pdCli, grpcPool, regionCache, tikvStorage, checkpointTs, spans, enableOldValue)
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -146,7 +151,7 @@ func (s *pullerSuite) newPullerForTest(
Puller: plr,
cli: plr.(*pullerImpl).kvCli.(*mockCDCKVClient),
}
return mockPlr, cancel, &wg, store
return mockPlr, cancel, &wg, tikvStorage
}

func (s *pullerSuite) TestPullerResolvedForward(c *check.C) {
Expand Down
4 changes: 2 additions & 2 deletions cdc/cdc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -63,7 +63,7 @@ type Server struct {
statusServer *http.Server
pdClient pd.Client
etcdClient *etcd.CDCEtcdClient
kvStorage tidbkv.Storage
kvStorage tikv.Storage
pdEndpoints []string
}

Expand Down
3 changes: 1 addition & 2 deletions cdc/pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"time"

"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/migration/cdc/cdc/kv"
Expand All @@ -37,7 +36,7 @@ import (
// All field in Vars should be READ-ONLY and THREAD-SAFE
type GlobalVars struct {
PDClient pd.Client
KVStorage tidbkv.Storage
KVStorage tikv.Storage
CaptureInfo *model.CaptureInfo
EtcdClient *etcd.CDCEtcdClient
GrpcPool kv.GrpcPool
Expand Down
9 changes: 9 additions & 0 deletions cdc/pkg/flags/urls.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ func (us *URLsValue) HostString() string {
return strings.Join(all, ",")
}

// HostList return an list of host:port
func (us *URLsValue) HostList() []string {
all := make([]string, len(*us))
for i, u := range *us {
all[i] = u.Host
}
return all
}

// NewURLsValue return a URLsValue from a string of URLs list
func NewURLsValue(init string) (*URLsValue, error) {
v := &URLsValue{}
Expand Down
2 changes: 2 additions & 0 deletions cdc/pkg/flags/urls_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package flags

import (
"strings"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -39,6 +40,7 @@ func TestNewURLsValue(t *testing.T) {
require.Nil(t, err)
hs := urlsValue.HostString()
require.Equal(t, testCase.hostString, hs)
require.Equal(t, strings.Split(testCase.hostString, ","), urlsValue.HostList())
}
}

Expand Down
3 changes: 2 additions & 1 deletion cdc/pkg/util/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -55,7 +56,7 @@ func PutTimezoneInCtx(ctx context.Context, timezone *time.Location) context.Cont
}

// PutKVStorageInCtx returns a new child context with the given tikv store
func PutKVStorageInCtx(ctx context.Context, store kv.Storage) context.Context {
func PutKVStorageInCtx(ctx context.Context, store tikv.Storage) context.Context {
return context.WithValue(ctx, ctxKeyKVStorage, store)
}

Expand Down
11 changes: 9 additions & 2 deletions cdc/pkg/util/ctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/pingcap/check"
"github.com/pingcap/tidb/store/mockstore"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/migration/cdc/pkg/util/testleak"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -104,8 +105,14 @@ func (s *ctxValueSuite) TestKeySpanInfoNotSet(c *check.C) {

func (s *ctxValueSuite) TestShouldReturnKVStorage(c *check.C) {
defer testleak.AfterTest(c)()
kvStorage, _ := mockstore.NewMockStore()
defer kvStorage.Close()
store, _ := mockstore.NewMockStore()
defer store.Close()

kvStorage, ok := store.(tikv.Storage)
if !ok {
panic("can't create puller for non-tikv storage")
}

ctx := PutKVStorageInCtx(context.Background(), kvStorage)
kvStorage2, err := KVStorageFromCtx(ctx)
c.Assert(kvStorage2, check.DeepEquals, kvStorage)
Expand Down

0 comments on commit 30a7631

Please sign in to comment.