diff --git a/cdc/cdc/sink/codec/json.go b/cdc/cdc/sink/codec/json.go index 2740e977..c17bdf2b 100644 --- a/cdc/cdc/sink/codec/json.go +++ b/cdc/cdc/sink/codec/json.go @@ -344,6 +344,7 @@ func mqMessageToKvEvent(key *mqMessageKey, value *mqMessageValue) *model.RawKVEn // e.Table.TableID = *key.Partition // e.Table.IsPartition = true // } + e.OpType = value.OpType e.Key = key.Key e.Value = value.Value e.ExpiredTs = decodeExpiredTs(value.ExpiredTs) diff --git a/cdc/cdc/sink/codec/json_test.go b/cdc/cdc/sink/codec/json_test.go index cd762314..06118903 100644 --- a/cdc/cdc/sink/codec/json_test.go +++ b/cdc/cdc/sink/codec/json_test.go @@ -269,22 +269,27 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) { defer testleak.AfterTest(c)() encoder := NewJSONEventBatchEncoder() - // the size of `testEvent` is 87 + // the size of `testEvent` is 75 testEvent := &model.RawKVEntry{ - CRTs: 1, + OpType: model.OpTypePut, + Key: []byte("key"), + Value: []byte("value"), + CRTs: 100, + ExpiredTs: 200, // Table: &model.TableName{Schema: "a", Table: "b"}, // Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}}, } + eventSize := 75 // for a single message, the overhead is 36(maximumRecordOverhead) + 8(versionHea) = 44, just can hold it. - a := strconv.Itoa(87 + 44) + a := strconv.Itoa(eventSize + 44) err := encoder.SetParams(map[string]string{"max-message-bytes": a}) c.Check(err, check.IsNil) r, err := encoder.AppendChangedEvent(testEvent) c.Check(err, check.IsNil) c.Check(r, check.Equals, EncoderNoOperation) - a = strconv.Itoa(87 + 43) + a = strconv.Itoa(eventSize + 43) err = encoder.SetParams(map[string]string{"max-message-bytes": a}) c.Assert(err, check.IsNil) r, err = encoder.AppendChangedEvent(testEvent)