forked from snowflakedb/gosnowflake
-
Notifications
You must be signed in to change notification settings - Fork 0
/
chunk_arrow.go
70 lines (59 loc) · 1.62 KB
/
chunk_arrow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// Copyright (c) 2020-2020 Snowflake Computing Inc. All rights reserved.
package gosnowflake
import (
"bytes"
"encoding/base64"
"github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
"io"
)
// arrowResultChunk bundles an Arrow IPC reader with bookkeeping for one
// result chunk.
//
// NOTE: buildFirstArrowChunk fills this struct with a positional literal, so
// the field order below must not change.
type arrowResultChunk struct {
// reader is the Arrow IPC stream that decodeArrowChunk reads records from
// until it reports io.EOF.
reader ipc.Reader
// rowCount is increased by decodeArrowChunk by the number of rows in each
// record it decodes.
rowCount int
// uncompressedSize is only zero-initialized (in buildFirstArrowChunk) by the
// code shown here; presumably the chunk's uncompressed byte size — TODO
// confirm against callers.
uncompressedSize int
// allocator is set to memory.NewGoAllocator() by buildFirstArrowChunk; its
// consumers are not visible in the code shown here.
allocator memory.Allocator
}
// decodeArrowChunk drains the chunk's Arrow IPC reader and converts every
// record it yields into row-oriented chunkRowType values.
//
// rowType supplies the per-column metadata handed to arrowToValue; it is
// indexed by column position, so it needs an entry for each column of every
// record. Rows are returned in the order they were read, and arc.rowCount
// grows by the row count of each record that was fully converted. Any error
// other than io.EOF, whether from the reader or from arrowToValue, aborts
// decoding and is returned together with a nil slice.
func (arc *arrowResultChunk) decodeArrowChunk(rowType []execResponseRowType) ([]chunkRowType, error) {
	logger.Debug("Arrow Decoder")
	var decoded []chunkRowType

	for {
		rec, readErr := arc.reader.Read()
		switch {
		case readErr == io.EOF:
			// The stream is exhausted; everything gathered so far is the result.
			return decoded, nil
		case readErr != nil:
			return nil, readErr
		}

		rowTotal := int(rec.NumRows())
		cols := rec.Columns()
		width := len(cols)
		batch := make([]chunkRowType, rowTotal)

		for c := range cols {
			values := make([]snowflakeValue, rowTotal)
			if convErr := arrowToValue(&values, rowType[c], cols[c]); convErr != nil {
				return nil, convErr
			}

			if c == 0 {
				// Row storage is created only once the first column has been
				// converted, so a record without columns keeps zero-value rows.
				for r := range batch {
					batch[r] = chunkRowType{ArrowRow: make([]snowflakeValue, width)}
				}
			}

			// Transpose: scatter this column's values into slot c of each row.
			for r := range batch {
				batch[r].ArrowRow[c] = values[r]
			}
		}

		decoded = append(decoded, batch...)
		arc.rowCount += rowTotal
	}
}
// buildFirstArrowChunk creates an arrowResultChunk from rowsetBase64, a
// standard-base64 string that is decoded and then opened with ipc.NewReader.
//
// On success the chunk holds a copy of the reader, a zero rowCount and
// uncompressedSize, and a fresh Go allocator. If the base64 decoding or the
// reader construction fails, the error is discarded and the zero-value
// arrowResultChunk is returned instead.
func buildFirstArrowChunk(rowsetBase64 string) arrowResultChunk {
	var empty arrowResultChunk

	decoded, decodeErr := base64.StdEncoding.DecodeString(rowsetBase64)
	if decodeErr != nil {
		return empty
	}

	ipcReader, openErr := ipc.NewReader(bytes.NewReader(decoded))
	if openErr != nil {
		return empty
	}

	return arrowResultChunk{
		reader:           *ipcReader,
		rowCount:         0,
		uncompressedSize: 0,
		allocator:        memory.NewGoAllocator(),
	}
}