Skip to content

Commit

Permalink
fix: flush fixed
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Aug 10, 2024
1 parent 2e57511 commit 45950c5
Show file tree
Hide file tree
Showing 31 changed files with 750 additions and 202 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ generate-mockery-chunk-manager: getdeps
generate-mockery-pkg:
$(MAKE) -C pkg generate-mockery

generate-mockery-internal:
generate-mockery-internal: getdeps
$(INSTALL_PATH)/mockery --config $(PWD)/internal/.mockery.yaml

generate-mockery: generate-mockery-types generate-mockery-kv generate-mockery-rootcoord generate-mockery-proxy generate-mockery-querycoord generate-mockery-querynode generate-mockery-datacoord generate-mockery-pkg generate-mockery-internal
Expand Down
2 changes: 1 addition & 1 deletion configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ streamingNode:
log:
# Milvus log level. Option: debug, info, warn, error, panic, and fatal.
# It is recommended to use debug level under test and development environments, and info level in production environment.
level: info
level: debug
file:
# Root path to the log files.
# The default value is set empty, indicating to output log files to standard output (stdout) and standard error (stderr).
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/gin-gonic/gin v1.9.1
github.com/go-playground/validator/v10 v10.14.0
github.com/gofrs/flock v0.8.1
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/protobuf v1.5.4
github.com/google/btree v1.1.2
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.7
Expand Down Expand Up @@ -57,6 +57,7 @@ require (

require (
github.com/bits-and-blooms/bitset v1.10.0
github.com/bytedance/sonic v1.9.1
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cockroachdb/redact v1.1.3
github.com/greatroar/blobloom v0.0.0-00010101000000-000000000000
Expand Down Expand Up @@ -91,7 +92,6 @@ require (
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/benesch/cgosymbolizer v0.0.0-20190515212042-bec6fe6e597b // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.9.1 // indirect
github.com/campoy/embedmd v1.0.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
Expand Down Expand Up @@ -248,6 +248,7 @@ replace (
github.com/go-kit/kit => github.com/go-kit/kit v0.1.0
github.com/greatroar/blobloom => github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93
github.com/ianlancetaylor/cgosymbolizer => github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119
github.com/milvus-io/milvus-proto/go-api/v2 => github.com/chyezh/milvus-proto/go-api/v2 v2.0.0-20240809084827-12ef847f21c6
github.com/milvus-io/milvus/pkg => ./pkg
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
github.com/chyezh/milvus-proto/go-api/v2 v2.0.0-20240809084827-12ef847f21c6 h1:oJLDOUWj7GE+fzmSNbLYM4GSDmIqqRFhR/RTx0JOFXg=
github.com/chyezh/milvus-proto/go-api/v2 v2.0.0-20240809084827-12ef847f21c6/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
Expand Down Expand Up @@ -598,8 +600,6 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240717062137-3ffb1db01632 h1:CXig0DNtUsCLzchCFe3PR2KgOdobbz9gK2nSV7195PM=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240717062137-3ffb1db01632/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
github.com/milvus-io/pulsar-client-go v0.6.10/go.mod h1:lQqCkgwDF8YFYjKA+zOheTk1tev2B+bKj5j7+nm8M1w=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
9 changes: 5 additions & 4 deletions internal/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ packages:
github.com/milvus-io/milvus/internal/distributed/streaming:
interfaces:
WALAccesser:
Utility:
github.com/milvus-io/milvus/internal/streamingcoord/server/balancer:
interfaces:
Balancer:
Expand Down Expand Up @@ -43,11 +44,11 @@ packages:
Interceptor:
InterceptorWithReady:
InterceptorBuilder:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector:
interfaces:
? github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector
: interfaces:
SealOperator:
github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector:
interfaces:
? github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector
: interfaces:
TimeTickSyncOperator:
google.golang.org/grpc:
interfaces:
Expand Down
65 changes: 35 additions & 30 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
Expand Down Expand Up @@ -111,14 +112,16 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
}
timeOfSeal, _ := tsoutil.ParseTS(ts)

sealedSegmentIDs, err := s.segmentManager.SealAllSegments(ctx, req.GetCollectionID(), req.GetSegmentIDs())
if err != nil {
return &datapb.FlushResponse{
Status: merr.Status(errors.Wrapf(err, "failed to flush collection %d",
req.GetCollectionID())),
}, nil
sealedSegmentIDs := make([]int64, 0)
if !streamingutil.IsStreamingServiceEnabled() {
var err error
if sealedSegmentIDs, err = s.segmentManager.SealAllSegments(ctx, req.GetCollectionID(), req.GetSegmentIDs()); err != nil {
return &datapb.FlushResponse{
Status: merr.Status(errors.Wrapf(err, "failed to flush collection %d",
req.GetCollectionID())),
}, nil
}
}

sealedSegmentsIDDict := make(map[UniqueID]bool)
for _, sealedSegmentID := range sealedSegmentIDs {
sealedSegmentsIDDict[sealedSegmentID] = true
Expand All @@ -135,33 +138,35 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
}
}

var isUnimplemented bool
err = retry.Do(ctx, func() error {
nodeChannels := s.channelManager.GetNodeChannelsByCollectionID(req.GetCollectionID())
if !streamingutil.IsStreamingServiceEnabled() {
var isUnimplemented bool
err = retry.Do(ctx, func() error {
nodeChannels := s.channelManager.GetNodeChannelsByCollectionID(req.GetCollectionID())

for nodeID, channelNames := range nodeChannels {
err = s.cluster.FlushChannels(ctx, nodeID, ts, channelNames)
if err != nil && errors.Is(err, merr.ErrServiceUnimplemented) {
isUnimplemented = true
return nil
}
if err != nil {
return err
for nodeID, channelNames := range nodeChannels {
err = s.cluster.FlushChannels(ctx, nodeID, ts, channelNames)
if err != nil && errors.Is(err, merr.ErrServiceUnimplemented) {
isUnimplemented = true
return nil
}
if err != nil {
return err
}
}
return nil
}, retry.Attempts(60)) // about 3min
if err != nil {
return &datapb.FlushResponse{
Status: merr.Status(err),
}, nil
}
return nil
}, retry.Attempts(60)) // about 3min
if err != nil {
return &datapb.FlushResponse{
Status: merr.Status(err),
}, nil
}

if isUnimplemented {
// For compatible with rolling upgrade from version 2.2.x,
// fall back to the flush logic of version 2.2.x;
log.Warn("DataNode FlushChannels unimplemented", zap.Error(err))
ts = 0
if isUnimplemented {
// For compatible with rolling upgrade from version 2.2.x,
// fall back to the flush logic of version 2.2.x;
log.Warn("DataNode FlushChannels unimplemented", zap.Error(err))
ts = 0
}
}

log.Info("flush response with segments",
Expand Down
3 changes: 2 additions & 1 deletion internal/flushcommon/pipeline/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
Expand Down Expand Up @@ -158,7 +159,7 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i
return nil, err
}
segmentPks.Insert(segment.GetID(), stats)
if tickler != nil {
if !streamingutil.IsStreamingServiceEnabled() {
tickler.Inc()
}

Expand Down
37 changes: 32 additions & 5 deletions internal/flushcommon/pipeline/flow_graph_dd_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"reflect"
"sync/atomic"

"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"

"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
Expand All @@ -33,6 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/streamingnode/server/flusher"
"github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
Expand Down Expand Up @@ -154,7 +157,9 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
ddn.dropMode.Store(true)

log.Info("Stop compaction for dropped channel", zap.String("channel", ddn.vChannelName))
ddn.compactionExecutor.DiscardByDroppedChannel(ddn.vChannelName)
if !streamingutil.IsStreamingServiceEnabled() {
ddn.compactionExecutor.DiscardByDroppedChannel(ddn.vChannelName)
}
fgMsg.dropCollection = true
}

Expand Down Expand Up @@ -232,10 +237,32 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).
Add(float64(dmsg.GetNumRows()))
fgMsg.DeleteMessages = append(fgMsg.DeleteMessages, dmsg)

case commonpb.MsgType_Flush:
if ddn.flushMsgHandler != nil {
ddn.flushMsgHandler(ddn.vChannelName, nil)
case commonpb.MsgType_FlushSegment:
flushMsg := msg.(*adaptor.FlushMessageBody)
logger := log.With(
zap.String("vchannel", ddn.Name()),
zap.Int32("msgType", int32(msg.Type())),
zap.Uint64("timetick", flushMsg.FlushMessage.TimeTick()),
)
logger.Info("receive flush message")
if err := ddn.flushMsgHandler.HandleFlush(ddn.vChannelName, flushMsg.FlushMessage); err != nil {
logger.Warn("handle flush message failed", zap.Error(err))
} else {
logger.Info("handle flush message success")
}
case commonpb.MsgType_ManualFlush:
manualFlushMsg := msg.(*adaptor.ManualFlushMessageBody)
logger := log.With(
zap.String("vchannel", ddn.Name()),
zap.Int32("msgType", int32(msg.Type())),
zap.Uint64("timetick", manualFlushMsg.ManualFlushMessage.TimeTick()),
zap.Uint64("flushTs", manualFlushMsg.ManualFlushMessage.Header().FlushTs),
)
logger.Info("receive manual flush message")
if err := ddn.flushMsgHandler.HandleManualFlush(ddn.vChannelName, manualFlushMsg.ManualFlushMessage); err != nil {
logger.Warn("handle manual flush message failed", zap.Error(err))
} else {
logger.Info("handle manual flush message success")
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion internal/flushcommon/pipeline/flow_graph_write_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/internal/flushcommon/writebuffer"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
Expand Down Expand Up @@ -98,7 +99,9 @@ func (wNode *writeNode) Operate(in []Msg) []Msg {
}, true
})

wNode.updater.Update(wNode.channelName, end.GetTimestamp(), stats)
if !streamingutil.IsStreamingServiceEnabled() {
wNode.updater.Update(wNode.channelName, end.GetTimestamp(), stats)
}

res := FlowGraphMsg{
TimeRange: fgMsg.TimeRange,
Expand Down
96 changes: 96 additions & 0 deletions internal/mocks/distributed/mock_streaming/mock_Utility.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 45950c5

Please sign in to comment.