Skip to content

Commit

Permalink
[to #323] *: Remove tidb dependency (#367)
Browse files Browse the repository at this point in the history
* remove tidb dependency

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

* remove more tidb

Signed-off-by: Ping Yu <[email protected]>

---------

Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu authored Nov 20, 2023
1 parent 0837ce5 commit 999ca43
Show file tree
Hide file tree
Showing 24 changed files with 92 additions and 690 deletions.
5 changes: 2 additions & 3 deletions cdc/cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/v3/concurrency"
Expand Down Expand Up @@ -61,7 +60,7 @@ type Capture struct {
election *concurrency.Election

pdClient pd.Client
kvStorage tidbkv.Storage
kvStorage tikv.Storage
createEtcdClient createEtcdClientFunc
etcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
Expand All @@ -75,7 +74,7 @@ type Capture struct {
}

// NewCapture returns a new Capture instance
func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, createEtcdClient createEtcdClientFunc) *Capture {
func NewCapture(pdClient pd.Client, kvStorage tikv.Storage, createEtcdClient createEtcdClientFunc) *Capture {
return &Capture{
pdClient: pdClient,
kvStorage: kvStorage,
Expand Down
4 changes: 2 additions & 2 deletions cdc/cdc/capture/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (

"github.com/gin-gonic/gin"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/migration/cdc/cdc/model"
"github.com/tikv/migration/cdc/cdc/owner"
"github.com/tikv/migration/cdc/pkg/config"
cerror "github.com/tikv/migration/cdc/pkg/errors"
"github.com/tikv/migration/cdc/pkg/httputil"
"github.com/tikv/migration/cdc/pkg/logutil"
"github.com/tikv/migration/cdc/pkg/retry"
"github.com/tikv/migration/cdc/pkg/version"
Expand Down Expand Up @@ -782,7 +782,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {
}

// forward to owner
cli := httputil.NewClient(tslConfig)
cli := httputil.NewClientByTLSConfig(tslConfig)
resp, err := cli.Do(req)
if err != nil {
_ = c.Error(err)
Expand Down
6 changes: 3 additions & 3 deletions cdc/cdc/http_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (

"github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/tikv/migration/cdc/cdc/capture"
"github.com/tikv/migration/cdc/cdc/model"
"github.com/tikv/migration/cdc/pkg/config"
cerrors "github.com/tikv/migration/cdc/pkg/errors"
"github.com/tikv/migration/cdc/pkg/httputil"
"github.com/tikv/migration/cdc/pkg/retry"
security2 "github.com/tikv/migration/cdc/pkg/security"
"github.com/tikv/migration/cdc/pkg/util/testleak"
Expand Down Expand Up @@ -246,7 +246,7 @@ func (s *httpStatusSuite) TestServerTLSWithoutCommonName(c *check.C) {
if err != nil {
c.Assert(err, check.IsNil)
}
cli := httputil.NewClient(tlsConfig)
cli := httputil.NewClientByTLSConfig(tlsConfig)
resp, err := cli.Get(statusURL)
if err != nil {
return err
Expand Down Expand Up @@ -324,7 +324,7 @@ func (s *httpStatusSuite) TestServerTLSWithCommonName(c *check.C) {
if err != nil {
c.Assert(err, check.IsNil)
}
cli := httputil.NewClient(tlsConfig)
cli := httputil.NewClientByTLSConfig(tlsConfig)
resp, err := cli.Get(statusURL)
if err != nil {
return err
Expand Down
6 changes: 2 additions & 4 deletions cdc/cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/mockcopr"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -399,9 +398,8 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
cluster.ChangeLeader(3, 5)

ts, err := kvStorage.CurrentTimestamp(oracle.GlobalTxnScope)
ver := kv.NewVersion(ts)
c.Assert(err, check.IsNil)
ch2 <- makeEvent(ver.Ver)
ch2 <- makeEvent(ts)
var event model.RegionFeedEvent
// consume the first resolved ts event, which is sent before region starts
<-eventCh
Expand All @@ -417,7 +415,7 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
case <-time.After(time.Second):
c.Fatalf("reconnection not succeed in 1 second")
}
checkEvent(event, GetSafeResolvedTs(ver.Ver))
checkEvent(event, GetSafeResolvedTs(ts))

// check gRPC connection active counter is updated correctly
bucket, ok := grpcPool.bucketConns[invalidStore]
Expand Down
17 changes: 9 additions & 8 deletions cdc/cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
log.Warn("failed to get current version from PD", zap.Error(err))
continue
}
currentTimeFromPD := oracle.GetTimeFromTS(version.Ver)
currentTimeFromPD := oracle.GetTimeFromTS(version)
expired := make([]*regionTsInfo, 0)
for w.rtsManager.Len() > 0 {
item := w.rtsManager.Pop()
Expand All @@ -330,7 +330,7 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
if len(expired) == 0 {
continue
}
maxVersion := oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0)
// maxVersion := oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0)
for _, rts := range expired {
state, ok := w.getRegionState(rts.regionID)
if !ok || state.isStopped() {
Expand Down Expand Up @@ -359,18 +359,19 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
w.rtsManager.Upsert(rts)
continue
}
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock",
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time",
zap.Uint64("regionID", rts.regionID),
zap.Stringer("span", state.getRegionSpan()),
zap.Duration("duration", sinceLastResolvedTs),
zap.Duration("lastEvent", sinceLastEvent),
zap.Uint64("resolvedTs", lastResolvedTs),
)
err = w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion)
if err != nil {
log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), zap.Error(err))
continue
}
// Resolve locks for RawKV is not necessary. Add it back after we support TxnKV.
// err = w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion)
// if err != nil {
// log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), zap.Error(err))
// continue
// }
rts.ts.penalty = 0
}
rts.ts.resolvedTs = lastResolvedTs
Expand Down
37 changes: 10 additions & 27 deletions cdc/cdc/kv/store_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,15 @@
package kv

import (
"fmt"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
tidbconfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/driver"
tikvconfig "github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/migration/cdc/cdc/model"
cerror "github.com/tikv/migration/cdc/pkg/errors"
"github.com/tikv/migration/cdc/pkg/flags"
Expand All @@ -38,7 +33,7 @@ import (
// TiKVStorage is the tikv storage interface used by CDC.
type TiKVStorage interface {
tikv.Storage
GetCachedCurrentVersion() (version tidbkv.Version, err error)
GetCachedCurrentVersion() (version uint64, err error)
}

const (
Expand Down Expand Up @@ -80,7 +75,7 @@ func newStorageWithCurVersionCache(storage tikv.Storage, cacheKey string) TiKVSt
}

// GetCachedCurrentVersion gets the cached version of currentVersion, and update the cache if necessary
func (s *StorageWithCurVersionCache) GetCachedCurrentVersion() (version tidbkv.Version, err error) {
func (s *StorageWithCurVersionCache) GetCachedCurrentVersion() (version uint64, err error) {
curVersionCacheMu.Lock()
entry, exists := curVersionCache[s.cacheKey]
curVersionCacheMu.Unlock()
Expand All @@ -99,42 +94,30 @@ func (s *StorageWithCurVersionCache) GetCachedCurrentVersion() (version tidbkv.V
if err != nil {
return
}
ver := kv.NewVersion(ts)
entry.ts = ver.Ver
entry.ts = ts
entry.lastUpdated = time.Now()
}

version.Ver = entry.ts
version = entry.ts
return
}

// GetSnapshotMeta returns tidb meta information
// TODO: Simplify the signature of this function
func GetSnapshotMeta(tiStore tidbkv.Storage, ts uint64) (*meta.Meta, error) {
snapshot := tiStore.GetSnapshot(tidbkv.NewVersion(ts))
return meta.NewSnapshotMeta(snapshot), nil
}

// CreateTiStore creates a new tikv storage client
func CreateTiStore(urls string, credential *security.Credential) (kv.Storage, error) {
func CreateTiStore(urls string, credential *security.Credential) (tikv.Storage, error) {
urlv, err := flags.NewURLsValue(urls)
if err != nil {
return nil, errors.Trace(err)
}

// Ignore error if it is already registered.
_ = store.Register("tikv", driver.TiKVDriver{})

if credential.CAPath != "" {
conf := tidbconfig.GetGlobalConfig()
conf := tikvconfig.GetGlobalConfig()
conf.Security.ClusterSSLCA = credential.CAPath
conf.Security.ClusterSSLCert = credential.CertPath
conf.Security.ClusterSSLKey = credential.KeyPath
tidbconfig.StoreGlobalConfig(conf)
tikvconfig.StoreGlobalConfig(conf)
}

tiPath := fmt.Sprintf("tikv://%s?disableGC=true", urlv.HostString())
tiStore, err := store.New(tiPath)
tiStore, err := txnkv.NewClient(urlv.HostList())
if err != nil {
return nil, cerror.WrapError(cerror.ErrNewStore, err)
}
Expand Down
Loading

0 comments on commit 999ca43

Please sign in to comment.