Skip to content

Commit

Permalink
(NOBIDS) batch inserting of ethstore-data (#2359)
Browse files Browse the repository at this point in the history
* (NOBIDS) batch inserting of ethstore-data

* (NOBIDS) fix historical_pool_performance insert
  • Loading branch information
guybrush authored Jun 27, 2023
1 parent 5be2109 commit 678af5c
Showing 1 changed file with 68 additions and 33 deletions.
101 changes: 68 additions & 33 deletions exporter/ethstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"eth2-exporter/services"
"eth2-exporter/types"
"eth2-exporter/utils"
"fmt"
"sort"
"strconv"
"strings"
"time"

ethstore "github.com/gobitfly/eth.store"
Expand Down Expand Up @@ -62,6 +64,41 @@ func (ese *EthStoreExporter) ExportDay(day string) error {
}
defer tx.Rollback()

numArgs := 10
batchSize := 65535 / numArgs // max 65535 params per batch, since postgres uses int16 for binding input params
valueArgs := make([]interface{}, 0, batchSize*numArgs)
valueStrings := make([]string, 0, batchSize)
valueStringArr := make([]string, numArgs)
batchIdx, allIdx := 0, 0
for index, day := range validators {
for u := 0; u < numArgs; u++ {
valueStringArr[u] = fmt.Sprintf("$%d", batchIdx*numArgs+1+u)
}
valueStrings = append(valueStrings, "("+strings.Join(valueStringArr, ",")+")")
valueArgs = append(valueArgs, day.Day)
valueArgs = append(valueArgs, index)
valueArgs = append(valueArgs, day.EffectiveBalanceGwei.Mul(decimal.NewFromInt(1e9)))
valueArgs = append(valueArgs, day.StartBalanceGwei.Mul(decimal.NewFromInt(1e9)))
valueArgs = append(valueArgs, day.EndBalanceGwei.Mul(decimal.NewFromInt(1e9)))
valueArgs = append(valueArgs, day.DepositsSumGwei.Mul(decimal.NewFromInt(1e9)))
valueArgs = append(valueArgs, day.TxFeesSumWei)
valueArgs = append(valueArgs, day.ConsensusRewardsGwei.Mul(decimal.NewFromInt(1e9)))
valueArgs = append(valueArgs, day.TotalRewardsWei)
valueArgs = append(valueArgs, day.Apr)
batchIdx++
allIdx++
if batchIdx >= batchSize || allIdx >= len(validators) {
stmt := fmt.Sprintf(`INSERT INTO eth_store_stats (day, validator, effective_balances_sum_wei, start_balances_sum_wei, end_balances_sum_wei, deposits_sum_wei, tx_fees_sum_wei, consensus_rewards_sum_wei, total_rewards_wei, apr) VALUES %s`, strings.Join(valueStrings, ","))
_, err := tx.Exec(stmt, valueArgs...)
if err != nil {
return err
}
batchIdx = 0
valueArgs = valueArgs[:0]
valueStrings = valueStrings[:0]
}
}

stmt, err := tx.Prepare(`
INSERT INTO eth_store_stats (day, validator, effective_balances_sum_wei, start_balances_sum_wei, end_balances_sum_wei, deposits_sum_wei, tx_fees_sum_wei, consensus_rewards_sum_wei, total_rewards_wei, apr)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`)
Expand All @@ -86,40 +123,38 @@ func (ese *EthStoreExporter) ExportDay(day string) error {
return err
}

for index, day := range validators {
_, err = stmt.Exec(
day.Day,
index,
day.EffectiveBalanceGwei.Mul(decimal.NewFromInt(1e9)),
day.StartBalanceGwei.Mul(decimal.NewFromInt(1e9)),
day.EndBalanceGwei.Mul(decimal.NewFromInt(1e9)),
day.DepositsSumGwei.Mul(decimal.NewFromInt(1e9)),
day.TxFeesSumWei,
day.ConsensusRewardsGwei.Mul(decimal.NewFromInt(1e9)),
day.TotalRewardsWei,
day.Apr,
)
if err != nil {
return err
}
}

_, err = tx.Exec(`
insert into historical_pool_performance
select
eth_store_stats.day,
COALESCE(validator_pool.pool, 'Unknown'),
COUNT(*) as validators,
sum(effective_balances_sum_wei) as effective_balances_sum_wei,
sum(start_balances_sum_wei) as start_balances_sum_wei,
sum(end_balances_sum_wei) as end_balances_sum_wei,
sum(deposits_sum_wei) as deposits_sum_wei,
sum(tx_fees_sum_wei) as tx_fees_sum_wei,
sum(consensus_rewards_sum_wei) as tx_fees_sum_wei,
sum(total_rewards_wei) as total_rewards_wei,
avg(eth_store_stats.apr) as apr
from validators left join validator_pool on validators.pubkey = validator_pool.publickey join eth_store_stats on validators.validatorindex = eth_store_stats.validator where day = $1 group by validator_pool.pool, eth_store_stats.day
;`, ethStoreDay.Day)
insert into historical_pool_performance
select
eth_store_stats.day,
coalesce(validator_pool.pool, 'Unknown'),
count(*) as validators,
sum(effective_balances_sum_wei) as effective_balances_sum_wei,
sum(start_balances_sum_wei) as start_balances_sum_wei,
sum(end_balances_sum_wei) as end_balances_sum_wei,
sum(deposits_sum_wei) as deposits_sum_wei,
sum(tx_fees_sum_wei) as tx_fees_sum_wei,
sum(consensus_rewards_sum_wei) as consensus_rewards_sum_wei,
sum(total_rewards_wei) as total_rewards_wei,
avg(eth_store_stats.apr) as apr
from validators
left join validator_pool on validators.pubkey = validator_pool.publickey
inner join eth_store_stats on validators.validatorindex = eth_store_stats.validator
where day = $1
group by validator_pool.pool, eth_store_stats.day
on conflict (day, pool) do update set
day = excluded.day,
pool = excluded.pool,
validators = excluded.validators,
effective_balances_sum_wei = excluded.effective_balances_sum_wei,
start_balances_sum_wei = excluded.start_balances_sum_wei,
end_balances_sum_wei = excluded.end_balances_sum_wei,
deposits_sum_wei = excluded.deposits_sum_wei,
tx_fees_sum_wei = excluded.tx_fees_sum_wei,
consensus_rewards_sum_wei = excluded.consensus_rewards_sum_wei,
total_rewards_wei = excluded.total_rewards_wei,
apr = excluded.apr`,
ethStoreDay.Day)
if err != nil {
return err
}
Expand Down

0 comments on commit 678af5c

Please sign in to comment.