Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

graph database implementation for TiDB #16

Open
wants to merge 24 commits into
base: hackathon2021-master
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
51a81df
parser: support graph related ddl/query syntax (#5)
lonng Dec 25, 2021
fb8c416
add more unit test for parser
lonng Dec 26, 2021
8d36682
planner/core: support build graph related ast nodes to logical plans
lonng Dec 26, 2021
5360282
parser: support omit destination of edge pattern
lonng Dec 27, 2021
89e36c6
planner/core: refine the plan of graph pattern
lonng Dec 28, 2021
e267cc4
Merge pull request #6 from tigraph/hackathon2021-planner
vodkaslime Dec 31, 2021
a784e57
planner,ddl: fix some panic and refine the planer logical (#8)
lonng Jan 1, 2022
a14f2f5
codec: support write graph records (#9)
Rustin170506 Jan 1, 2022
8bb5a34
graph write supports adaptive primary/source/destination keys (#10)
lonng Jan 1, 2022
acb703b
apply change to depend on tipb from tigraph
vodkaslime Jan 2, 2022
c88fe11
Merge pull request #11 from JeepYiheihou/hackathon2021
vodkaslime Jan 2, 2022
c0ff240
*: implement the graph read path (#12)
lonng Jan 2, 2022
a4cc19e
*: reuse the regular table to store graph vertex data (#13)
lonng Jan 3, 2022
96c8375
executor: finish graph executor (#14)
sleepymole Jan 4, 2022
5e016eb
executor: graph supports prune column optimizer rule (#15)
lonng Jan 5, 2022
e1a0802
planner/core: disable the graph edge scan predicates push down
lonng Jan 6, 2022
ed63a26
parser: fix source/destination key column option restore
lonng Jan 6, 2022
7b4a959
executor: fix show create table result of edge table
lonng Jan 7, 2022
b881fbf
planner: prune destination columns if destination is not specified (#17)
sleepymole Jan 7, 2022
636de72
executor: make BOTH iter bidirectional edge (#18)
sleepymole Jan 7, 2022
bc5aeed
ddl: support alter column add graph option (#19)
sleepymole Jan 8, 2022
14f540b
executor,planner: graph support match any shortest (#20)
sleepymole Jan 9, 2022
eef8768
executor: fix graph shortest path dst id
sleepymole Jan 9, 2022
3fca98d
Merge pull request #21 from gozssky/graph-shortest
sleepymole Jan 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
*: implement the graph read path (#12)
  • Loading branch information
lonng authored Jan 2, 2022
commit c0ff24084fe5e18278ba420a4d47b00c1f35b4ab
2 changes: 1 addition & 1 deletion br/pkg/backup/client.go
Original file line number Diff line number Diff line change
@@ -244,7 +244,7 @@ func appendRanges(tbl *model.TableInfo, tblID int64) ([]kv.KeyRange, error) {
ranges = ranger.FullIntRange(false)
}

kvRanges, err := distsql.TableHandleRangesToKVRanges(nil, []int64{tblID}, tbl.IsCommonHandle, ranges, nil)
kvRanges, err := distsql.TableHandleRangesToKVRanges(nil, []int64{tblID}, tbl.IsCommonHandle, tbl.Type, ranges, nil)
if err != nil {
return nil, errors.Trace(err)
}
2 changes: 1 addition & 1 deletion br/pkg/checksum/executor.go
Original file line number Diff line number Diff line change
@@ -181,7 +181,7 @@ func buildTableRequest(
var builder distsql.RequestBuilder
// Use low priority to reducing impact to other requests.
builder.Request.Priority = kv.PriorityLow
return builder.SetHandleRanges(nil, tableID, tableInfo.IsCommonHandle, ranges, nil).
return builder.SetHandleRanges(nil, tableID, tableInfo.IsCommonHandle, tableInfo.Type, ranges, nil).
SetStartTS(startTS).
SetChecksumRequest(checksum).
SetConcurrency(int(concurrency)).
15 changes: 8 additions & 7 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
@@ -464,16 +464,17 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromLocalIndex(
logger := log.With(zap.String("table", tableName))

allRanges := make([]tidbkv.KeyRange, 0)
tableIDs := physicalTableIDs(tbl.Meta())
tableInfo := tbl.Meta()
tableIDs := physicalTableIDs(tableInfo)
// Collect row handle duplicates.
var dataConflictInfos []errormanager.DataConflictInfo
hasDataConflict := false
{
ranges := ranger.FullIntRange(false)
if tbl.Meta().IsCommonHandle {
if tableInfo.IsCommonHandle {
ranges = ranger.FullRange()
}
keyRanges, err := distsql.TableHandleRangesToKVRanges(nil, tableIDs, tbl.Meta().IsCommonHandle, ranges, nil)
keyRanges, err := distsql.TableHandleRangesToKVRanges(nil, tableIDs, tableInfo.IsCommonHandle, tableInfo.Type, ranges, nil)
if err != nil {
return false, errors.Trace(err)
}
@@ -534,7 +535,7 @@ func (manager *DuplicateManager) CollectDuplicateRowsFromLocalIndex(
}
}
handles := makePendingIndexHandlesWithCapacity(0)
for _, indexInfo := range tbl.Meta().Indices {
for _, indexInfo := range tableInfo.Indices {
if indexInfo.State != model.StatePublic {
continue
}
@@ -763,7 +764,7 @@ func (manager *DuplicateManager) makeConn(ctx context.Context, storeID uint64) (
func buildDuplicateRequests(tableInfo *model.TableInfo) ([]*DuplicateRequest, error) {
var reqs []*DuplicateRequest
for _, id := range physicalTableIDs(tableInfo) {
tableReqs, err := buildTableRequests(id, tableInfo.IsCommonHandle)
tableReqs, err := buildTableRequests(id, tableInfo.IsCommonHandle, tableInfo.Type)
if err != nil {
return nil, errors.Trace(err)
}
@@ -782,12 +783,12 @@ func buildDuplicateRequests(tableInfo *model.TableInfo) ([]*DuplicateRequest, er
return reqs, nil
}

func buildTableRequests(tableID int64, isCommonHandle bool) ([]*DuplicateRequest, error) {
func buildTableRequests(tableID int64, isCommonHandle bool, tblType model.TableType) ([]*DuplicateRequest, error) {
ranges := ranger.FullIntRange(false)
if isCommonHandle {
ranges = ranger.FullRange()
}
keysRanges, err := distsql.TableHandleRangesToKVRanges(nil, []int64{tableID}, isCommonHandle, ranges, nil)
keysRanges, err := distsql.TableHandleRangesToKVRanges(nil, []int64{tableID}, isCommonHandle, tblType, ranges, nil)
if err != nil {
return nil, errors.Trace(err)
}
5 changes: 3 additions & 2 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
@@ -448,12 +448,13 @@ func (dc *ddlCtx) buildDescTableScan(ctx context.Context, startTS uint64, tbl ta
var b distsql.RequestBuilder
var builder *distsql.RequestBuilder
var ranges []*ranger.Range
if tbl.Meta().IsCommonHandle {
tableInfo := tbl.Meta()
if tableInfo.IsCommonHandle {
ranges = ranger.FullNotNullRange()
} else {
ranges = ranger.FullIntRange(false)
}
builder = b.SetHandleRanges(sctx.GetSessionVars().StmtCtx, tbl.GetPhysicalID(), tbl.Meta().IsCommonHandle, ranges, nil)
builder = b.SetHandleRanges(sctx.GetSessionVars().StmtCtx, tbl.GetPhysicalID(), tableInfo.IsCommonHandle, tableInfo.Type, ranges, nil)
builder.SetDAGRequest(dagPB).
SetStartTS(startTS).
SetKeepOrder(true).
49 changes: 28 additions & 21 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ package distsql

import (
"fmt"
"github.com/pingcap/tidb/parser/model"
"math"
"sort"
"sync/atomic"
@@ -109,23 +110,23 @@ func (builder *RequestBuilder) SetIndexRangesForTables(sc *stmtctx.StatementCont

// SetHandleRanges sets "KeyRanges" for "kv.Request" by converting table handle range
// "ranges" to "KeyRanges" firstly.
func (builder *RequestBuilder) SetHandleRanges(sc *stmtctx.StatementContext, tid int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder {
return builder.SetHandleRangesForTables(sc, []int64{tid}, isCommonHandle, ranges, fb)
func (builder *RequestBuilder) SetHandleRanges(sc *stmtctx.StatementContext, tid int64, isCommonHandle bool, tblType model.TableType, ranges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder {
return builder.SetHandleRangesForTables(sc, []int64{tid}, isCommonHandle, tblType, ranges, fb)
}

// SetHandleRangesForTables sets "KeyRanges" for "kv.Request" by converting table handle range
// "ranges" to "KeyRanges" firstly for multiple tables.
func (builder *RequestBuilder) SetHandleRangesForTables(sc *stmtctx.StatementContext, tid []int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder {
func (builder *RequestBuilder) SetHandleRangesForTables(sc *stmtctx.StatementContext, tid []int64, isCommonHandle bool, tblType model.TableType, ranges []*ranger.Range, fb *statistics.QueryFeedback) *RequestBuilder {
if builder.err == nil {
builder.Request.KeyRanges, builder.err = TableHandleRangesToKVRanges(sc, tid, isCommonHandle, ranges, fb)
builder.Request.KeyRanges, builder.err = TableHandleRangesToKVRanges(sc, tid, isCommonHandle, tblType, ranges, fb)
}
return builder
}

// SetTableHandles sets "KeyRanges" for "kv.Request" by converting table handles
// "handles" to "KeyRanges" firstly.
func (builder *RequestBuilder) SetTableHandles(tid int64, handles []kv.Handle) *RequestBuilder {
builder.Request.KeyRanges = TableHandlesToKVRanges(tid, handles)
func (builder *RequestBuilder) SetTableHandles(tid int64, tblType model.TableType, handles []kv.Handle) *RequestBuilder {
builder.Request.KeyRanges = TableHandlesToKVRanges(tid, tblType, handles)
return builder
}

@@ -351,9 +352,9 @@ func (builder *RequestBuilder) SetIsStaleness(is bool) *RequestBuilder {
}

// TableHandleRangesToKVRanges convert table handle ranges to "KeyRanges" for multiple tables.
func TableHandleRangesToKVRanges(sc *stmtctx.StatementContext, tid []int64, isCommonHandle bool, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) {
func TableHandleRangesToKVRanges(sc *stmtctx.StatementContext, tid []int64, isCommonHandle bool, tblType model.TableType, ranges []*ranger.Range, fb *statistics.QueryFeedback) ([]kv.KeyRange, error) {
if !isCommonHandle {
return tablesRangesToKVRanges(tid, ranges, fb), nil
return tablesRangesToKVRanges(tid, tblType, ranges, fb), nil
}
return CommonHandleRangesToKVRanges(sc, tid, ranges)
}
@@ -362,13 +363,13 @@ func TableHandleRangesToKVRanges(sc *stmtctx.StatementContext, tid []int64, isCo
// Note this function should not be exported, but currently
// br refers to it, so have to keep it.
func TableRangesToKVRanges(tid int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) []kv.KeyRange {
return tablesRangesToKVRanges([]int64{tid}, ranges, fb)
return tablesRangesToKVRanges([]int64{tid}, model.TableTypeIsRegular, ranges, fb)
}

// tablesRangesToKVRanges converts table ranges to "KeyRange".
func tablesRangesToKVRanges(tids []int64, ranges []*ranger.Range, fb *statistics.QueryFeedback) []kv.KeyRange {
func tablesRangesToKVRanges(tids []int64, tblType model.TableType, ranges []*ranger.Range, fb *statistics.QueryFeedback) []kv.KeyRange {
if fb == nil || fb.Hist == nil {
return tableRangesToKVRangesWithoutSplit(tids, ranges)
return tableRangesToKVRangesWithoutSplit(tids, tblType, ranges)
}
krs := make([]kv.KeyRange, 0, len(ranges))
feedbackRanges := make([]*ranger.Range, 0, len(ranges))
@@ -389,22 +390,22 @@ func tablesRangesToKVRanges(tids []int64, ranges []*ranger.Range, fb *statistics
high = kv.Key(high).PrefixNext()
}
for _, tid := range tids {
startKey := tablecodec.EncodeRowKey(tid, low)
endKey := tablecodec.EncodeRowKey(tid, high)
startKey := tablecodec.EncodeRowKeyByType(tid, tblType, low)
endKey := tablecodec.EncodeRowKeyByType(tid, tblType, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
}
}
fb.StoreRanges(feedbackRanges)
return krs
}

func tableRangesToKVRangesWithoutSplit(tids []int64, ranges []*ranger.Range) []kv.KeyRange {
func tableRangesToKVRangesWithoutSplit(tids []int64, tblType model.TableType, ranges []*ranger.Range) []kv.KeyRange {
krs := make([]kv.KeyRange, 0, len(ranges)*len(tids))
for _, ran := range ranges {
low, high := encodeHandleKey(ran)
for _, tid := range tids {
startKey := tablecodec.EncodeRowKey(tid, low)
endKey := tablecodec.EncodeRowKey(tid, high)
startKey := tablecodec.EncodeRowKeyByType(tid, tblType, low)
endKey := tablecodec.EncodeRowKeyByType(tid, tblType, high)
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
}
}
@@ -488,9 +489,10 @@ func SplitRangesAcrossInt64Boundary(ranges []*ranger.Range, keepOrder bool, desc

// TableHandlesToKVRanges converts sorted handle to kv ranges.
// For continuous handles, we should merge them to a single key range.
func TableHandlesToKVRanges(tid int64, handles []kv.Handle) []kv.KeyRange {
func TableHandlesToKVRanges(tid int64, tblType model.TableType, handles []kv.Handle) []kv.KeyRange {
krs := make([]kv.KeyRange, 0, len(handles))
i := 0
var startKey, endKey kv.Key
for i < len(handles) {
if commonHandle, ok := handles[i].(*kv.CommonHandle); ok {
ran := kv.KeyRange{
@@ -508,10 +510,15 @@ func TableHandlesToKVRanges(tid int64, handles []kv.Handle) []kv.KeyRange {
}
}
low := codec.EncodeInt(nil, handles[i].IntValue())
high := codec.EncodeInt(nil, handles[j-1].IntValue())
high = kv.Key(high).PrefixNext()
startKey := tablecodec.EncodeRowKey(tid, low)
endKey := tablecodec.EncodeRowKey(tid, high)
if tblType == model.TableTypeIsRegular {
high := codec.EncodeInt(nil, handles[j-1].IntValue())
high = kv.Key(high).PrefixNext()
startKey = tablecodec.EncodeRowKey(tid, low)
endKey = tablecodec.EncodeRowKey(tid, high)
} else {
startKey = tablecodec.EncodeRowKeyByType(tid, tblType, low)
endKey = startKey.PrefixNext()
}
krs = append(krs, kv.KeyRange{StartKey: startKey, EndKey: endKey})
i = j
}
7 changes: 4 additions & 3 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
package distsql

import (
"github.com/pingcap/tidb/parser/model"
"testing"

"github.com/pingcap/tidb/kv"
@@ -59,7 +60,7 @@ func TestTableHandlesToKVRanges(t *testing.T) {

// Build key ranges.
expect := getExpectedRanges(1, hrs)
actual := TableHandlesToKVRanges(1, handles)
actual := TableHandlesToKVRanges(1, model.TableTypeIsRegular, handles)

// Compare key ranges and expected key ranges.
require.Equal(t, len(expect), len(actual))
@@ -214,7 +215,7 @@ func TestRequestBuilder1(t *testing.T) {
},
}

actual, err := (&RequestBuilder{}).SetHandleRanges(nil, 12, false, ranges, nil).
actual, err := (&RequestBuilder{}).SetHandleRanges(nil, 12, false, model.TableTypeIsRegular, ranges, nil).
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
@@ -343,7 +344,7 @@ func TestRequestBuilder3(t *testing.T) {
handles := []kv.Handle{kv.IntHandle(0), kv.IntHandle(2), kv.IntHandle(3), kv.IntHandle(4),
kv.IntHandle(5), kv.IntHandle(10), kv.IntHandle(11), kv.IntHandle(100)}

actual, err := (&RequestBuilder{}).SetTableHandles(15, handles).
actual, err := (&RequestBuilder{}).SetTableHandles(15, model.TableTypeIsRegular, handles).
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
8 changes: 6 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
@@ -374,6 +374,7 @@ type AnalyzeIndexExec struct {

idxInfo *model.IndexInfo
isCommonHandle bool
tableType model.TableType
result distsql.SelectResult
countNullRes distsql.SelectResult
}
@@ -385,7 +386,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang
var builder distsql.RequestBuilder
var kvReqBuilder *distsql.RequestBuilder
if e.isCommonHandle && e.idxInfo.Primary {
kvReqBuilder = builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, true, ranges, nil)
kvReqBuilder = builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, true, e.tableType, ranges, nil)
} else {
kvReqBuilder = builder.SetIndexRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.tableID.GetStatisticsID()}, e.idxInfo.ID, ranges)
}
@@ -749,7 +750,9 @@ func (e *AnalyzeColumnsExec) open(ranges []*ranger.Range) error {

func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectResult, error) {
var builder distsql.RequestBuilder
reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx, []int64{e.TableID.GetStatisticsID()}, e.handleCols != nil && !e.handleCols.IsInt(), ranges, nil)
reqBuilder := builder.SetHandleRangesForTables(e.ctx.GetSessionVars().StmtCtx,
[]int64{e.TableID.GetStatisticsID()}, e.handleCols != nil && !e.handleCols.IsInt(),
e.tableInfo.Type, ranges, nil)
builder.SetResourceGroupTagger(e.ctx.GetSessionVars().StmtCtx)
// Always set KeepOrder of the request to be true, in order to compute
// correct `correlation` of columns.
@@ -1119,6 +1122,7 @@ func (e *AnalyzeColumnsExec) buildSubIndexJobForSpecialIndex(indexInfos []*model
idxExec := &AnalyzeIndexExec{
baseAnalyzeExec: base,
isCommonHandle: e.tableInfo.IsCommonHandle,
tableType: e.tableInfo.Type,
idxInfo: indexInfo,
}
idxExec.opts = make(map[ast.AnalyzeOptionType]uint64, len(ast.AnalyzeOptionString))
33 changes: 23 additions & 10 deletions executor/builder.go
Original file line number Diff line number Diff line change
@@ -2150,6 +2150,7 @@ func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeInde
e := &AnalyzeIndexExec{
baseAnalyzeExec: base,
isCommonHandle: task.TblInfo.IsCommonHandle,
tableType: task.TblInfo.Type,
idxInfo: task.IndexInfo,
}
topNSize := new(int32)
@@ -3779,10 +3780,10 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte

handles, lookUpContents := dedupHandles(lookUpContents)
if tbInfo.GetPartitionInfo() == nil {
return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles)
return builder.buildTableReaderFromHandles(ctx, e, tbInfo.Type, handles, canReorderHandles)
}
if !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles)
return builder.buildTableReaderFromHandles(ctx, e, tbInfo.Type, handles, canReorderHandles)
}

tbl, _ := builder.is.TableByID(tbInfo.ID)
@@ -3807,7 +3808,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
}
pid := p.GetPhysicalID()
handle := kv.IntHandle(content.keys[0].GetInt64())
tmp := distsql.TableHandlesToKVRanges(pid, []kv.Handle{handle})
tmp := distsql.TableHandlesToKVRanges(pid, tbInfo.Type, []kv.Handle{handle})
kvRanges = append(kvRanges, tmp...)
}
} else {
@@ -3818,7 +3819,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
}
for _, p := range partitions {
pid := p.GetPhysicalID()
tmp := distsql.TableHandlesToKVRanges(pid, handles)
tmp := distsql.TableHandlesToKVRanges(pid, tbInfo.Type, handles)
kvRanges = append(kvRanges, tmp...)
}
}
@@ -3855,8 +3856,14 @@ func (h kvRangeBuilderFromRangeAndPartition) buildKeyRangeSeparately(ranges []*r
var pids []int64
for _, p := range h.partitions {
pid := p.GetPhysicalID()
meta := p.Meta()
kvRange, err := distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, meta != nil && meta.IsCommonHandle, ranges, nil)
tableInfo := p.Meta()
var kvRange []kv.KeyRange
var err error
if tableInfo == nil {
kvRange, err = distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, false, model.TableTypeIsRegular, ranges, nil)
} else {
kvRange, err = distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, tableInfo.IsCommonHandle, tableInfo.Type, ranges, nil)
}
if err != nil {
return nil, nil, err
}
@@ -3870,8 +3877,14 @@ func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(_ int64, ranges []*ra
var ret []kv.KeyRange
for _, p := range h.partitions {
pid := p.GetPhysicalID()
meta := p.Meta()
kvRange, err := distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, meta != nil && meta.IsCommonHandle, ranges, nil)
tableInfo := p.Meta()
var kvRange []kv.KeyRange
var err error
if tableInfo == nil {
kvRange, err = distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, false, model.TableTypeIsRegular, ranges, nil)
} else {
kvRange, err = distsql.TableHandleRangesToKVRanges(h.sctx.GetSessionVars().StmtCtx, []int64{pid}, tableInfo.IsCommonHandle, tableInfo.Type, ranges, nil)
}
if err != nil {
return nil, err
}
@@ -3909,7 +3922,7 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T
return e, nil
}

func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Context, e *TableReaderExecutor, handles []kv.Handle, canReorderHandles bool) (*TableReaderExecutor, error) {
func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Context, e *TableReaderExecutor, tblType model.TableType, handles []kv.Handle, canReorderHandles bool) (*TableReaderExecutor, error) {
if canReorderHandles {
sort.Slice(handles, func(i, j int) bool {
return handles[i].Compare(handles[j]) < 0
@@ -3920,7 +3933,7 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
if _, ok := handles[0].(kv.PartitionHandle); ok {
b.SetPartitionsAndHandles(handles)
} else {
b.SetTableHandles(getPhysicalTableID(e.table), handles)
b.SetTableHandles(getPhysicalTableID(e.table), tblType, handles)
}
}
return builder.buildTableReaderBase(ctx, e, b)
2 changes: 1 addition & 1 deletion executor/checksum.go
Original file line number Diff line number Diff line change
@@ -242,7 +242,7 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int6

var builder distsql.RequestBuilder
builder.SetResourceGroupTagger(ctx.GetSessionVars().StmtCtx)
return builder.SetHandleRanges(ctx.GetSessionVars().StmtCtx, tableID, c.TableInfo.IsCommonHandle, ranges, nil).
return builder.SetHandleRanges(ctx.GetSessionVars().StmtCtx, tableID, c.TableInfo.IsCommonHandle, c.TableInfo.Type, ranges, nil).
SetChecksumRequest(checksum).
SetStartTS(c.StartTs).
SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency()).
2 changes: 1 addition & 1 deletion executor/distsql.go
Original file line number Diff line number Diff line change
@@ -665,7 +665,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup
extraPIDColumnIndex: e.extraPIDColumnIndex,
}
tableReaderExec.buildVirtualColumnInfo()
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, task.handles, true)
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, table.Meta().Type, task.handles, true)
if err != nil {
logutil.Logger(ctx).Error("build table reader from handles failed", zap.Error(err))
return nil, err
Loading