diff --git a/.gitignore b/.gitignore index 77a4b16..2956e43 100644 --- a/.gitignore +++ b/.gitignore @@ -48,3 +48,8 @@ coverage.xml .venv venv/ ENV/ + +.idea/ + +*.json +*.txt \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index c1e5ae9..54ddec5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,17 @@ FROM python:3.6-alpine + MAINTAINER Omidiora Samuel + ENV PROJECT_DIR=bitcoin-etl RUN mkdir /$PROJECT_DIR + WORKDIR /$PROJECT_DIR + COPY . . + RUN apk add --no-cache gcc musl-dev #for C libraries: -RUN pip install --upgrade pip && pip install -e /$PROJECT_DIR/ + +RUN pip install --upgrade pip && pip install -e /$PROJECT_DIR/ && pip install dist/bitcoin-etl-1.5.2.tar.gz && pip install kafka-python==2.0.2 ENTRYPOINT ["python", "bitcoinetl"] \ No newline at end of file diff --git a/Dockerfile_with_streaming b/Dockerfile_with_streaming index 6390558..cfdd908 100644 --- a/Dockerfile_with_streaming +++ b/Dockerfile_with_streaming @@ -1,15 +1,22 @@ FROM python:3.6 + MAINTAINER Evgeny Medvedev + ENV PROJECT_DIR=bitcoin-etl RUN mkdir /$PROJECT_DIR + WORKDIR /$PROJECT_DIR + COPY . . -RUN pip install --upgrade pip && pip install -e /$PROJECT_DIR/[streaming] + +RUN pip install --upgrade pip && pip install dist/bitcoin-etl-1.5.2.tar.gz && pip install -e /$PROJECT_DIR/[streaming] # Add Tini ENV TINI_VERSION v0.18.0 + ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini + RUN chmod +x /tini ENTRYPOINT ["/tini", "--", "python", "bitcoinetl"] diff --git a/README_CONTAINER_REGISTRY.md b/README_CONTAINER_REGISTRY.md new file mode 100644 index 0000000..da49514 --- /dev/null +++ b/README_CONTAINER_REGISTRY.md @@ -0,0 +1,5 @@ +BITCOINETL_STREAMING_VERSION=1.4-streaming + docker build -t merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} -f Dockerfile_with_streaming . + docker tag merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} us.gcr.io/staging-btc-etl/merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} + docker push us.gcr.io/staging-btc-etl/merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} + diff --git a/bitcoinetl/cli/export_all.py b/bitcoinetl/cli/export_all.py index e235f9a..9ef1a7b 100644 --- a/bitcoinetl/cli/export_all.py +++ b/bitcoinetl/cli/export_all.py @@ -25,7 +25,7 @@ import re from datetime import datetime, timedelta -from bitcoinetl.enumeration.chain import Chain +from bitcoinetl.enumeration.chain import Chain, CoinPriceType from bitcoinetl.jobs.export_all import export_all as do_export_all from bitcoinetl.service.btc_block_range_service import BtcBlockRangeService from bitcoinetl.rpc.bitcoin_rpc import BitcoinRpc @@ -99,4 +99,4 @@ def get_partitions(start, end, partition_batch_size, provider_uri): def export_all(start, end, partition_batch_size, provider_uri, output_dir, max_workers, export_batch_size, chain, enrich): """Exports all data for a range of blocks.""" do_export_all(chain, get_partitions(start, end, partition_batch_size, provider_uri), - output_dir, provider_uri, max_workers, export_batch_size, enrich) + output_dir, provider_uri, max_workers, export_batch_size, enrich) diff --git a/bitcoinetl/cli/export_blocks_and_transactions.py b/bitcoinetl/cli/export_blocks_and_transactions.py index 1ab1a09..6c918dc 100644 --- a/bitcoinetl/cli/export_blocks_and_transactions.py +++ b/bitcoinetl/cli/export_blocks_and_transactions.py @@ -23,7 +23,7 @@ import click -from bitcoinetl.enumeration.chain import Chain +from bitcoinetl.enumeration.chain import Chain, CoinPriceType from bitcoinetl.jobs.export_blocks_job import ExportBlocksJob from bitcoinetl.jobs.exporters.blocks_and_transactions_item_exporter import blocks_and_transactions_item_exporter from bitcoinetl.rpc.bitcoin_rpc import BitcoinRpc @@ -49,7 +49,7 @@ @click.option('-c', '--chain', default=Chain.BITCOIN, type=click.Choice(Chain.ALL), help='The type of chain') def export_blocks_and_transactions(start_block, end_block, batch_size, provider_uri, - max_workers, blocks_output, transactions_output, chain): + max_workers, blocks_output, transactions_output, chain,): """Export blocks and transactions.""" if blocks_output is None and transactions_output is None: raise ValueError('Either --blocks-output or --transactions-output options must be provided') @@ -63,5 +63,6 @@ def export_blocks_and_transactions(start_block, end_block, batch_size, provider_ item_exporter=blocks_and_transactions_item_exporter(blocks_output, transactions_output), chain=chain, export_blocks=blocks_output is not None, - export_transactions=transactions_output is not None) + export_transactions=transactions_output is not None, + ) job.run() diff --git a/bitcoinetl/cli/stream.py b/bitcoinetl/cli/stream.py index 46170df..82c36be 100644 --- a/bitcoinetl/cli/stream.py +++ b/bitcoinetl/cli/stream.py @@ -21,7 +21,7 @@ # SOFTWARE. import click - +import json from bitcoinetl.enumeration.chain import Chain from bitcoinetl.rpc.bitcoin_rpc import BitcoinRpc @@ -41,16 +41,17 @@ @click.option('-o', '--output', type=str, help='Google PubSub topic path e.g. projects/your-project/topics/bitcoin_blockchain. ' 'If not specified will print to console.') +@click.option('--topic-mapping', default=None, type=str, help="Topic Mapping should be json like {\"block\": \"producer-litcoin-blocks-hot\",\"transaction\": \"producer-litcoin-transactions-hot\"}") @click.option('-s', '--start-block', default=None, type=int, help='Start block.') @click.option('-c', '--chain', default=Chain.BITCOIN, type=click.Choice(Chain.ALL), help='The type of chain.') -@click.option('--period-seconds', default=10, type=int, help='How many seconds to sleep between syncs.') -@click.option('-b', '--batch-size', default=2, type=int, help='How many blocks to batch in single request.') +@click.option('--period-seconds', default=1, type=int, help='How many seconds to sleep between syncs.') +@click.option('-b', '--batch-size', default=1, type=int, help='How many blocks to batch in single request.') @click.option('-B', '--block-batch-size', default=10, type=int, help='How many blocks to batch in single sync round.') @click.option('-w', '--max-workers', default=5, type=int, help='The number of workers.') @click.option('--log-file', default=None, type=str, help='Log file.') @click.option('--pid-file', default=None, type=str, help='pid file.') @click.option('--enrich', default=True, type=bool, help='Enable filling in transactions inputs fields.') -def stream(last_synced_block_file, lag, provider_uri, output, start_block, chain=Chain.BITCOIN, +def stream(last_synced_block_file, lag, provider_uri, output, topic_mapping, start_block, chain=Chain.BITCOIN, period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None, enrich=True): """Streams all data types to console or Google Pub/Sub.""" @@ -61,13 +62,16 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, chain from bitcoinetl.streaming.btc_streamer_adapter import BtcStreamerAdapter from blockchainetl.streaming.streamer import Streamer + if (topic_mapping is not None): + topic_mapping = json.loads(topic_mapping) + streamer_adapter = BtcStreamerAdapter( bitcoin_rpc=ThreadLocalProxy(lambda: BitcoinRpc(provider_uri)), - item_exporter=get_item_exporter(output), + item_exporter=get_item_exporter(output,topic_mapping,chain), chain=chain, batch_size=batch_size, enable_enrich=enrich, - max_workers=max_workers + max_workers=max_workers, ) streamer = Streamer( blockchain_streamer_adapter=streamer_adapter, @@ -77,5 +81,6 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, chain period_seconds=period_seconds, block_batch_size=block_batch_size, pid_file=pid_file, + retry_errors=retry_errors ) streamer.stream() diff --git a/bitcoinetl/domain/block.py b/bitcoinetl/domain/block.py index 97a8219..dab89f2 100644 --- a/bitcoinetl/domain/block.py +++ b/bitcoinetl/domain/block.py @@ -24,6 +24,7 @@ class BtcBlock(object): + def __init__(self): self.hash = None self.size = None @@ -36,8 +37,25 @@ def __init__(self): self.nonce = None self.bits = None self.coinbase_param = None + self.transaction_count = None self.transactions = [] + # New fields added + self.transaction_ids = [] + + self.version_hex = None + self.median_timestamp = None + self.difficulty = None + self.chain_work = None + self.previous_block_hash = None + self.next_block_hash = None + self.input_value = None + + self.block_reward = None + self.transaction_fees = None + self.coinbase_txid = None + self.coinbase_param_decoded = None + def has_full_transactions(self): return len(self.transactions) > 0 and isinstance(self.transactions[0], BtcTransaction) diff --git a/bitcoinetl/domain/transaction.py b/bitcoinetl/domain/transaction.py index ef56275..6ae4b75 100644 --- a/bitcoinetl/domain/transaction.py +++ b/bitcoinetl/domain/transaction.py @@ -43,6 +43,14 @@ def __init__(self): self.join_splits = [] self.value_balance = 0 + # New fields + self.transaction_id = None + self.weight = None + self.input_count = None + self.input_value = None + self.output_count = None + self.output_value = None + def add_input(self, input): if len(self.inputs) > 0: input.index = self.inputs[len(self.inputs) - 1].index + 1 diff --git a/bitcoinetl/domain/transaction_input.py b/bitcoinetl/domain/transaction_input.py index 95a3bf6..d2a39d8 100644 --- a/bitcoinetl/domain/transaction_input.py +++ b/bitcoinetl/domain/transaction_input.py @@ -23,9 +23,12 @@ class BtcTransactionInput(object): def __init__(self): + self.create_transaction_id = None + self.create_output_index = None + + self.spending_transaction_id = None self.index = None - self.spent_transaction_hash = None - self.spent_output_index = None + self.script_asm = None self.script_hex = None self.coinbase_param = None @@ -37,4 +40,4 @@ def __init__(self): self.value = None def is_coinbase(self): - return self.coinbase_param is not None or self.spent_transaction_hash is None + return self.coinbase_param is not None or self.create_transaction_id is None diff --git a/bitcoinetl/domain/transaction_output.py b/bitcoinetl/domain/transaction_output.py index 2d3b2ca..f5c53bf 100644 --- a/bitcoinetl/domain/transaction_output.py +++ b/bitcoinetl/domain/transaction_output.py @@ -31,3 +31,7 @@ def __init__(self): self.addresses = [] self.value = None + self.witness = [] + + self.create_transaction_id = None + self.spending_transaction_id = None diff --git a/bitcoinetl/enumeration/chain.py b/bitcoinetl/enumeration/chain.py index 3a53105..e59e266 100644 --- a/bitcoinetl/enumeration/chain.py +++ b/bitcoinetl/enumeration/chain.py @@ -1,6 +1,7 @@ class Chain: BITCOIN = 'bitcoin' BITCOIN_CASH = 'bitcoin_cash' + BITCOIN_CASH_SV = 'bitcoin_cash_sv' BITCOIN_GOLD = 'bitcoin_gold' DOGECOIN = 'dogecoin' LITECOIN = 'litecoin' @@ -8,6 +9,27 @@ class Chain: ZCASH = 'zcash' MONACOIN = 'monacoin' - ALL = [BITCOIN, BITCOIN_CASH, BITCOIN_GOLD, DOGECOIN, LITECOIN, DASH, ZCASH, MONACOIN] + ALL = [BITCOIN, BITCOIN_CASH, BITCOIN_CASH_SV, BITCOIN_GOLD, DOGECOIN, LITECOIN, DASH, ZCASH, MONACOIN] # Old API doesn't support verbosity for getblock which doesn't allow querying all transactions in a block in 1 go. HAVE_OLD_API = [BITCOIN_CASH, DOGECOIN, DASH, MONACOIN] + + @classmethod + def ticker_symbol(cls, chain): + symbols = { + 'bitcoin': 'BTC', + 'bitcoin_cash': 'BCH', + 'bitcoin_cash_sv': 'BSV', + 'dogecoin': 'DOGE', + 'litecoin': 'LTC', + 'dash': 'DASH', + 'zcash': 'ZEC', + 'monacoin': 'MONA', + } + return symbols.get(chain, None) + + +class CoinPriceType: + + empty = 0 + daily = 1 + hourly = 2 diff --git a/bitcoinetl/jobs/enrich_transactions.py b/bitcoinetl/jobs/enrich_transactions.py index a100f84..dec7dd7 100644 --- a/bitcoinetl/jobs/enrich_transactions.py +++ b/bitcoinetl/jobs/enrich_transactions.py @@ -63,7 +63,7 @@ def _enrich_transactions(self, transactions): input_transactions_map = self._get_input_transactions_as_map(transaction_input_batch) for input in transaction_input_batch: output = self._get_output_for_input(input, input_transactions_map) \ - if input.spent_transaction_hash is not None else None + if input.create_transaction_id is not None else None if output is not None: input.required_signatures = output.required_signatures input.type = output.type @@ -74,29 +74,29 @@ def _enrich_transactions(self, transactions): self.item_exporter.export_item(self.transaction_mapper.transaction_to_dict(transaction)) def _get_input_transactions_as_map(self, transaction_inputs): - transaction_hashes = [input.spent_transaction_hash for input in transaction_inputs - if input.spent_transaction_hash is not None] + transaction_hashes = [input.create_transaction_id for input in transaction_inputs + if input.create_transaction_id is not None] transaction_hashes = set(transaction_hashes) if len(transaction_hashes) > 0: transactions = self.btc_service.get_transactions_by_hashes(transaction_hashes) - return {transaction.hash: transaction for transaction in transactions} + return {transaction.transaction_id: transaction for transaction in transactions} else: return {} def _get_output_for_input(self, transaction_input, input_transactions_map): - spent_transaction_hash = transaction_input.spent_transaction_hash - input_transaction = input_transactions_map.get(spent_transaction_hash) + create_transaction_id = transaction_input.create_transaction_id + input_transaction = input_transactions_map.get(create_transaction_id) if input_transaction is None: - raise ValueError('Input transaction with hash {} not found'.format(spent_transaction_hash)) + raise ValueError('Input transaction with hash {} not found'.format(create_transaction_id)) - spent_output_index = transaction_input.spent_output_index - if input_transaction.outputs is None or len(input_transaction.outputs) < (spent_output_index + 1): + create_output_index = transaction_input.create_output_index + if input_transaction.outputs is None or len(input_transaction.outputs) < (create_output_index + 1): raise ValueError( 'There is no output with index {} in transaction with hash {}'.format( - spent_output_index, spent_transaction_hash)) + create_output_index, create_transaction_id)) - output = input_transaction.outputs[spent_output_index] + output = input_transaction.outputs[create_output_index] return output def _end(self): diff --git a/bitcoinetl/jobs/export_all.py b/bitcoinetl/jobs/export_all.py index 8a33591..c86e94d 100644 --- a/bitcoinetl/jobs/export_all.py +++ b/bitcoinetl/jobs/export_all.py @@ -40,7 +40,9 @@ logger = logging.getLogger('export_all') -def export_all(chain, partitions, output_dir, provider_uri, max_workers, batch_size, enrich): +def export_all( + chain, partitions, output_dir, provider_uri, max_workers, batch_size, enrich, + ): for batch_start_block, batch_end_block, partition_dir, *args in partitions: # # # start # # # @@ -101,7 +103,8 @@ def export_all(chain, partitions, output_dir, provider_uri, max_workers, batch_s max_workers=max_workers, item_exporter=blocks_and_transactions_item_exporter(blocks_file, transactions_file), export_blocks=blocks_file is not None, - export_transactions=transactions_file is not None) + export_transactions=transactions_file is not None, + ) job.run() if enrich == True: diff --git a/bitcoinetl/jobs/export_blocks_job.py b/bitcoinetl/jobs/export_blocks_job.py index c4b384c..bbb7d98 100644 --- a/bitcoinetl/jobs/export_blocks_job.py +++ b/bitcoinetl/jobs/export_blocks_job.py @@ -27,6 +27,7 @@ from blockchainetl.executors.batch_work_executor import BatchWorkExecutor from blockchainetl.jobs.base_job import BaseJob from blockchainetl.utils import validate_range +from bitcoinetl.enumeration.chain import CoinPriceType # Exports blocks and transactions @@ -43,6 +44,7 @@ def __init__( export_blocks=True, export_transactions=True): validate_range(start_block, end_block) + self.start_block = start_block self.end_block = end_block diff --git a/bitcoinetl/jobs/exporters/blocks_and_transactions_item_exporter.py b/bitcoinetl/jobs/exporters/blocks_and_transactions_item_exporter.py index c1e8f0e..d8e1773 100644 --- a/bitcoinetl/jobs/exporters/blocks_and_transactions_item_exporter.py +++ b/bitcoinetl/jobs/exporters/blocks_and_transactions_item_exporter.py @@ -23,41 +23,51 @@ from blockchainetl.jobs.exporters.composite_item_exporter import CompositeItemExporter + BLOCK_FIELDS_TO_EXPORT = [ - 'hash', - 'size', - 'stripped_size', - 'weight', - 'number', - 'version', - 'merkle_root', - 'timestamp', - 'nonce', - 'bits', - 'coinbase_param', - 'transaction_count' + "hash", + "number", + "timestamp", + "median_timestamp", + "merkle_root", + "coinbase_param", + "coinbase_param_decoded", + "coinbase_txid", + "previous_block_hash", + "nonce", + "difficulty", + "chain_work", + "version", + "version_hex", + "size", + "stripped_size", + "weight", + "bits", + "transaction_count", + "block_reward", + "transaction_ids", ] + TRANSACTION_FIELDS_TO_EXPORT = [ + 'transaction_id', 'hash', - 'size', - 'virtual_size', - 'version', - 'lock_time', 'block_number', 'block_hash', 'block_timestamp', 'is_coinbase', + 'lock_time', + 'size', + 'virtual_size', + 'weight', + 'version', 'index', - - 'inputs', - 'outputs', - 'input_count', 'output_count', 'input_value', 'output_value', - 'fee' + 'inputs', + 'outputs', ] diff --git a/bitcoinetl/mappers/block_mapper.py b/bitcoinetl/mappers/block_mapper.py index fce6095..59bcabf 100644 --- a/bitcoinetl/mappers/block_mapper.py +++ b/bitcoinetl/mappers/block_mapper.py @@ -52,12 +52,22 @@ def json_dict_to_block(self, json_dict): block.transactions = [ self.transaction_mapper.json_dict_to_transaction(tx, block, idx) for idx, tx in enumerate(raw_transactions) ] + block.transaction_ids = [tx.transaction_id for tx in block.transactions] else: # Transaction hashes block.transactions = raw_transactions + block.transaction_ids = raw_transactions block.transaction_count = len(raw_transactions) + # New fields + block.transaction_count = json_dict.get("nTx") + block.version_hex = json_dict.get("versionHex") + block.median_timestamp = json_dict.get("mediantime") + block.difficulty = int(json_dict.get("difficulty")) + block.chain_work = json_dict.get("chainwork") + block.coinbase_txid = json_dict.get("coinbase_txid") + block.previous_block_hash = json_dict.get("previousblockhash") return block def block_to_dict(self, block): @@ -74,7 +84,16 @@ def block_to_dict(self, block): 'nonce': block.nonce, 'bits': block.bits, 'coinbase_param': block.coinbase_param, - 'transaction_count': len(block.transactions) + 'coinbase_param_decoded': block.coinbase_param_decoded, + 'coinbase_txid': block.coinbase_txid, + 'transaction_count': block.transaction_count, + 'block_reward': block.block_reward, + 'version_hex': block.version_hex, + 'median_timestamp': block.median_timestamp, + 'difficulty': block.difficulty, + 'chain_work': block.chain_work, + 'previous_block_hash': block.previous_block_hash, + "transaction_ids": block.transaction_ids } diff --git a/bitcoinetl/mappers/transaction_input_mapper.py b/bitcoinetl/mappers/transaction_input_mapper.py index 9d58058..69fe0f6 100644 --- a/bitcoinetl/mappers/transaction_input_mapper.py +++ b/bitcoinetl/mappers/transaction_input_mapper.py @@ -25,27 +25,31 @@ class BtcTransactionInputMapper(object): - def vin_to_inputs(self, vin): + def vin_to_inputs(self, vin, spending_transaction_id=None): inputs = [] index = 0 for item in (vin or []): - input = self.json_dict_to_input(item) + input = self.json_dict_to_input(json_dict=item, spending_transaction_id=spending_transaction_id) input.index = index index = index + 1 inputs.append(input) return inputs - def json_dict_to_input(self, json_dict): + def json_dict_to_input(self, json_dict, spending_transaction_id=None): input = BtcTransactionInput() - input.spent_transaction_hash = json_dict.get('txid') - input.spent_output_index = json_dict.get('vout') + input.create_transaction_id = json_dict.get('txid') + input.create_output_index = json_dict.get('vout') + + input.spending_transaction_id = spending_transaction_id + input.coinbase_param = json_dict.get('coinbase') input.sequence = json_dict.get('sequence') + if 'scriptSig' in json_dict: - input.script_asm = (json_dict.get('scriptSig')).get('asm') - input.script_hex = (json_dict.get('scriptSig')).get('hex') + input.script_asm = '' #(json_dict.get('scriptSig')).get('asm') + input.script_hex = '' #(json_dict.get('scriptSig')).get('hex') return input @@ -54,16 +58,21 @@ def inputs_to_dicts(self, inputs): for input in inputs: item = { 'index': input.index, - 'spent_transaction_hash': input.spent_transaction_hash, - 'spent_output_index': input.spent_output_index, - 'script_asm': input.script_asm, - 'script_hex': input.script_hex, + 'create_transaction_id': input.create_transaction_id, + 'spending_transaction_id': input.spending_transaction_id, + 'create_output_index': input.create_output_index, 'sequence': input.sequence, + + 'script_asm': '', #input.script_asm + 'script_hex': '', #input.script_hex + 'required_signatures': input.required_signatures, - 'type': input.type, 'addresses': input.addresses, 'value': input.value, + 'type': input.type, } + if input.coinbase_param: + item['coinbase_param'] = input.coinbase_param result.append(item) return result @@ -72,15 +81,16 @@ def dicts_to_inputs(self, dicts): for dict in dicts: input = BtcTransactionInput() input.index = dict.get('index') - input.spent_transaction_hash = dict.get('spent_transaction_hash') - input.spent_output_index = dict.get('spent_output_index') - input.script_asm = dict.get('script_asm') - input.script_hex = dict.get('script_hex') + input.create_transaction_id = dict.get('create_transaction_id') + input.create_output_index = dict.get('create_output_index') + input.script_asm = '' #dict.get('script_asm') + input.script_hex = '' #dict.get('script_hex') input.sequence = dict.get('sequence') input.required_signatures = dict.get('required_signatures') input.type = dict.get('type') input.addresses = dict.get('addresses') input.value = dict.get('value') + input.spending_transaction_id = dict.get('spending_transaction_id') result.append(input) return result diff --git a/bitcoinetl/mappers/transaction_mapper.py b/bitcoinetl/mappers/transaction_mapper.py index 93f2f29..14899ba 100644 --- a/bitcoinetl/mappers/transaction_mapper.py +++ b/bitcoinetl/mappers/transaction_mapper.py @@ -38,11 +38,12 @@ def __init__(self): def json_dict_to_transaction(self, json_dict, block=None, index=None): transaction = BtcTransaction() - transaction.hash = json_dict.get('txid') + transaction.hash = json_dict.get('hash') transaction.size = json_dict.get('size') transaction.virtual_size = json_dict.get('vsize') transaction.version = json_dict.get('version') transaction.lock_time = json_dict.get('locktime') + transaction.transaction_id = json_dict.get('txid') if block is not None: transaction.block_number = block.number @@ -58,19 +59,31 @@ def json_dict_to_transaction(self, json_dict, block=None, index=None): if index is not None: transaction.index = index - transaction.inputs = self.transaction_input_mapper.vin_to_inputs(json_dict.get('vin')) - transaction.outputs = self.transaction_output_mapper.vout_to_outputs(json_dict.get('vout')) + transaction.inputs = self.transaction_input_mapper.vin_to_inputs( + vin=json_dict.get('vin'), + spending_transaction_id=transaction.transaction_id + ) + transaction.outputs = self.transaction_output_mapper.vout_to_outputs( + vout=json_dict.get('vout'), + create_transaction_id=transaction.transaction_id + ) # Only Zcash transaction.join_splits = self.join_split_mapper.vjoinsplit_to_join_splits(json_dict.get('vjoinsplit')) transaction.value_balance = bitcoin_to_satoshi(json_dict.get('valueBalance')) + transaction.weight = json_dict.get('weight') + transaction.output_addresses = self.get_output_addresses(transaction) return transaction + def get_output_addresses(self, transaction): + return [','.join(output.addresses) if output.addresses else output.addresses for output in transaction.outputs] + def transaction_to_dict(self, transaction): result = { 'type': 'transaction', 'hash': transaction.hash, + 'transaction_id': transaction.transaction_id, 'size': transaction.size, 'virtual_size': transaction.virtual_size, 'version': transaction.version, @@ -89,12 +102,15 @@ def transaction_to_dict(self, transaction): 'input_value': transaction.calculate_input_value(), 'output_value': transaction.calculate_output_value(), 'fee': transaction.calculate_fee(), + 'weight': transaction.weight, + 'output_addresses': transaction.output_addresses } return result def dict_to_transaction(self, dict): transaction = BtcTransaction() transaction.hash = dict.get('hash') + transaction.transaction_id = dict.get('transaction_id') transaction.size = dict.get('size') transaction.virtual_size = dict.get('virtual_size') transaction.version = dict.get('version') @@ -104,6 +120,14 @@ def dict_to_transaction(self, dict): transaction.block_timestamp = dict.get('block_timestamp') transaction.is_coinbase = dict.get('is_coinbase') transaction.index = dict.get('index') + transaction.weight = dict.get('weight') + transaction.output_addresses = dict.get('output_addresses') + transaction.input_addresses = dict.get('input_addresses') + transaction.input_count = dict.get('input_count') + transaction.input_value = dict.get('input_value') + transaction.output_count = dict.get('output_count') + transaction.output_value = dict.get('output_value') + transaction.fee = dict.get('fee') transaction.inputs = self.transaction_input_mapper.dicts_to_inputs(dict.get('inputs')) transaction.outputs = self.transaction_output_mapper.dicts_to_outputs(dict.get('outputs')) diff --git a/bitcoinetl/mappers/transaction_output_mapper.py b/bitcoinetl/mappers/transaction_output_mapper.py index 697f6ef..edd0361 100644 --- a/bitcoinetl/mappers/transaction_output_mapper.py +++ b/bitcoinetl/mappers/transaction_output_mapper.py @@ -26,26 +26,29 @@ class BtcTransactionOutputMapper(object): - def vout_to_outputs(self, vout): + def vout_to_outputs(self, vout, create_transaction_id=None): outputs = [] for item in (vout or []): - output = self.json_dict_to_output(item) + output = self.json_dict_to_output(json_dict=item, create_transaction_id=create_transaction_id) outputs.append(output) return outputs - def json_dict_to_output(self, json_dict): + def json_dict_to_output(self, json_dict, create_transaction_id=None): output = BtcTransactionOutput() output.index = json_dict.get('n') output.addresses = json_dict.get('addresses') - output.txinwitness = json_dict.get('txinwitness') + output.witness = json_dict.get('txinwitness') output.value = bitcoin_to_satoshi(json_dict.get('value')) + output.create_transaction_id = create_transaction_id + if 'scriptPubKey' in json_dict: script_pub_key = json_dict.get('scriptPubKey') - output.script_asm = script_pub_key.get('asm') - output.script_hex = script_pub_key.get('hex') + output.script_asm = '' #script_pub_key.get('asm') + output.script_hex = '' #script_pub_key.get('hex') output.required_signatures = script_pub_key.get('reqSigs') output.type = script_pub_key.get('type') + #output.addresses = script_pub_key.get('addresses') if script_pub_key.get('addresses') is not None and len(script_pub_key.get('addresses')) > 0: output.addresses = script_pub_key.get('addresses') elif script_pub_key.get('address') is None: @@ -60,13 +63,20 @@ def outputs_to_dicts(self, outputs): for output in outputs: item = { 'index': output.index, - 'script_asm': output.script_asm, - 'script_hex': output.script_hex, - 'required_signatures': output.required_signatures, + 'create_transaction_id': output.create_transaction_id, + 'spending_transaction_id': None, + + 'script_asm': '', #output.script_asm + 'script_hex': '', #output.script_hex + 'type': output.type, 'addresses': output.addresses, - 'value': output.value + 'value': output.value, + 'required_signatures': output.required_signatures, } + if output.witness: + item['witness'] = output.witness + result.append(item) return result @@ -75,12 +85,15 @@ def dicts_to_outputs(self, dicts): for dict in dicts: input = BtcTransactionOutput() input.index = dict.get('index') - input.script_asm = dict.get('script_asm') - input.script_hex = dict.get('script_hex') + input.script_asm = '' #dict.get('script_asm') + input.script_hex = '' #dict.get('script_hex') input.required_signatures = dict.get('required_signatures') input.type = dict.get('type') input.addresses = dict.get('addresses') input.value = dict.get('value') + input.witness = dict.get('witness') + input.create_transaction_id = dict.get('create_transaction_id') + input.spending_transaction_id = dict.get('spending_transaction_id') result.append(input) return result diff --git a/bitcoinetl/rpc/bitcoin_rpc.py b/bitcoinetl/rpc/bitcoin_rpc.py index 498b64f..a9828c4 100644 --- a/bitcoinetl/rpc/bitcoin_rpc.py +++ b/bitcoinetl/rpc/bitcoin_rpc.py @@ -28,7 +28,7 @@ class BitcoinRpc: - def __init__(self, provider_uri, timeout=60): + def __init__(self, provider_uri, timeout=180): self.provider_uri = provider_uri self.timeout = timeout diff --git a/bitcoinetl/service/btc_service.py b/bitcoinetl/service/btc_service.py index d768618..e59b1dd 100644 --- a/bitcoinetl/service/btc_service.py +++ b/bitcoinetl/service/btc_service.py @@ -22,7 +22,7 @@ from bitcoinetl.domain.transaction_input import BtcTransactionInput from bitcoinetl.domain.transaction_output import BtcTransactionOutput -from bitcoinetl.enumeration.chain import Chain +from bitcoinetl.enumeration.chain import Chain, CoinPriceType from bitcoinetl.json_rpc_requests import generate_get_block_hash_by_number_json_rpc, \ generate_get_block_by_hash_json_rpc, generate_get_transaction_by_id_json_rpc from bitcoinetl.mappers.block_mapper import BtcBlockMapper @@ -38,6 +38,7 @@ def __init__(self, bitcoin_rpc, chain=Chain.BITCOIN): self.block_mapper = BtcBlockMapper() self.transaction_mapper = BtcTransactionMapper() self.chain = chain + self.cached_prices = {} def get_block(self, block_number, with_transactions=False): block_hashes = self.get_block_hashes([block_number]) @@ -75,6 +76,7 @@ def get_blocks_by_hashes(self, block_hash_batch, with_transactions=True): for block in blocks: self._remove_coinbase_input(block) + if block.has_full_transactions(): for transaction in block.transactions: self._add_non_standard_addresses(transaction) @@ -144,6 +146,7 @@ def _remove_coinbase_input(self, block): if block.has_full_transactions(): for transaction in block.transactions: coinbase_inputs = [input for input in transaction.inputs if input.is_coinbase()] + if len(coinbase_inputs) > 1: raise ValueError('There must be no more than 1 coinbase input in any transaction. Was {}, hash {}' .format(len(coinbase_inputs), transaction.hash)) @@ -153,9 +156,19 @@ def _remove_coinbase_input(self, block): transaction.inputs = [input for input in transaction.inputs if not input.is_coinbase()] transaction.is_coinbase = True + block.coinbase_param = coinbase_input.coinbase_param + block.coinbase_param_decoded = bytes.fromhex(coinbase_input.coinbase_param).decode('utf-8', 'replace') + block.coinbase_tx = transaction + block.coinbase_txid = transaction.transaction_id + + block.block_reward = self.get_block_reward(block) + transaction.input_count = 0 + def _add_non_standard_addresses(self, transaction): for output in transaction.outputs: if output.addresses is None or len(output.addresses) == 0: + # output.type = 'nonstandard' + # if output.type != 'multisig': output.type = 'nonstandard' output.addresses = [script_hex_to_non_standard_address(output.script_hex)] @@ -186,5 +199,7 @@ def _add_shielded_inputs_and_outputs(self, transaction): output.value = -transaction.value_balance transaction.add_output(output) + def get_block_reward(self, block): + return block.coinbase_tx.calculate_output_value() ADDRESS_TYPE_SHIELDED = 'shielded' diff --git a/bitcoinetl/streaming/btc_streamer_adapter.py b/bitcoinetl/streaming/btc_streamer_adapter.py index 600d65b..81c9e8d 100644 --- a/bitcoinetl/streaming/btc_streamer_adapter.py +++ b/bitcoinetl/streaming/btc_streamer_adapter.py @@ -95,6 +95,8 @@ def export_all(self, start_block, end_block): transactions = enriched_transactions logging.info('Exporting with ' + type(self.item_exporter).__name__) + logging.info('Block number ' + str(len(blocks))) + logging.info('Transaction length ' + str(len(transactions))) all_items = blocks + transactions diff --git a/bitcoinetl/streaming/streaming_utils.py b/bitcoinetl/streaming/streaming_utils.py index ea6777f..01d73dd 100644 --- a/bitcoinetl/streaming/streaming_utils.py +++ b/bitcoinetl/streaming/streaming_utils.py @@ -1,16 +1,46 @@ from blockchainetl.jobs.exporters.console_item_exporter import ConsoleItemExporter -def get_item_exporter(output): - if output is not None: - from blockchainetl.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter - item_exporter = GooglePubSubItemExporter( - item_type_to_topic_mapping={ - 'block': output + '.blocks', - 'transaction': output + '.transactions' - }, - message_attributes=('item_id',)) - else: - item_exporter = ConsoleItemExporter() +def get_item_exporter(output,topic_mapping,chain): + item_exporter_type = determine_item_exporter_type(output) + if item_exporter_type == ItemExporterType.PUBSUB: + if output is not None: + from blockchainetl.jobs.exporters.google_pubsub_item_exporter import GooglePubSubItemExporter + item_exporter = GooglePubSubItemExporter( + item_type_to_topic_mapping={ + 'block': output + '.blocks', + 'transaction': output + '.transactions' + }, + message_attributes=('item_id',)) + else: + item_exporter = ConsoleItemExporter() + + elif item_exporter_type == ItemExporterType.KAFKA: + from blockchainetl.jobs.exporters.kafka_exporter import KafkaItemExporter + if (topic_mapping is None): + item_exporter = KafkaItemExporter(output, item_type_to_topic_mapping={ + 'block': f"producer-{chain}-blocks-hot", + 'transaction': f"producer-{chain}-transactions-hot", + }) + else: + item_exporter = KafkaItemExporter(output, item_type_to_topic_mapping=topic_mapping) + else: + raise ValueError('Unable to determine item exporter type for output ' + output) + return item_exporter + + +def determine_item_exporter_type(output): + if output is not None and output.startswith('projects'): + return ItemExporterType.PUBSUB + if output is not None and output.startswith('kafka'): + return ItemExporterType.KAFKA + else: + return ItemExporterType.UNKNOWN + + +class ItemExporterType: + PUBSUB = 'pubsub' + KAFKA = 'kafka' + UNKNOWN = 'unknown' \ No newline at end of file diff --git a/blockchainetl/jobs/exporters/bitcoin_flatten.py b/blockchainetl/jobs/exporters/bitcoin_flatten.py new file mode 100644 index 0000000..d685701 --- /dev/null +++ b/blockchainetl/jobs/exporters/bitcoin_flatten.py @@ -0,0 +1,56 @@ +import json +import datetime +from decimal import Decimal + + +def flatten_transformation(payload_dict): + + TYPE_EXTERNAL = 1 + default_token_address = "0x0000" + NULL_ADDRESS_MINT = "Mint" + TYPE_BLOCK_REWARD = 3 + + transformed_transactions = [] + for output in payload_dict["outputs"]: + for input in payload_dict["inputs"]: + if not payload_dict["is_coinbase"]: + if output["value"] > 0: + token_outgoing_value = Decimal((input["value"]) * (output["value"]) / (payload_dict["output_value"])) + else: + token_outgoing_value = Decimal((input["value"]) / payload_dict["output_count"]) + if input["value"] > 0: + token_incoming_value = Decimal((input["value"]) * (output["value"]) / (payload_dict["input_value"])) + else: + token_incoming_value = 0 + + token_outgoing_fee = token_outgoing_value - token_incoming_value + + transformed_transactions.append({ + "block": payload_dict["block_number"], + "transaction_id": payload_dict["hash"], + "transaction_ts": payload_dict["block_timestamp"], + "transaction_type": TYPE_EXTERNAL, + "sender_address": "|".join(input["addresses"]), + "receiver_address": "|".join(output["addresses"]), + "token_outgoing_value": str(float(token_outgoing_value)), + "token_address": default_token_address, + "token_incoming_value": str(float(token_incoming_value)), + "token_outgoing_fee": str(float(token_outgoing_fee)) + }) + else: + transformed_transactions.append({ + "block": payload_dict["block_number"], + "transaction_id": payload_dict["hash"], + "transaction_ts": payload_dict["block_timestamp"], + "transaction_type": TYPE_BLOCK_REWARD, + "sender_address": f"{NULL_ADDRESS_MINT}_{datetime.datetime.fromtimestamp(payload_dict['block_timestamp']).month}", + "receiver_address": "|".join(output["addresses"]), + "token_outgoing_value": str(output["value"]), + "token_incoming_value": str(output["value"]), + "token_address": default_token_address, + "token_outgoing_fee": str(0) + }) + + + return transformed_transactions + \ No newline at end of file diff --git a/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py b/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py index f8caad2..4ff13bd 100644 --- a/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py +++ b/blockchainetl/jobs/exporters/google_pubsub_item_exporter.py @@ -30,7 +30,7 @@ class GooglePubSubItemExporter: def __init__(self, item_type_to_topic_mapping, message_attributes=(), - batch_max_bytes=1024 * 5, batch_max_latency=1, batch_max_messages=1000): + batch_max_bytes=1024 * 5, batch_max_latency=0.01, batch_max_messages=1000): self.item_type_to_topic_mapping = item_type_to_topic_mapping self.batch_max_bytes = batch_max_bytes @@ -45,16 +45,21 @@ def open(self): pass def export_items(self, items): - try: - self._export_items_with_timeout(items) - except timeout_decorator.TimeoutError as e: - # A bug in PubSub publisher that makes it stalled after running for some time. - # Exception in thread Thread-CommitBatchPublisher: - # details = "channel is in state TRANSIENT_FAILURE" - # https://stackoverflow.com/questions/55552606/how-can-one-catch-exceptions-in-python-pubsub-subscriber-that-are-happening-in-i?noredirect=1#comment97849067_55552606 - logging.info('Recreating Pub/Sub publisher.') - self.publisher = self.create_publisher() - raise e + tot_steps = (len(items) // 1000) + 1 + logging.info('Total publish loop steps'+str(tot_steps)) + for i in range(0, len(items), 1000): + mini_batch = items[i:i + 1000] + logging.info('Current Loop Iteration' + str(i + 1)+ 'out of'+str(tot_steps)) + try: + self._export_items_with_timeout(mini_batch) + except timeout_decorator.TimeoutError as e: + # A bug in PubSub publisher that makes it stalled after running for some time. + # Exception in thread Thread-CommitBatchPublisher: + # details = "channel is in state TRANSIENT_FAILURE" + # https://stackoverflow.com/questions/55552606/how-can-one-catch-exceptions-in-python-pubsub-subscriber-that-are-happening-in-i?noredirect=1#comment97849067_55552606 + logging.info('Recreating Pub/Sub publisher.') + self.publisher = self.create_publisher() + raise e @timeout_decorator.timeout(300) def _export_items_with_timeout(self, items): diff --git a/blockchainetl/jobs/exporters/kafka_exporter.py b/blockchainetl/jobs/exporters/kafka_exporter.py new file mode 100644 index 0000000..5ffdf70 --- /dev/null +++ b/blockchainetl/jobs/exporters/kafka_exporter.py @@ -0,0 +1,89 @@ +import collections +import json +import logging +import os +import socket +from confluent_kafka import Producer + +from blockchainetl.jobs.exporters.converters.composite_item_converter import CompositeItemConverter +from blockchainetl.jobs.exporters.bitcoin_flatten import flatten_transformation + + +class KafkaItemExporter: + + def __init__(self, output, item_type_to_topic_mapping, converters=()): + self.item_type_to_topic_mapping = item_type_to_topic_mapping + self.converter = CompositeItemConverter(converters) + # self.connection_url = self.get_connection_url(output) + # print(self.connection_url) + conf = { + "bootstrap.servers": os.getenv("CONFLUENT_BROKER"), + "security.protocol": "SASL_SSL", + "sasl.mechanisms": "PLAIN", + "client.id": socket.gethostname(), + "message.max.bytes": 5242880, + "sasl.username": os.getenv("KAFKA_PRODUCER_KEY"), + "sasl.password": os.getenv("KAFKA_PRODUCER_PASSWORD"), + "queue.buffering.max.messages": 10000000, + } + + self.producer = Producer(conf) + + def get_connection_url(self, output): + try: + return output.split('/')[1] + except KeyError: + raise Exception('Invalid kafka output param, It should be in format of "kafka/127.0.0.1:9092"') + + def open(self): + pass + + + def export_items(self, items): + for item in items: + item_type = item.get('type') + if item_type is not None and item_type in self.item_type_to_topic_mapping: + if(item_type == "transaction"): + transformed_data = flatten_transformation(item) + for data in transformed_data: + self.export_item(data,item_type) + else: + self.export_item(item,item_type) + else: + logging.warning('Topic for item type "{}" is not configured.'.format(item_type)) + + def export_item(self, item, item_type): + data = json.dumps(item).encode('utf-8') + message_future = self.write_to_kafka(value=data, + topic=self.item_type_to_topic_mapping[item_type]) + + return message_future + + + def write_to_kafka(self, value: str, topic: str): + # def acked(err, msg): + # if err is not None: + # self.logging.error('%% Message failed delivery: %s\n' % err) + try: + self.producer.produce(topic,key="0x0000",value=value) + self.producer.poll(0) + except BufferError: + logging.error('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % + len(self.producer)) + + + def convert_items(self, items): + for item in items: + yield self.converter.convert_item(item) + + def close(self): + self.producer.flush() + pass + + +def group_by_item_type(items): + result = collections.defaultdict(list) + for item in items: + result[item.get('type')].append(item) + + return result \ No newline at end of file diff --git a/blockchainetl/streaming/streamer.py b/blockchainetl/streaming/streamer.py index aa8f818..1d42308 100644 --- a/blockchainetl/streaming/streamer.py +++ b/blockchainetl/streaming/streamer.py @@ -24,6 +24,7 @@ import logging import os import time +from google.api_core.exceptions import InvalidArgument from blockchainetl.streaming.streamer_adapter_stub import StreamerAdapterStub from blockchainetl.file_utils import smart_open @@ -95,7 +96,12 @@ def _sync_cycle(self): current_block, target_block, self.last_synced_block, blocks_to_sync)) if blocks_to_sync != 0: - self.blockchain_streamer_adapter.export_all(self.last_synced_block + 1, target_block) + + try: + self.blockchain_streamer_adapter.export_all(self.last_synced_block + 1, target_block) + except InvalidArgument as e: + logging.exception(f"An exception occurred while syncing block data - InvalidArgument, ERROR = {e.message}") + logging.info('Writing last synced block {}'.format(target_block)) write_last_synced_block(self.last_synced_block_file, target_block) self.last_synced_block = target_block diff --git a/cloudbuild.yaml b/cloudbuild.yaml new file mode 100644 index 0000000..944636d --- /dev/null +++ b/cloudbuild.yaml @@ -0,0 +1,12 @@ +steps: + + - id: build-image + name: 'gcr.io/cloud-builders/docker' + args: ['build', '-f', 'Dockerfile_with_streaming', '.', '-t', 'gcr.io/$PROJECT_ID/bitcoin-etl:${TAG_NAME}'] + + - id: push-image + name: 'gcr.io/cloud-builders/docker' + args: ['push', 'gcr.io/$PROJECT_ID/bitcoin-etl:${TAG_NAME}'] + waitFor: ['build-image'] + +timeout: 600s \ No newline at end of file diff --git a/dockerhub.md b/dockerhub.md index 762cd29..cbb4180 100644 --- a/dockerhub.md +++ b/dockerhub.md @@ -1,10 +1,10 @@ # Uploading to Docker Hub ```bash -> BITCOINETL_STREAMING_VERSION=1.5.2-streaming -> docker build --platform linux/x86_64 -t bitcoin-etl:${BITCOINETL_STREAMING_VERSION} -f Dockerfile_with_streaming . -> docker tag bitcoin-etl:${BITCOINETL_STREAMING_VERSION} blockchainetl/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} -> docker push blockchainetl/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} +> BITCOINETL_STREAMING_VERSION=1.5.0-streaming +> docker build -t bitcoin-etl:${BITCOINETL_STREAMING_VERSION} -f Dockerfile_with_streaming . +> docker tag bitcoin-etl:${BITCOINETL_STREAMING_VERSION} merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} +> docker push merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} > docker tag bitcoin-etl:${BITCOINETL_STREAMING_VERSION} blockchainetl/bitcoin-etl:latest-streaming > docker push blockchainetl/bitcoin-etl:latest-streaming diff --git a/last_synced_block.txt b/last_synced_block.txt new file mode 100644 index 0000000..c6bfcb7 --- /dev/null +++ b/last_synced_block.txt @@ -0,0 +1 @@ +804002 diff --git a/litecoin_last_synced_block.txt b/litecoin_last_synced_block.txt new file mode 100644 index 0000000..8b0df76 --- /dev/null +++ b/litecoin_last_synced_block.txt @@ -0,0 +1 @@ +2535320 diff --git a/setup.py b/setup.py index a00868c..4600d22 100644 --- a/setup.py +++ b/setup.py @@ -38,7 +38,8 @@ def read(fname): extras_require={ 'streaming': [ 'timeout-decorator==0.4.1', - 'google-cloud-pubsub==0.39.1' + 'google-cloud-pubsub==0.39.1', + 'confluent-kafka==2.2.0' ], 'dev': [ 'pytest~=4.3.0'