avro multi-file output (#2)
hzarka authored Nov 12, 2019
1 parent d0de295 commit 539efc7
Showing 3 changed files with 54 additions and 15 deletions.
9 changes: 8 additions & 1 deletion Makefile
@@ -1,5 +1,5 @@

test: testavro testjson
test: testavro testjson testavromulti

testbuild:
	go build
@@ -12,6 +12,13 @@ testavro: testbuild
	fastavro tmp/test_b.avro
	#fastavro tmp/test_c.avro


testavromulti: testbuild
	./mysqlbqdump --epoch=false --format=avro test test_a --output-dir=tmp/test_a/ --output-file-row-limit=1
	fastavro tmp/test_a/00000001.avro
	fastavro tmp/test_a/00000002.avro
	#fastavro tmp/test_c.avro

testjson: testbuild
	./mysqlbqdump --epoch=false --format=json test test_a > tmp/test_a.json
	./mysqlbqdump --epoch=false --format=json test test_b > tmp/test_b.json
24 changes: 20 additions & 4 deletions mysqlbqdump.go
@@ -14,10 +14,12 @@ type Writer interface {
}

type Config struct {
	FieldSep string
	RowSep string
	NullString string
	DateEpoch bool
	FieldSep string
	RowSep string
	NullString string
	OutputDir string
	OutputFileRowLimit int
	DateEpoch bool
}

func getDSN(filename string, section string, database string) string {
@@ -41,6 +43,8 @@ func main() {
	flag.StringVar(&config.FieldSep, "csv-fields-terminated-by", "\t", "field separator")
	flag.StringVar(&config.RowSep, "csv-records-terminated-by", "\n", "row separator")
	flag.StringVar(&config.NullString, "csv-null-string", "\\N", "output string for NULL values")
	flag.StringVar(&config.OutputDir, "output-dir", "\\N", "output directory")
	flag.IntVar(&config.OutputFileRowLimit, "output-file-row-limit", 0, "output file row limit")
	flag.BoolVar(&config.DateEpoch, "epoch", true, "output datetime as epoch instead of RFC3339")
	defaults_file := flag.String("defaults-file", "my.cnf", "defaults file")
	defaults_group_suffix := flag.String("defaults-group-suffix", "", "defaults group suffix")
@@ -57,6 +61,18 @@ func main() {
		flag.Usage()
		os.Exit(1)
	}

	if config.OutputFileRowLimit > 0 {
		if config.OutputDir == "\\N" {
			log.Fatalln("cant specify output-file-row-limit without output-dir")
		}
		if *format != "avro" {
			log.Fatalln("only avro supported for multi-file export")
		}
		err := os.MkdirAll(config.OutputDir, os.ModePerm)
		handleError(err)
	}

	dsn := getDSN(*defaults_file, "client"+*defaults_group_suffix, args[0])
	rows := getRows(dsn, args[1])
	if *format == "json" {
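
Taken together, the two new flags only change behaviour when used in combination: --output-file-row-limit requires --output-dir (the "\\N" default doubles as a "not set" sentinel, apparently mirroring the csv-null-string default), multi-file export is restricted to the avro format, and the output directory is created up front. Below is a minimal stand-alone sketch of that flag handling; it is an illustration only, uses an empty-string default instead of the "\\N" sentinel, and is not the actual mysqlbqdump main().

package main

import (
	"flag"
	"log"
	"os"
)

func main() {
	// Hypothetical, simplified flag handling mirroring the checks added in this
	// commit: a per-file row limit is meaningless without a directory to put
	// the numbered files in, and only the avro writer knows how to rotate files.
	outputDir := flag.String("output-dir", "", "directory for numbered output files")
	rowLimit := flag.Int("output-file-row-limit", 0, "rows per output file (0 = single stream on stdout)")
	format := flag.String("format", "avro", "output format")
	flag.Parse()

	if *rowLimit > 0 {
		if *outputDir == "" {
			log.Fatalln("--output-file-row-limit requires --output-dir")
		}
		if *format != "avro" {
			log.Fatalln("multi-file export is only supported for the avro format")
		}
		if err := os.MkdirAll(*outputDir, os.ModePerm); err != nil {
			log.Fatalln(err)
		}
	}
	// ... dump rows: to stdout when rowLimit == 0, otherwise to
	// <output-dir>/00000001.avro, 00000002.avro, ... (see write_avro.go below).
}
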
36 changes: 26 additions & 10 deletions write_avro.go
@@ -10,12 +10,14 @@ import "math/big"

type AvroWriter struct {
	*Config
	enc *goavro.OCFWriter
	row map[string]interface{}
	numRows int
	numFiles int
	enc *goavro.OCFWriter
	row map[string]interface{}
}

func NewAvroWriter(config *Config) *AvroWriter {
	return &AvroWriter{config, nil, make(map[string]interface{})}
	return &AvroWriter{config, 0, 0, nil, make(map[string]interface{})}
}

func (w *AvroWriter) WriteRow(columns []string, converters []convertfn, row []interface{}) {
@@ -33,15 +35,23 @@ func (w *AvroWriter) WriteRows(rows *sql.Rows) {
	columnTypes, err := rows.ColumnTypes()
	handleError(err)

	fns, writer := createWriter(columnTypes)
	w.enc = writer
	fns, schema := getAvroSchema(columnTypes)

	w.enc = w.createWriter(schema)

	vals := make([]interface{}, len(columnNames))
	scanArgs := make([]interface{}, len(columnNames))
	for i := 0; i < len(columnNames); i++ {
		scanArgs[i] = &vals[i]
	}

	for rows.Next() {
		if w.OutputFileRowLimit > 0 {
			if w.numRows >= w.OutputFileRowLimit {
				w.enc = w.createWriter(schema)
			}
			w.numRows += 1
		}
		err = rows.Scan(scanArgs...)
		if err != nil {
			fatal(err)
@@ -50,13 +60,19 @@
	}
}

func createWriter(columnTypes []*sql.ColumnType) ([]convertfn, *goavro.OCFWriter) {
	fns, schema := getAvroSchema(columnTypes)
func (w *AvroWriter) createWriter(schema string) *goavro.OCFWriter {
	config := goavro.OCFConfig{os.Stdout, nil, schema, "snappy", nil}
	if w.OutputFileRowLimit > 0 {
		w.numFiles += 1
		w.numRows = 0
		fp, err := os.Create(w.OutputDir + fmt.Sprintf("/%08d.avro", w.numFiles))
		handleError(err)
		config = goavro.OCFConfig{fp, nil, schema, "snappy", nil}
	}
	ret, err := goavro.NewOCFWriter(config)
	handleError(err)

	return fns, ret
	return ret
}

func getAvroSchema(columnTypes []*sql.ColumnType) ([]convertfn, string) {
@@ -83,7 +99,7 @@ func getAvroSchema(columnTypes []*sql.ColumnType) ([]convertfn, string) {
// - for now, we'll just reuse the first decimal type for all decimal types in the schema
//
// This could lead to loss of precision, but better than having wrong values for now
var FIRST_DECIMAL_TYPE string = "";
var FIRST_DECIMAL_TYPE string = ""

func getAvroTypeFromMysqlType(ctype *sql.ColumnType) (convertfn, string) {
	dbt := strings.ToLower(ctype.DatabaseTypeName())
@@ -99,7 +115,7 @@ func getAvroTypeFromMysqlType(ctype *sql.ColumnType) (convertfn, string) {
	if dbt == "decimal" {
		precision, scale, _ := ctype.DecimalSize()
		if FIRST_DECIMAL_TYPE == "" {
			FIRST_DECIMAL_TYPE = fmt.Sprintf(typeJsons["decimal"], precision, scale)
			FIRST_DECIMAL_TYPE = fmt.Sprintf(typeJsons["decimal"], precision, scale)
		}
		return typeFns["decimal"], FIRST_DECIMAL_TYPE
	}
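
Putting the write_avro.go changes together: createWriter is now a method on AvroWriter so it can rotate output files. With no row limit it keeps the old behaviour of streaming a single OCF to stdout; with a limit, each time the per-file counter reaches OutputFileRowLimit it resets the counter, increments numFiles and opens OutputDir/<numFiles, zero-padded to 8 digits>.avro as the next container. The sketch below is a self-contained illustration of that rotation pattern, assuming the goavro v2 API; the import path, schema, directory and record values are made up for the example, and, like the commit, it leaves the created files to be closed when the process exits.

package main

import (
	"fmt"
	"os"
	"path/filepath"

	"github.com/linkedin/goavro/v2" // import path assumed; the repo may pin a different goavro version
)

// rotatingOCF appends Avro records to sequentially numbered OCF files,
// starting a new file once rowLimit records have gone into the current one --
// the same pattern AvroWriter uses after this commit.
type rotatingOCF struct {
	dir      string
	schema   string
	rowLimit int
	numFiles int
	numRows  int
	enc      *goavro.OCFWriter
}

// nextFile creates <dir>/<numFiles as %08d>.avro and wraps it in an OCF writer.
func (r *rotatingOCF) nextFile() error {
	r.numFiles++
	r.numRows = 0
	fp, err := os.Create(filepath.Join(r.dir, fmt.Sprintf("%08d.avro", r.numFiles)))
	if err != nil {
		return err
	}
	enc, err := goavro.NewOCFWriter(goavro.OCFConfig{
		W:               fp,
		Schema:          r.schema,
		CompressionName: "snappy",
	})
	if err != nil {
		return err
	}
	r.enc = enc
	return nil
}

// write appends one record, rotating to a new file when the limit is reached.
func (r *rotatingOCF) write(rec map[string]interface{}) error {
	if r.enc == nil || (r.rowLimit > 0 && r.numRows >= r.rowLimit) {
		if err := r.nextFile(); err != nil {
			return err
		}
	}
	r.numRows++
	return r.enc.Append([]interface{}{rec})
}

func main() {
	// Made-up schema and rows, just to exercise the rotation.
	const schema = `{"type":"record","name":"row","fields":[{"name":"id","type":"long"}]}`
	w := &rotatingOCF{dir: "tmp/demo", schema: schema, rowLimit: 1}
	if err := os.MkdirAll(w.dir, os.ModePerm); err != nil {
		panic(err)
	}
	for id := int64(1); id <= 2; id++ {
		if err := w.write(map[string]interface{}{"id": id}); err != nil {
			panic(err)
		}
	}
	// With rowLimit = 1 this leaves tmp/demo/00000001.avro and tmp/demo/00000002.avro,
	// which is exactly what the new testavromulti target checks with fastavro.
}
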
