Skip to content

Commit

Permalink
Bulkinsert support pure list json (#27990)
Browse files Browse the repository at this point in the history
Signed-off-by: yhmo <[email protected]>
  • Loading branch information
yhmo authored Nov 1, 2023
1 parent 8c71e2b commit abd5b19
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 42 deletions.
53 changes: 28 additions & 25 deletions internal/util/importutil/json_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,38 +234,41 @@ func (p *JSONParser) ParseRows(reader *IOReader, handler JSONRowHandler) error {
log.Warn("JSON parser: failed to decode the JSON file", zap.Error(err))
return fmt.Errorf("failed to decode the JSON file, error: %w", err)
}
if t != json.Delim('{') {
log.Warn("JSON parser: invalid JSON format, the content should be started with'{'")
return errors.New("invalid JSON format, the content should be started with'{'")
if t != json.Delim('{') && t != json.Delim('[') {
log.Warn("JSON parser: invalid JSON format, the content should be started with '{' or '['")
return errors.New("invalid JSON format, the content should be started with '{' or '['")
}

// read the first level
isEmpty := true
isOldFormat := (t == json.Delim('{'))
for dec.More() {
// read the key
t, err := dec.Token()
if err != nil {
log.Warn("JSON parser: failed to decode the JSON file", zap.Error(err))
return fmt.Errorf("failed to decode the JSON file, error: %w", err)
}
key := t.(string)
keyLower := strings.ToLower(key)
// the root key should be RowRootNode
if keyLower != RowRootNode {
log.Warn("JSON parser: invalid JSON format, the root key is not found", zap.String("RowRootNode", RowRootNode), zap.String("key", key))
return fmt.Errorf("invalid JSON format, the root key should be '%s', but get '%s'", RowRootNode, key)
}
if isOldFormat {
// read the key
t, err := dec.Token()
if err != nil {
log.Warn("JSON parser: failed to decode the JSON file", zap.Error(err))
return fmt.Errorf("failed to decode the JSON file, error: %w", err)
}
key := t.(string)
keyLower := strings.ToLower(key)
// the root key should be RowRootNode
if keyLower != RowRootNode {
log.Warn("JSON parser: invalid JSON format, the root key is not found", zap.String("RowRootNode", RowRootNode), zap.String("key", key))
return fmt.Errorf("invalid JSON format, the root key should be '%s', but get '%s'", RowRootNode, key)
}

// started by '['
t, err = dec.Token()
if err != nil {
log.Warn("JSON parser: failed to decode the JSON file", zap.Error(err))
return fmt.Errorf("failed to decode the JSON file, error: %w", err)
}
// started by '['
t, err = dec.Token()
if err != nil {
log.Warn("JSON parser: failed to decode the JSON file", zap.Error(err))
return fmt.Errorf("failed to decode the JSON file, error: %w", err)
}

if t != json.Delim('[') {
log.Warn("JSON parser: invalid JSON format, rows list should begin with '['")
return errors.New("invalid JSON format, rows list should begin with '['")
if t != json.Delim('[') {
log.Warn("JSON parser: invalid JSON format, rows list should begin with '['")
return errors.New("invalid JSON format, rows list should begin with '['")
}
}

// read buffer
Expand Down
57 changes: 40 additions & 17 deletions internal/util/importutil/json_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,21 +116,16 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
content.Rows = append(content.Rows, row)
}

binContent, err := json.Marshal(content)
assert.NoError(t, err)
strContent := string(binContent)
reader := strings.NewReader(strContent)

consumer := &mockJSONRowConsumer{
handleErr: nil,
rows: make([]map[int64]interface{}, 0),
handleCount: 0,
}
verifyRows := func(ioReader *IOReader) {
consumer := &mockJSONRowConsumer{
handleErr: nil,
rows: make([]map[int64]interface{}, 0),
handleCount: 0,
}

t.Run("parse success", func(t *testing.T) {
// set bufRowCount = 4, means call handle() after reading 4 rows
parser.bufRowCount = 4
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(len(strContent))}, consumer)
err = parser.ParseRows(ioReader, consumer)
assert.NoError(t, err)
assert.Equal(t, len(content.Rows), len(consumer.rows))
for i := 0; i < len(consumer.rows); i++ {
Expand Down Expand Up @@ -193,6 +188,22 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
assert.InDelta(t, contenctRow.FieldFloatVector[k], float32(fval), 10e-6)
}
}
}

consumer := &mockJSONRowConsumer{
handleErr: nil,
rows: make([]map[int64]interface{}, 0),
handleCount: 0,
}

t.Run("parse old format success", func(t *testing.T) {
binContent, err := json.Marshal(content)
assert.NoError(t, err)
strContent := string(binContent)
reader := strings.NewReader(strContent)

ioReader := &IOReader{r: reader, fileSize: int64(len(strContent))}
verifyRows(ioReader)

// empty content
reader = strings.NewReader(`{}`)
Expand All @@ -207,8 +218,25 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
assert.NoError(t, err)
})

t.Run("parse new format success", func(t *testing.T) {
binContent, err := json.Marshal(content.Rows)
assert.NoError(t, err)
strContent := string(binContent)
reader := strings.NewReader(strContent)
fmt.Println(strContent)

ioReader := &IOReader{r: reader, fileSize: int64(len(strContent))}
verifyRows(ioReader)

// empty list
reader = strings.NewReader(`[]`)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(2)}, consumer)
assert.NoError(t, err)
})

t.Run("error cases", func(t *testing.T) {
// handler is nil
reader := strings.NewReader("")
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(0)}, nil)
assert.Error(t, err)

Expand Down Expand Up @@ -259,11 +287,6 @@ func Test_JSONParserParseRows_IntPK(t *testing.T) {
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(10)}, consumer)
assert.Error(t, err)

// not valid json format
reader = strings.NewReader(`[]`)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(2)}, consumer)
assert.Error(t, err)

// empty file
reader = strings.NewReader(``)
err = parser.ParseRows(&IOReader{r: reader, fileSize: int64(0)}, consumer)
Expand Down

0 comments on commit abd5b19

Please sign in to comment.