Skip to content

Commit

Permalink
chore: unmarshal data market address from submission bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
anomit committed Oct 21, 2024
1 parent e29e4ab commit d7311a1
Showing 1 changed file with 39 additions and 0 deletions.
39 changes: 39 additions & 0 deletions pkgs/service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,45 @@ func handleStream(stream network.Stream) {
log.Debugln("Error reading:", err)
return
}
func handleStream(stream network.Stream) {
defer stream.Close()
for {
buf := make([]byte, 1024)
length, err := stream.Read(buf)
if err != nil {
if err == io.EOF {
log.Debugln("End of stream reached")
break
}
log.Debugln("Error reading:", err)
return
}
submissionId := uuid.New()
err = submissionId.UnmarshalText(buf[:36])
if err != nil {
log.Debugln("Unable to unmarshal uuid for submission: ", string(buf[:36]))
return
}

// Extract data market address
// EVM address is a fixed length of 42 characters (0x followed by 40 hex digits)
dataMarketAddress := string(buf[36:78])
// convert to checksum address
dataMarketAddress = ethutil.HexToAddress(dataMarketAddress).Hex()

// Add submission to Redis queue
queueData := map[string]interface{}{
"submission_id": submissionId.String(),
"data_market_address": dataMarketAddress,
"data": string(buf[78:length]),
}
queueDataJSON, err := json.Marshal(queueData)
if err != nil {
log.Debugln("Error marshalling queue data:", err)
continue
}

err = redis.RedisClient.LPush(context.Background(), "submissionQueue", queueDataJSON).Err()
submissionId := uuid.New()
err = submissionId.UnmarshalText(buf[:36])
if err != nil {
Expand Down

0 comments on commit d7311a1

Please sign in to comment.