Skip to content

Commit

Permalink
Aggregate new dlob orderbooks (#493)
Browse files Browse the repository at this point in the history
* add new agg function

* update historical service to pull from multiple tickers, update start time calculation

* add back scheduled jobs

* add delayed init start for dlob historical cron job, various other refactors

* fix ktlint items

* fix more ktlint

* add some logging

* add mainnet dlob address

* Add deletion of historical daily database version

* Update delete timestamp

* revert cron change
  • Loading branch information
nullpointer0x00 authored Oct 27, 2023
1 parent 1bdc002 commit 64e8b2c
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
SELECT 'Deleting from token_historical_daily dates after 8/03/2023' AS comment;

DELETE FROM token_historical_daily WHERE historical_timestamp > '2023-08-03 00:00:00';
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class ValidatorMarketRateStatsRecord(id: EntityID<Int>) : IntEntity(id) {
}

fun findByAddress(address: String, fromDate: DateTime?, toDate: DateTime?, count: Int) = transaction {
val query = ValidatorMarketRateStatsTable.select { ValidatorMarketRateStatsTable.operatorAddress eq address }
val query =
ValidatorMarketRateStatsTable.select { ValidatorMarketRateStatsTable.operatorAddress eq address }
if (fromDate != null) {
query.andWhere { ValidatorMarketRateStatsTable.date greaterEq fromDate }
}
Expand Down Expand Up @@ -272,6 +273,14 @@ class TokenHistoricalDailyRecord(id: EntityID<DateTime>) : Entity<DateTime>(id)
.orderBy(Pair(TokenHistoricalDailyTable.timestamp, SortOrder.DESC))
.firstOrNull()?.data?.quote?.get(USD_UPPER)?.close ?: BigDecimal.ZERO
}

fun getLatestDateEntry(): TokenHistoricalDailyRecord? = transaction {
return@transaction TokenHistoricalDailyRecord
.all()
.orderBy(Pair(TokenHistoricalDailyTable.timestamp, SortOrder.DESC))
.limit(1)
.firstOrNull()
}
}

