
Update quote assets #166

Closed
counterblock/lib/blockfeed.py (32 changes: 24 additions & 8 deletions)
@@ -58,7 +58,7 @@ def publish_mempool_tx():
'order_by': 'timestamp',
'order_dir': 'DESC'
}
- new_txs = util.jsonrpc_api("get_mempool", params, abort_on_error=True)
+ new_txs = util.jsonrpc_api("get_mempool", params, abort_on_error=True, use_cache=False)
num_skipped_tx = 0
if new_txs:
for new_tx in new_txs['result']:
@@ -152,11 +152,17 @@ def parse_block(block_data):

config.state['my_latest_block'] = new_block

+ if config.state['my_latest_block']['block_index'] % 10 == 0: # every 10 blocks print status
+ root_logger = logging.getLogger()
+ root_logger.setLevel(logging.INFO)
logger.info("Block: %i of %i [message height=%s]" % (
config.state['my_latest_block']['block_index'],
config.state['cp_backend_block_index']
if config.state['cp_backend_block_index'] else '???',
config.state['last_message_index'] if config.state['last_message_index'] != -1 else '???'))
+ if config.state['my_latest_block']['block_index'] % 10 == 0: # every 10 blocks print status
+ root_logger.setLevel(logging.WARNING)

return True

# grab our stored preferences, and rebuild the database if necessary
@@ -171,7 +177,7 @@ def parse_block(block_data):
config.TESTNET))
else:
logger.warn("counterblockd database app_config collection doesn't exist. BUILDING FROM SCRATCH...")
- app_config = database.reparse()
+ app_config = database.init_reparse()
else:
app_config = app_config[0]
# get the last processed block out of mongo
@@ -189,6 +195,12 @@ def parse_block(block_data):
autopilot_runner = 0
iteration = 0

+ if config.IS_REPARSING:
+ reparse_start = time.time()
+ root_logger = logging.getLogger()
+ root_logger_level = root_logger.getEffectiveLevel()
+ root_logger.setLevel(logging.WARNING)

# start polling counterparty-server for new blocks
cp_running_info = None
while True:
@@ -203,9 +215,9 @@ def parse_block(block_data):

if not autopilot or autopilot_runner == 0:
try:
- cp_running_info = util.jsonrpc_api("get_running_info", abort_on_error=True)['result']
+ cp_running_info = util.jsonrpc_api("get_running_info", abort_on_error=True, use_cache=False)['result']
except Exception as e:
logger.warn("Cannot contact counterparty-server (via get_running_info)")
logger.warn("Cannot contact counterparty-server (via get_running_info): {}".format(e))
time.sleep(3)
continue

@@ -293,10 +305,6 @@ def parse_block(block_data):
time.sleep(3)
continue

- # clean api block cache
- if config.state['cp_latest_block_index'] - cur_block_index <= config.MAX_REORG_NUM_BLOCKS: # only when we are near the tip
- cache.clean_block_cache(cur_block_index)

try:
result = parse_block(block_data)
except Exception as e: # if anything bubbles up
@@ -326,6 +334,14 @@ def parse_block(block_data):
% config.MAX_REORG_NUM_BLOCKS)
database.rollback(config.state['cp_latest_block_index'] - config.MAX_REORG_NUM_BLOCKS)
else:
+ if config.IS_REPARSING:
+ # restore logging state
+ root_logger.setLevel(root_logger_level)
+ # print out how long the reparse took
+ reparse_end = time.time()
+ logger.info("Reparse took {:.3f} minutes.".format((reparse_end - reparse_start) / 60.0))
+ config.IS_REPARSING = False

if config.QUIT_AFTER_CAUGHT_UP:
sys.exit(0)

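Taken together, the blockfeed.py changes do two things: RPC calls whose results must always be fresh (get_mempool, get_running_info) now pass use_cache=False, and a reparse run silences the root logger to WARNING, briefly re-enables INFO every 10 blocks for a progress line, then restores the original level and reports the elapsed time once caught up. Below is a minimal, self-contained sketch of that logging/timing pattern; reparse_blocks and its arguments are illustrative names, not taken from this diff.

```python
import logging
import time

logger = logging.getLogger(__name__)

def reparse_blocks(start_block, end_block):
    root_logger = logging.getLogger()
    original_level = root_logger.getEffectiveLevel()
    root_logger.setLevel(logging.WARNING)  # suppress per-block log noise while reparsing
    start = time.time()
    try:
        for block_index in range(start_block, end_block + 1):
            # ... parse the block here ...
            if block_index % 10 == 0:  # every 10 blocks, surface one progress line
                root_logger.setLevel(logging.INFO)
                logger.info("Block: %i of %i", block_index, end_block)
                root_logger.setLevel(logging.WARNING)
    finally:
        root_logger.setLevel(original_level)  # restore normal logging once caught up
    logger.info("Reparse took %.3f minutes.", (time.time() - start) / 60.0)
```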
counterblock/lib/cache.py (74 changes: 30 additions & 44 deletions)
@@ -9,68 +9,54 @@

from counterblock.lib import config, util

DEFAULT_REDIS_CACHE_PERIOD = 60 # in seconds

logger = logging.getLogger(__name__)
- blockinfo_cache = {}
+ block_info_cache = {}

##
# REDIS-RELATED
##


def get_redis_connection():
logger.info("Connecting to redis @ %s" % config.REDIS_CONNECT)
return redis.StrictRedis(host=config.REDIS_CONNECT, port=config.REDIS_PORT, db=config.REDIS_DATABASE)


def get_value(key):
if not config.REDIS_CLIENT:
logger.debug("Cache MISS: {}".format(key))
return None
result = config.REDIS_CLIENT.get(key)
logger.debug("Cache {}: {}".format('HIT' if result is not None else 'MISS', key))
return json.loads(result.decode('utf8')) if result is not None else result


def set_value(key, value, cache_period=DEFAULT_REDIS_CACHE_PERIOD):
logger.debug("Caching key {} -- period: {}".format(key, cache_period))
if not config.REDIS_CLIENT:
return
config.REDIS_CLIENT.setex(key, cache_period, json.dumps(value))


##
# NOT REDIS RELATED
##


def get_block_info(block_index, prefetch=0, min_message_index=None):
- global blockinfo_cache
- if block_index in blockinfo_cache:
- return blockinfo_cache[block_index]
+ global block_info_cache
+ if block_index in block_info_cache:
+ return block_info_cache[block_index]

- blockinfo_cache.clear()
+ block_info_cache.clear()
blocks = util.call_jsonrpc_api(
'get_blocks',
{'block_indexes': list(range(block_index, block_index + prefetch)),
'min_message_index': min_message_index},
- abort_on_error=True)['result']
+ abort_on_error=True, use_cache=False)['result']
for block in blocks:
- blockinfo_cache[block['block_index']] = block
- return blockinfo_cache[block_index]


- def block_cache(func):
- """decorator"""
- def cached_function(*args, **kwargs):
- sql = "SELECT block_index FROM blocks ORDER BY block_index DESC LIMIT 1"
- block_index = util.call_jsonrpc_api('sql', {'query': sql, 'bindings': []})['result'][0]['block_index']
- function_signature = hashlib.sha256((func.__name__ + str(args) + str(kwargs)).encode('utf-8')).hexdigest()
-
- cached_result = config.mongo_db.counterblockd_cache.find_one({'block_index': block_index, 'function': function_signature})
-
- if not cached_result or config.TESTNET:
- # logger.info("generate cache ({}, {}, {})".format(func.__name__, block_index, function_signature))
- try:
- result = func(*args, **kwargs)
- config.mongo_db.counterblockd_cache.insert({
- 'block_index': block_index,
- 'function': function_signature,
- 'result': json.dumps(result)
- })
- return result
- except Exception as e:
- logger.exception(e)
- else:
- # logger.info("result from cache ({}, {}, {})".format(func.__name__, block_index, function_signature))
- result = json.loads(cached_result['result'])
- return result
-
- return cached_function
+ block_info_cache[block['block_index']] = block
+ return block_info_cache[block_index]


- def clean_block_cache(block_index):
- # logger.info("clean block cache lower than {}".format(block_index))
- config.mongo_db.counterblockd_cache.remove({'block_index': {'$lt': block_index}})
+ def clear_block_info_cache():
+ global block_info_cache
+ block_info_cache.clear()
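cache.py thus replaces the Mongo-backed block_cache decorator and clean_block_cache with a plain in-memory block_info_cache, while keeping lightweight Redis helpers (get_value/set_value) for short-lived API caching. The diff does not show how jsonrpc_api consults those helpers, but a wrapper consistent with the use_cache=False flag seen above could look like the following sketch; cached_jsonrpc_call and fetch are hypothetical names, not part of counterblock.

```python
import hashlib
import json

from counterblock.lib import cache

def cached_jsonrpc_call(method, params, fetch, use_cache=True, cache_period=60):
    """fetch is a callable(method, params) that performs the real JSON-RPC request."""
    # derive a stable cache key from the method name and parameters
    key = hashlib.sha256(json.dumps([method, params], sort_keys=True).encode('utf-8')).hexdigest()
    if use_cache:
        cached = cache.get_value(key)  # returns None on a miss (or when Redis is not configured)
        if cached is not None:
            return cached
    result = fetch(method, params)
    if use_cache:
        cache.set_value(key, result, cache_period=cache_period)  # stored as JSON with a TTL via SETEX
    return result
```

Calls such as get_mempool and get_running_info bypass this layer entirely (use_cache=False) because their results change from moment to moment and must never be served stale.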
counterblock/lib/config.py (20 changes: 11 additions & 9 deletions)
@@ -5,7 +5,7 @@
##
VERSION = "1.4.0" # should keep up with counterblockd repo's release tag

- DB_VERSION = 23 # a db version increment will cause counterblockd to rebuild its database off of counterpartyd
+ DB_VERSION = 24 # a db version increment will cause counterblockd to rebuild its database off of counterpartyd

UNIT = 100000000

@@ -28,8 +28,8 @@
MAX_REORG_NUM_BLOCKS = 10 # max reorg we'd likely ever see
MAX_FORCED_REORG_NUM_BLOCKS = 20 # but let us go deeper when messages are out of sync

- QUOTE_ASSETS = ['BTC', 'XBTC', 'XCP'] # define the priority for quote asset
- MARKET_LIST_QUOTE_ASSETS = ['XCP', 'XBTC', 'BTC'] # define the order in the market list
+ QUOTE_ASSETS = ['BTC', 'XBTC', 'XCP', 'PEPECASH', 'BITCRYSTALS'] # define the priority for quote asset
+ MARKET_LIST_QUOTE_ASSETS = ['BITCRYSTALS', 'PEPECASH', 'XCP', 'XBTC', 'BTC'] # define the order in the market list

DEFAULT_BACKEND_PORT_TESTNET = 18332
DEFAULT_BACKEND_PORT = 8332
@@ -83,8 +83,11 @@ def init_base(args):
global LATEST_BLOCK_INIT
LATEST_BLOCK_INIT = {'block_index': BLOCK_FIRST, 'block_time': None, 'block_hash': None}

+ # init variables used for reparse operations
+ global IS_REPARSING
+ IS_REPARSING = False
global QUIT_AFTER_CAUGHT_UP
- QUIT_AFTER_CAUGHT_UP = False # used for reparse operations
+ QUIT_AFTER_CAUGHT_UP = False

##############
# THINGS WE CONNECT TO
@@ -229,11 +232,10 @@ def init_base(args):
except:
raise Exception("Please specify a valid redis-database configuration parameter (between 0 and 16 inclusive)")

- global REDIS_ENABLE_APICACHE
- if args.redis_enable_apicache:
- REDIS_ENABLE_APICACHE = args.redis_enable_apicache
- else:
- REDIS_ENABLE_APICACHE = False
+ global BLOCKTRAIL_API_KEY
+ BLOCKTRAIL_API_KEY = args.blocktrail_api_key or None
+ global BLOCKTRAIL_API_SECRET
+ BLOCKTRAIL_API_SECRET = args.blocktrail_api_secret or None

##############
# THINGS WE SERVE
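QUOTE_ASSETS lists quote assets in priority order (earlier entries win) and now includes PEPECASH and BITCRYSTALS, while MARKET_LIST_QUOTE_ASSETS fixes the grouping order in the market list. As an illustration of how such a priority list can be applied when labeling a trading pair, here is a hypothetical helper; pick_pair is not counterblock's actual pairing code.

```python
QUOTE_ASSETS = ['BTC', 'XBTC', 'XCP', 'PEPECASH', 'BITCRYSTALS']  # earlier entries have higher priority

def pick_pair(asset1, asset2):
    """Return (base, quote): the higher-priority asset of the two becomes the quote."""
    def priority(asset):
        # assets not in the list get the lowest possible quote priority
        return QUOTE_ASSETS.index(asset) if asset in QUOTE_ASSETS else len(QUOTE_ASSETS)
    if priority(asset1) <= priority(asset2):
        return asset2, asset1  # asset1 acts as the quote
    return asset1, asset2      # asset2 acts as the quote

print(pick_pair('PEPECASH', 'RAREPEPE'))  # ('RAREPEPE', 'PEPECASH')
print(pick_pair('XCP', 'PEPECASH'))       # ('PEPECASH', 'XCP')
```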
counterblock/lib/database.py (5 changes: 3 additions & 2 deletions)
@@ -82,10 +82,11 @@ def reset_db_state():
return app_config


- def reparse(quit_after=False):
+ def init_reparse(quit_after=False):
app_config = reset_db_state()
config.state['my_latest_block'] = config.LATEST_BLOCK_INIT

+ config.IS_REPARSING = True
if quit_after:
config.QUIT_AFTER_CAUGHT_UP = True

@@ -108,7 +109,7 @@ def rollback(max_block_index):

config.state['last_message_index'] = -1
config.state['caught_up'] = False
- cache.blockinfo_cache.clear()
+ cache.clear_block_info_cache()
config.state['my_latest_block'] = config.mongo_db.processed_blocks.find_one({"block_index": max_block_index}) or config.LATEST_BLOCK_INIT

# call any rollback processors for any extension modules
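reparse() is renamed to init_reparse() and now also raises config.IS_REPARSING, which blockfeed.py uses to switch to the quieter reparse logging and the timing report shown above. A hypothetical caller might look like this; run_reparse is illustrative and not part of this diff.

```python
from counterblock.lib import config, database

def run_reparse(quit_after=True):
    # resets the counterblock DB state and flags the reparse
    database.init_reparse(quit_after=quit_after)
    assert config.IS_REPARSING
    # the block-feed loop then replays blocks with root logging at WARNING,
    # logs the reparse duration when caught up, and exits afterwards if
    # quit_after was passed (config.QUIT_AFTER_CAUGHT_UP).
```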
counterblock/lib/messages.py (24 changes: 8 additions & 16 deletions)
@@ -21,6 +21,14 @@ def decorate_message(message, for_txn_history=False):
if message['_category'] in ['bet_expirations', 'order_expirations', 'bet_match_expirations', 'order_match_expirations']:
message['_tx_index'] = 0 # add tx_index to all entries (so we can sort on it secondarily in history view), since these lack it

+ # include asset extended information (longname and divisible)
+ for attr in ('asset', 'get_asset', 'give_asset', 'forward_asset', 'backward_asset', 'dividend_asset'):
+ if attr not in message:
+ continue
+ asset_info = config.mongo_db.tracked_assets.find_one({'asset': message[attr]})
+ message['_{}_longname'.format(attr)] = asset_info['asset_longname'] if asset_info else None
+ message['_{}_divisible'.format(attr)] = asset_info['divisible'] if asset_info else None

if message['_category'] in ['credits', 'debits']:
# find the last balance change on record
bal_change = config.mongo_db.balance_changes.find_one(
@@ -30,28 +38,12 @@ def decorate_message(message, for_txn_history=False):
message['_balance'] = bal_change['new_balance'] if bal_change else None
message['_balance_normalized'] = bal_change['new_balance_normalized'] if bal_change else None

- if message['_category'] in ['orders', ] and message['_command'] == 'insert':
- get_asset_info = config.mongo_db.tracked_assets.find_one({'asset': message['get_asset']})
- give_asset_info = config.mongo_db.tracked_assets.find_one({'asset': message['give_asset']})
- message['_get_asset_divisible'] = get_asset_info['divisible'] if get_asset_info else None
- message['_give_asset_divisible'] = give_asset_info['divisible'] if give_asset_info else None
-
- if message['_category'] in ['order_matches', ] and message['_command'] == 'insert':
- forward_asset_info = config.mongo_db.tracked_assets.find_one({'asset': message['forward_asset']})
- backward_asset_info = config.mongo_db.tracked_assets.find_one({'asset': message['backward_asset']})
- message['_forward_asset_divisible'] = forward_asset_info['divisible'] if forward_asset_info else None
- message['_backward_asset_divisible'] = backward_asset_info['divisible'] if backward_asset_info else None

if message['_category'] in ['orders', 'order_matches', ]:
message['_btc_below_dust_limit'] = (
('forward_asset' in message and message['forward_asset'] == config.BTC and message['forward_quantity'] <= config.ORDER_BTC_DUST_LIMIT_CUTOFF)
or ('backward_asset' in message and message['backward_asset'] == config.BTC and message['backward_quantity'] <= config.ORDER_BTC_DUST_LIMIT_CUTOFF)
)

- if message['_category'] in ['dividends', 'sends', ]:
- asset_info = config.mongo_db.tracked_assets.find_one({'asset': message['asset']})
- message['_divisible'] = asset_info['divisible'] if asset_info else None

if message['_category'] in ['issuances', ]:
message['_quantity_normalized'] = blockchain.normalize_quantity(message['quantity'], message['divisible'])
return message
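decorate_message() previously attached divisibility only for a few categories (orders, order_matches, dividends, sends); the new loop attaches both asset_longname and divisible for every asset field present on any message, under uniform _<field>_longname / _<field>_divisible keys. Below is a standalone sketch of that loop with the MongoDB lookup swapped for a plain dict; the sample data is made up.

```python
TRACKED_ASSETS = {  # stand-in for config.mongo_db.tracked_assets
    'XCP': {'asset_longname': None, 'divisible': True},
    'MYTOKEN': {'asset_longname': 'MYPROJECT.TOKEN', 'divisible': False},
}

ASSET_FIELDS = ('asset', 'get_asset', 'give_asset', 'forward_asset', 'backward_asset', 'dividend_asset')

def add_asset_info(message):
    for attr in ASSET_FIELDS:
        if attr not in message:
            continue
        asset_info = TRACKED_ASSETS.get(message[attr])
        message['_{}_longname'.format(attr)] = asset_info['asset_longname'] if asset_info else None
        message['_{}_divisible'.format(attr)] = asset_info['divisible'] if asset_info else None
    return message

# e.g. a message with get_asset='XCP' and give_asset='MYTOKEN' gains
# _get_asset_divisible=True, _give_asset_longname='MYPROJECT.TOKEN', etc.
```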
counterblock/lib/module.py (2 changes: 1 addition & 1 deletion)
@@ -51,7 +51,7 @@ def get_mod_params_dict(params):
if params['enabled'] is True:
load_module(module)
except Exception as e:
logger.warn("Failed to load Module %s. Reason: %s" % (module, e))
raise Exception("Failed to load Module %s. Reason: %s" % (module, e))
elif 'Processor' in key:
try:
processor_functions = processor.__dict__[key]