Skip to content

Commit

Permalink
* Performance:
Browse files Browse the repository at this point in the history
  * Read file by using buffer of 4096 instead of reading 20 bytes at a time
  * remove reflection when decoding tick data
  • Loading branch information
edward-frankieone committed May 26, 2024
1 parent 1e13b87 commit 8f13fb1
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 53 deletions.
5 changes: 2 additions & 3 deletions api/tickdata/downloader/tick_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"github.com/edward-yakop/go-duka/internal/bi5"
"github.com/edward-yakop/go-duka/internal/misc"
"github.com/stretchr/testify/assert"
"io/ioutil"
"os"
"testing"
"time"
Expand All @@ -31,11 +30,11 @@ func TestDownloader(t *testing.T) {
}

// fileExists records a test failure (without stopping the test) when
// misc.IsFileExists reports false for filePath.
func fileExists(t *testing.T, filePath string) {
	t.Helper()

	// The message is only printed on failure, so it states the expectation
	// that was violated rather than the success condition.
	assert.Truef(t, misc.IsFileExists(filePath), "file path %s should exist", filePath)
}

func createEmptyDir(t *testing.T) string {
dir, err := ioutil.TempDir(".", "test")
dir, err := os.MkdirTemp(".", "test")
assert.NoError(t, err)
t.Cleanup(func() {
_ = os.RemoveAll(dir)
Expand Down
1 change: 1 addition & 0 deletions examples/stream/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
download/**
29 changes: 4 additions & 25 deletions examples/stream/dd_finder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,17 @@ import (
"github.com/edward-yakop/go-duka/api/instrument"
"github.com/edward-yakop/go-duka/api/tickdata"
"github.com/edward-yakop/go-duka/api/tickdata/stream"
"github.com/edward-yakop/go-duka/internal/misc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"log/slog"
"math"
"os"
"strconv"
"testing"
"time"
)

func Test_StreamExample_DDFinder(t *testing.T) {
slog.SetDefault(
slog.New(
slog.NewTextHandler(
os.Stdout,
&slog.HandlerOptions{
Level: slog.LevelDebug,
},
),
),
)
misc.SetDefaultLog(slog.LevelDebug)

loc, _ := time.LoadLocation("EET")

Expand Down Expand Up @@ -60,7 +50,7 @@ func buyDDFinder(t *testing.T, instrument *instrument.Metadata, openTime time.Ti
) {
start := openTime.Add(-1 * time.Minute)
end := closeTime.Add(time.Minute)
s := stream.New(instrument, start, end, createEmptyDir(t))
s := stream.New(instrument, start, end, ".")

maxDD = math.MaxInt32
openPriceDiff = math.MaxInt32
Expand All @@ -72,7 +62,7 @@ func buyDDFinder(t *testing.T, instrument *instrument.Metadata, openTime time.Ti
}

if openPriceDiff == math.MaxInt32 {
logTick(t, " open", tickTime, tick)
logTick(t, "open", tickTime, tick)
openPriceDiff = int(math.Round((openPrice - tick.Ask) * 1000))
}

Expand All @@ -97,17 +87,6 @@ func buyDDFinder(t *testing.T, instrument *instrument.Metadata, openTime time.Ti
return openPriceDiff, maxDD, maxPositive, maxDDForMaxPositive, maxPositiveTime, closePriceDiff
}

// createEmptyDir creates a new directory with the "test" name prefix inside
// the current working directory and returns its path.
//
// The test is aborted immediately if the directory cannot be created. A
// cleanup hook is registered on t that removes the directory tree again and
// fails the test if that removal returns an error.
func createEmptyDir(t *testing.T) string {
	tmp, mkErr := os.MkdirTemp(".", "test")
	if ok := assert.NoError(t, mkErr); !ok {
		t.FailNow()
	}

	removeTmp := func() {
		rmErr := os.RemoveAll(tmp)
		require.NoError(t, rmErr, "remove temporary dir failed")
	}
	t.Cleanup(removeTmp)

	return tmp
}

func logTick(t *testing.T, op string, tickTime time.Time, tick *tickdata.TickData) {
if tick == nil {
return
Expand Down
62 changes: 39 additions & 23 deletions internal/bi5/bi5.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bi5

import (
"bufio"
"bytes"
"encoding/binary"
"github.com/edward-yakop/go-duka/api/instrument"
Expand All @@ -9,7 +10,6 @@ import (
"github.com/pkg/errors"
"io"
"os"
"strconv"
"time"

"github.com/edward-yakop/go-duka/internal/core"
Expand Down Expand Up @@ -133,36 +133,50 @@ func (b Bi5) sanitizeTo(to time.Time, location *time.Location) (time.Time, error
// struct.unpack(!IIIff)
// date, ask / point, bid / point, round(volume_ask * 100000), round(volume_bid * 100000)
//
// decodeTickData decodes one tick record of exactly TICK_BYTES bytes. The
// record is read as five consecutive big-endian fields: three int32 values
// (millisecond offset within the hour, ask, bid) followed by two float32
// values (ask volume, bid volume).
//
// symbol is copied into the result unchanged. timeH is the start of the hour
// the record belongs to; the tick timestamp is timeH in Unix milliseconds plus
// the decoded offset. Ask and bid are divided by b.metadata.DecimalFactor().
//
// It returns an error when data has the wrong length or when any field fails
// to decode; in both cases the returned tick is nil.
func (b Bi5) decodeTickData(data []byte, symbol string, timeH time.Time) (*tickdata.TickData, error) {
	if len(data) != TICK_BYTES {
		return nil, errors.New("invalid length for tick data")
	}

	var err error
	var timeMs, ask, bid int32
	var volumeAsk, volumeBid float32

	// read is a no-op once err is non-nil, so the first failure is carried
	// through the remaining calls and checked once below.
	buf := bytes.NewBuffer(data)
	timeMs, err = read[int32](err, buf, "time")
	ask, err = read[int32](err, buf, "ask")
	bid, err = read[int32](err, buf, "bid")
	volumeAsk, err = read[float32](err, buf, "volumeAsk")
	volumeBid, err = read[float32](err, buf, "volumeBid")
	if err != nil {
		// Never hand back a tick built from partially decoded (zero) fields.
		return nil, err
	}

	point := b.metadata.DecimalFactor()

	t := tickdata.TickData{
		Symbol:    symbol,
		Timestamp: timeH.Unix()*1000 + int64(timeMs), // hour start in ms + offset within the hour
		Ask:       float64(ask) / point,
		Bid:       float64(bid) / point,
		VolumeAsk: float64(volumeAsk),
		VolumeBid: float64(volumeBid),
	}

	return &t, nil
}

// read decodes the next big-endian value of type T from buf.
//
// It is designed to be chained: existingErr is the error returned by the
// previous call in the chain. When existingErr is non-nil, buf is left
// untouched and the zero value of T is returned together with that same
// error. Otherwise the value is decoded, and a decode failure is wrapped with
// the name given in field.
func read[T any](existingErr error, buf *bytes.Buffer, field string) (r T, err error) {
	// An earlier failure takes precedence; only decode on a clean chain.
	if err = existingErr; err == nil {
		if err = binary.Read(buf, binary.BigEndian, &r); err != nil {
			err = errors.Wrapf(err, "failed to read field [%s]", field)
		}
	}

	return r, err
}

// Download from dukascopy
func (b Bi5) Download() error {
return b.downloader.Download(b.InstrumentCode(), b.dayHour)
Expand All @@ -180,12 +194,13 @@ func (b Bi5) EachTick(it tickdata.TickIterator) {
return
}

defer f.Close()
defer func(f *os.File) { _ = f.Close() }(f)

reader, err := lzma.NewReader(f)
if err != nil {
err = errors.Wrap(err, "Failed to create file reader")
reader, lzmaErr := lzma.NewReader(bufio.NewReader(f))
if lzmaErr != nil {
err = errors.Wrapf(err, "failed to create file [%s] reader", b.targetFilePath)
it(nil, err)

return
}

Expand All @@ -197,15 +212,16 @@ func (b Bi5) EachTick(it tickdata.TickIterator) {
bytesCount, err = reader.Read(bytesArr[:])
if err == io.EOF {
err = nil

break
}

if bytesCount != TICK_BYTES || err != nil {
err = errors.Wrap(err, "LZMA decode failed: ["+strconv.Itoa(bytesCount)+"] for file ["+b.targetFilePath+"]")
err = errors.Wrapf(err, "LZMA decode failed: [%d] for file [%s]", bytesCount, b.targetFilePath)
} else {
tick, err = b.decodeTickData(bytesArr[:], b.InstrumentCode(), b.dayHour)
if err != nil {
err = errors.Wrap(err, "Decode tick data failed for file ["+b.targetFilePath+"]")
err = errors.Wrapf(err, "decode tick data failed for file [%s]", b.targetFilePath)
}
}

Expand Down
5 changes: 3 additions & 2 deletions internal/export/csv/csv.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package csv

import (
"bufio"
"encoding/csv"
"fmt"
"github.com/edward-yakop/go-duka/api/instrument"
Expand Down Expand Up @@ -66,7 +67,7 @@ func (c *CsvDump) PackTicks(barTimestamp uint32, ticks []*tickdata.TickData) err

const dayFormat = "2006-01-02"

// worker goroutine which flust data to disk
// worker goroutine which flushes data to disk
func (c *CsvDump) worker() error {
fname := fmt.Sprintf("%s-%s-%s.%s",
c.instrument.Code(),
Expand All @@ -88,7 +89,7 @@ func (c *CsvDump) worker() error {
slog.Info("Saved Ticks: %d.", c.tickCount)

Check failure on line 89 in internal/export/csv/csv.go

View workflow job for this annotation

GitHub Actions / Build

slog.Info arg "c.tickCount" should be a string or a slog.Attr (possible missing key or value)
}()

csvw := csv.NewWriter(f)
csvw := csv.NewWriter(bufio.NewWriter(f))
defer csvw.Flush()

// write header
Expand Down
19 changes: 19 additions & 0 deletions internal/misc/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package misc

import (
"log/slog"
"os"
)

// SetDefaultLog replaces the process-wide default slog logger with one that
// writes text-formatted records to os.Stdout, using level as the handler's
// minimum enabled level.
func SetDefaultLog(level slog.Leveler) {
	opts := slog.HandlerOptions{Level: level}
	handler := slog.NewTextHandler(os.Stdout, &opts)

	slog.SetDefault(slog.New(handler))
}

0 comments on commit 8f13fb1

Please sign in to comment.