var timestamp by TokenHistoricalDailyTable.timestamp
Expand Down Expand Up @@ -305,7 +314,7 @@ class ProcessQueueRecord(id: EntityID<Int>) : IntEntity(id) {
fun delete(processType: ProcessQueueType, value: String) = transaction {
ProcessQueueTable.deleteWhere {
(ProcessQueueTable.processType eq processType.name) and
(ProcessQueueTable.processValue eq value)
(processValue eq value)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ class TokenService(private val accountClient: AccountGrpcClient) {
}

fun getTokenBreakdown() = runBlocking {
val bonded = accountClient.getStakingPool().pool.bondedTokens.toBigDecimal().roundWhole().toCoinStr(UTILITY_TOKEN)
val bonded =
accountClient.getStakingPool().pool.bondedTokens.toBigDecimal().roundWhole().toCoinStr(UTILITY_TOKEN)
TokenSupply(
maxSupply().toCoinStr(UTILITY_TOKEN),
totalSupply().toCoinStr(UTILITY_TOKEN),
Expand All @@ -157,16 +158,19 @@ class TokenService(private val accountClient: AccountGrpcClient) {
}

fun nhashMarkerAddr() = MarkerCacheRecord.findByDenom(UTILITY_TOKEN)?.markerAddress!!
fun burnedSupply() = runBlocking { accountClient.getMarkerBalance(nhashMarkerAddr(), UTILITY_TOKEN).toBigDecimal().roundWhole() }
fun burnedSupply() =
runBlocking { accountClient.getMarkerBalance(nhashMarkerAddr(), UTILITY_TOKEN).toBigDecimal().roundWhole() }

fun moduleAccounts() = AccountRecord.findAccountsByType(listOf(Auth.ModuleAccount::class.java.simpleName))
fun zeroSeqAccounts() = AccountRecord.findZeroSequenceAccounts()
fun vestingAccounts() = AccountRecord.findAccountsByType(vestingAccountTypes)
fun contractAccounts() = AccountRecord.findContractAccounts()
fun allAccounts() = transaction { AccountRecord.all().toMutableList() }
fun communityPoolSupply() = runBlocking { accountClient.getCommunityPoolAmount(UTILITY_TOKEN).toBigDecimal().roundWhole() }
fun communityPoolSupply() =
runBlocking { accountClient.getCommunityPoolAmount(UTILITY_TOKEN).toBigDecimal().roundWhole() }

fun richListAccounts() =
allAccounts().addressList() - zeroSeqAccounts().toSet() - moduleAccounts().addressList() -
contractAccounts().addressList() - setOf(nhashMarkerAddr())
allAccounts().addressList() - zeroSeqAccounts().toSet() - moduleAccounts().addressList() - contractAccounts().addressList() - setOf(nhashMarkerAddr())

fun totalBalanceForList(addresses: Set<String>) = runBlocking {
TokenDistributionPaginatedResultsRecord.findByAddresses(addresses).asFlow()
Expand Down Expand Up @@ -217,10 +221,10 @@ class TokenService(private val accountClient: AccountGrpcClient) {
}
}

fun getHistoricalFromDlob(startTime: DateTime): DlobHistBase? = runBlocking {
fun getHistoricalFromDlob(startTime: DateTime, tickerId: String): DlobHistBase? = runBlocking {
try {
KTOR_CLIENT_JAVA.get("https://www.dlob.io:443/gecko/external/api/v1/exchange/historical_trades") {
parameter("ticker_id", "HASH_USD")
parameter("ticker_id", tickerId)
parameter("type", "buy")
parameter("start_time", DateTimeFormat.forPattern("dd-MM-yyyy").print(startTime))
accept(ContentType.Application.Json)
Expand All @@ -234,6 +238,15 @@ class TokenService(private val accountClient: AccountGrpcClient) {
}
}

fun getHistoricalFromDlob(startTime: DateTime): DlobHistBase? {
val tickerIds = listOf("HASH_USD", "HASH_USDOMNI")

val dlobHistorical = tickerIds
.flatMap { getHistoricalFromDlob(startTime, it)?.buy.orEmpty() }

return if (dlobHistorical.isNotEmpty()) DlobHistBase(dlobHistorical) else null
}

fun getTokenHistorical(fromDate: DateTime?, toDate: DateTime?) =
TokenHistoricalDailyRecord.findForDates(fromDate?.startOfDay(), toDate?.startOfDay())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ import tendermint.types.BlockOuterClass
import java.math.BigDecimal
import java.time.OffsetDateTime
import java.time.ZoneOffset
import javax.annotation.PostConstruct

@Service
class AsyncService(
Expand All @@ -104,6 +105,18 @@ class AsyncService(
protected val logger = logger(AsyncService::class)
protected var collectHistorical = true

@PostConstruct
fun asyncServiceOnStartInit() {
Thread {
try {
Thread.sleep(5000)
updateTokenHistorical()
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
}
}.start()
}

@Scheduled(initialDelay = 0L, fixedDelay = 5000L)
fun updateLatestBlockHeightJob() {
val index = getBlockIndex()
Expand Down Expand Up @@ -145,7 +158,7 @@ class AsyncService(
}

fun getBlockIndex() = blockService.getBlockIndexFromCache()?.let {
Pair<Int?, Int?>(it.maxHeightRead, it.minHeightRead)
Pair(it.maxHeightRead, it.minHeightRead)
}

fun startCollectingHistoricalBlocks(blockIndex: Pair<Int?, Int?>?) =
Expand Down Expand Up @@ -248,11 +261,11 @@ class AsyncService(
val now = OffsetDateTime.now().withOffsetSameInstant(ZoneOffset.UTC).toString()
cacheService.getCacheValue(key)!!.let { cache ->
pricingService.getPricingAsync(cache.cacheValue!!, "async pricing update").forEach { price ->
// dont set price from PE
// don't set price from PE
if (price.markerDenom != UTILITY_TOKEN) {
assetService.getAssetRaw(price.markerDenom).let { pricingService.insertAssetPricing(it, price) }
} else {
// Pull price from CMC, calced to the true base denom price
// Pull price from CMC, calculate to the true base denom price
val cmcPrice =
tokenService.getTokenLatest()?.quote?.get(USD_UPPER)?.price
?.let {
Expand Down Expand Up @@ -280,8 +293,14 @@ class AsyncService(
@Scheduled(cron = "0 0 1 * * ?") // Every day at 1 am
fun updateTokenHistorical() {
val today = DateTime.now().startOfDay()
val startDate = today.minusMonths(1)
var startDate = today.minusMonths(1)
val latest = TokenHistoricalDailyRecord.getLatestDateEntry()
if (latest != null) {
startDate = latest.timestamp.minusDays(1)
}
val dlobRes = tokenService.getHistoricalFromDlob(startDate) ?: return
logger.info("Updating token historical data starting from $startDate with ${dlobRes.buy.size} buy records for roll-up.")

val baseMap = Interval(startDate, today)
.let { int -> generateSequence(int.start) { dt -> dt.plusDays(1) }.takeWhile { dt -> dt < int.end } }
.map { it to emptyList<DlobHistorical>() }.toMap().toMutableMap()
Expand Down Expand Up @@ -312,7 +331,9 @@ class AsyncService(
low = low?.price ?: prevPrice,
close = close,
volume = usdVolume,
market_cap = close.multiply(tokenService.totalSupply().divide(UTILITY_TOKEN_BASE_MULTIPLIER)),
market_cap = close.multiply(
tokenService.totalSupply().divide(UTILITY_TOKEN_BASE_MULTIPLIER)
),
timestamp = closeDate
)
)
Expand Down Expand Up @@ -346,7 +367,10 @@ class AsyncService(
today,
mapOf(USD_UPPER to CmcLatestQuoteAbbrev(price, percentChg, vol24Hr, marketCap, today))
)
CacheUpdateRecord.updateCacheByKey(CacheKeys.UTILITY_TOKEN_LATEST.key, VANILLA_MAPPER.writeValueAsString(rec))
CacheUpdateRecord.updateCacheByKey(
CacheKeys.UTILITY_TOKEN_LATEST.key,
VANILLA_MAPPER.writeValueAsString(rec)
)
}
}

Expand Down Expand Up @@ -443,7 +467,8 @@ class AsyncService(
try {
transaction { it.apply { this.processing = true } }
send(it.processValue)
} catch (_: Exception) { }
} catch (_: Exception) {
}
}
}
}
Expand Down

0 comments on commit 64e8b2c

Please sign in to comment.