added kafka exporter #72

Open
wants to merge 73 commits into
base: master
f009d4c
fix
Oct 29, 2019
269926f
remove ide files
Oct 30, 2019
8bafe10
add new fields to blocks and transactions export_blocks job
Oct 30, 2019
6ffd226
update txn mapper, adds transaction_id
Oct 30, 2019
24c6693
syncs legacy export schema with current schema
Nov 4, 2019
ecd1bfa
Merge pull request #1 from merklescience/dev-add-coin
aknirmal90 Nov 4, 2019
2c6367f
removing unused fields
Nov 11, 2019
8fb7260
Merge pull request #2 from merklescience/feature/fix-export-schema
aknirmal90 Nov 11, 2019
03dbcad
fix for tranasction ids
Nov 12, 2019
f4b0802
handle case of lower verbosity
Nov 12, 2019
7ef0fdb
Merge pull request #3 from merklescience/feature/fix-export-schema
aknirmal90 Nov 12, 2019
899bea6
try caching cryptocompare prices
Nov 12, 2019
09f661e
reading from cache for daily prices
Nov 12, 2019
0ef5f9f
Merge pull request #4 from merklescience/feature/fix-export-schema
aknirmal90 Nov 12, 2019
e009deb
increase tiemout - bitcoincash failures
Nov 13, 2019
a718ec3
Merge pull request #5 from merklescience/feature/fix-export-schema
aknirmal90 Nov 13, 2019
63aa436
stream
Mar 9, 2020
a62b8a7
countainer command
Mar 9, 2020
ca85867
fix streaming
saurabhdaga-merkle Feb 25, 2021
e12dbbb
add bsv
saurabhdaga-merkle Mar 8, 2021
81943de
add bsv
saurabhdaga-merkle Mar 8, 2021
9c175cf
add bsv
saurabhdaga-merkle Mar 8, 2021
fd10f2d
debug api key
saurabhdaga-merkle Mar 22, 2021
3e9d8f6
add BSV
saurabhdaga-merkle Mar 22, 2021
391b3d4
BSV changes
saurabhdaga-merkle May 3, 2021
79b0eee
Merge branch 'blockchain-etl:master' into master
saurabhdaga-merkle Jun 14, 2021
a0508e7
fixes
saurabhdaga-merkle Jul 6, 2021
8952da7
merged master
saurabhdaga-merkle Jul 6, 2021
6b8a8cf
fix decimal places
saurabhdaga-merkle Jul 7, 2021
c77ba34
Fix for bsv (#9)
saurabhdaga-merkle Aug 11, 2021
63cce73
Fix for bsv (#10)
saurabhdaga-merkle Aug 11, 2021
b109ee2
updated readme
saurabhdaga-merkle Aug 26, 2021
21537da
updated values
saurabhdaga-merkle Aug 26, 2021
a9be6f2
taproot testing from upstream
Dec 16, 2021
e78acb9
Merge pull request #13 from merklescience/streaming-taproot
prassee Dec 16, 2021
2940637
chunking publish to pubsub (#11)
akshay-ghy Dec 16, 2021
56d4ad0
Merge branch 'blockchain-etl:master' into master
saurabhdaga-merkle Dec 16, 2021
bcc931b
hot fix for taproot
Dec 16, 2021
6c63de3
Merge pull request #14 from merklescience/streaming-taproot
prassee Dec 16, 2021
cc746b8
Merge branch 'blockchain-etl:master' into master
saurabhdaga-merkle Jun 26, 2022
a37aefa
Remove coin_price_usd (#7)
akshay-ghy Jun 26, 2022
a6d645e
Remove coin price (#18)
saurabhdaga-merkle Jun 26, 2022
bf8bf26
Remove coin price (#19)
saurabhdaga-merkle Jun 26, 2022
831a083
merge branch streaming
saurabhdaga-merkle Jun 26, 2022
e2fd993
Adds cloudbuild config
Aug 30, 2022
37fa609
Adds correct dockerfile
Aug 30, 2022
08057c3
Merge pull request #20 from merklescience/feature/adds-continous-inte…
aknirmal90 Aug 30, 2022
3dbbf74
Fix filename
Aug 30, 2022
69fd838
Merge pull request #21 from merklescience/master
aknirmal90 Aug 30, 2022
8c8d363
Merge pull request #22 from merklescience/feature/adds-continous-inte…
aknirmal90 Aug 30, 2022
68b68f0
Merge pull request #23 from merklescience/develop
aknirmal90 Aug 30, 2022
1cc2312
Fix typo
Aug 30, 2022
fc62d39
Merge pull request #24 from merklescience/feature/adds-continous-inte…
aknirmal90 Aug 30, 2022
0259a26
Merge branch 'master' into develop
aknirmal90 Aug 30, 2022
81283f3
Merge pull request #25 from merklescience/develop
aknirmal90 Aug 30, 2022
68880a3
resolve conflicts with master
akshay-ghy Sep 2, 2022
d2ea670
Merge pull request #6 from merklescience/streaming
aknirmal90 Sep 2, 2022
42dd40d
added kafka exporter
Aug 17, 2023
5255388
added test file
Aug 17, 2023
0dbabec
test file created for bitcoin flatten logic
Aug 18, 2023
1581d95
updated the code
Aug 18, 2023
3b35718
added bitcoin flatten transformation
Aug 18, 2023
2b33c58
confluent conf added
Aug 19, 2023
4260518
new changes
Aug 19, 2023
7b128dc
added key in kafka producer
Aug 20, 2023
8c37eca
new changes
Aug 21, 2023
de69224
new changes
Aug 21, 2023
f92e88a
new changes
Aug 21, 2023
4c521e5
added topic mapping params in cli
Aug 22, 2023
f7b9cba
new changes
Aug 29, 2023
65fca90
Merge branch 'master' into latest
aknirmal90 Sep 5, 2023
69ba9f2
updated the flatten logic
Sep 7, 2023
9512608
merge with latest branchMerge branch 'latest' of github.com:merklesci…
Sep 8, 2023
5 changes: 5 additions & 0 deletions .gitignore
@@ -48,3 +48,8 @@ coverage.xml
 .venv
 venv/
 ENV/
+
+.idea/
+
+*.json
+*.txt
8 changes: 7 additions & 1 deletion Dockerfile
@@ -1,11 +1,17 @@
FROM python:3.6-alpine

MAINTAINER Omidiora Samuel <[email protected]>

ENV PROJECT_DIR=bitcoin-etl

RUN mkdir /$PROJECT_DIR

WORKDIR /$PROJECT_DIR

COPY . .

RUN apk add --no-cache gcc musl-dev #for C libraries: <limits.h> <stdio.h>
RUN pip install --upgrade pip && pip install -e /$PROJECT_DIR/

RUN pip install --upgrade pip && pip install -e /$PROJECT_DIR/ && pip install dist/bitcoin-etl-1.5.2.tar.gz && pip install kafka-python==2.0.2

ENTRYPOINT ["python", "bitcoinetl"]
9 changes: 8 additions & 1 deletion Dockerfile_with_streaming
@@ -1,15 +1,22 @@
FROM python:3.6

MAINTAINER Evgeny Medvedev <[email protected]>

ENV PROJECT_DIR=bitcoin-etl

RUN mkdir /$PROJECT_DIR

WORKDIR /$PROJECT_DIR

COPY . .
RUN pip install --upgrade pip && pip install -e /$PROJECT_DIR/[streaming]

RUN pip install --upgrade pip && pip install dist/bitcoin-etl-1.5.2.tar.gz && pip install -e /$PROJECT_DIR/[streaming]

# Add Tini
ENV TINI_VERSION v0.18.0

ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /tini

RUN chmod +x /tini

ENTRYPOINT ["/tini", "--", "python", "bitcoinetl"]
5 changes: 5 additions & 0 deletions README_CONTAINER_REGISTRY.md
@@ -0,0 +1,5 @@
+BITCOINETL_STREAMING_VERSION=1.4-streaming
+docker build -t merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} -f Dockerfile_with_streaming .
+docker tag merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION} us.gcr.io/staging-btc-etl/merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION}
+docker push us.gcr.io/staging-btc-etl/merklescience/bitcoin-etl:${BITCOINETL_STREAMING_VERSION}
+
4 changes: 2 additions & 2 deletions bitcoinetl/cli/export_all.py
@@ -25,7 +25,7 @@
 import re

 from datetime import datetime, timedelta
-from bitcoinetl.enumeration.chain import Chain
+from bitcoinetl.enumeration.chain import Chain, CoinPriceType
 from bitcoinetl.jobs.export_all import export_all as do_export_all
 from bitcoinetl.service.btc_block_range_service import BtcBlockRangeService
 from bitcoinetl.rpc.bitcoin_rpc import BitcoinRpc
@@ -99,4 +99,4 @@ def get_partitions(start, end, partition_batch_size, provider_uri):
 def export_all(start, end, partition_batch_size, provider_uri, output_dir, max_workers, export_batch_size, chain, enrich):
     """Exports all data for a range of blocks."""
     do_export_all(chain, get_partitions(start, end, partition_batch_size, provider_uri),
-                  output_dir, provider_uri, max_workers, export_batch_size, enrich)
+                  output_dir, provider_uri, max_workers, export_batch_size, enrich)
7 changes: 4 additions & 3 deletions bitcoinetl/cli/export_blocks_and_transactions.py
@@ -23,7 +23,7 @@

 import click

-from bitcoinetl.enumeration.chain import Chain
+from bitcoinetl.enumeration.chain import Chain, CoinPriceType
 from bitcoinetl.jobs.export_blocks_job import ExportBlocksJob
 from bitcoinetl.jobs.exporters.blocks_and_transactions_item_exporter import blocks_and_transactions_item_exporter
 from bitcoinetl.rpc.bitcoin_rpc import BitcoinRpc
@@ -49,7 +49,7 @@
 @click.option('-c', '--chain', default=Chain.BITCOIN, type=click.Choice(Chain.ALL),
               help='The type of chain')
 def export_blocks_and_transactions(start_block, end_block, batch_size, provider_uri,
-                                   max_workers, blocks_output, transactions_output, chain):
+                                   max_workers, blocks_output, transactions_output, chain,):
     """Export blocks and transactions."""
     if blocks_output is None and transactions_output is None:
         raise ValueError('Either --blocks-output or --transactions-output options must be provided')
@@ -63,5 +63,6 @@ def export_blocks_and_transactions(start_block, end_block, batch_size, provider_
         item_exporter=blocks_and_transactions_item_exporter(blocks_output, transactions_output),
         chain=chain,
         export_blocks=blocks_output is not None,
-        export_transactions=transactions_output is not None)
+        export_transactions=transactions_output is not None,
+    )
     job.run()
17 changes: 11 additions & 6 deletions bitcoinetl/cli/stream.py
@@ -21,7 +21,7 @@
 # SOFTWARE.

 import click
-
+import json
 from bitcoinetl.enumeration.chain import Chain
 from bitcoinetl.rpc.bitcoin_rpc import BitcoinRpc

@@ -41,16 +41,17 @@
 @click.option('-o', '--output', type=str,
               help='Google PubSub topic path e.g. projects/your-project/topics/bitcoin_blockchain. '
                    'If not specified will print to console.')
+@click.option('--topic-mapping', default=None, type=str, help="Topic Mapping should be json like {\"block\": \"producer-litcoin-blocks-hot\",\"transaction\": \"producer-litcoin-transactions-hot\"}")
 @click.option('-s', '--start-block', default=None, type=int, help='Start block.')
 @click.option('-c', '--chain', default=Chain.BITCOIN, type=click.Choice(Chain.ALL), help='The type of chain.')
-@click.option('--period-seconds', default=10, type=int, help='How many seconds to sleep between syncs.')
-@click.option('-b', '--batch-size', default=2, type=int, help='How many blocks to batch in single request.')
+@click.option('--period-seconds', default=1, type=int, help='How many seconds to sleep between syncs.')
+@click.option('-b', '--batch-size', default=1, type=int, help='How many blocks to batch in single request.')
 @click.option('-B', '--block-batch-size', default=10, type=int, help='How many blocks to batch in single sync round.')
 @click.option('-w', '--max-workers', default=5, type=int, help='The number of workers.')
 @click.option('--log-file', default=None, type=str, help='Log file.')
 @click.option('--pid-file', default=None, type=str, help='pid file.')
 @click.option('--enrich', default=True, type=bool, help='Enable filling in transactions inputs fields.')
-def stream(last_synced_block_file, lag, provider_uri, output, start_block, chain=Chain.BITCOIN,
+def stream(last_synced_block_file, lag, provider_uri, output, topic_mapping, start_block, chain=Chain.BITCOIN,
            period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None,
            enrich=True):
     """Streams all data types to console or Google Pub/Sub."""
@@ -61,13 +62,16 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, chain
     from bitcoinetl.streaming.btc_streamer_adapter import BtcStreamerAdapter
     from blockchainetl.streaming.streamer import Streamer

+    if (topic_mapping is not None):
+        topic_mapping = json.loads(topic_mapping)
+
     streamer_adapter = BtcStreamerAdapter(
         bitcoin_rpc=ThreadLocalProxy(lambda: BitcoinRpc(provider_uri)),
-        item_exporter=get_item_exporter(output),
+        item_exporter=get_item_exporter(output,topic_mapping,chain),
         chain=chain,
         batch_size=batch_size,
         enable_enrich=enrich,
-        max_workers=max_workers
+        max_workers=max_workers,
     )
     streamer = Streamer(
         blockchain_streamer_adapter=streamer_adapter,
@@ -77,5 +81,6 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, chain
         period_seconds=period_seconds,
         block_batch_size=block_batch_size,
         pid_file=pid_file,
+        retry_errors=retry_errors
     )
     streamer.stream()
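The `--topic-mapping` handling in this file hands the raw option string directly to `json.loads`. A minimal sketch of the same step with some validation, assuming the mapping is meant to be a JSON object keyed by item type (`block`, `transaction`) as the help text suggests. `parse_topic_mapping` and the topic names are hypothetical and not part of the PR:

```python
import json


def parse_topic_mapping(raw):
    """Parse the --topic-mapping value; return None when the option is absent."""
    if raw is None:
        return None
    try:
        mapping = json.loads(raw)
    except ValueError as exc:  # json.JSONDecodeError is a subclass of ValueError
        raise ValueError('--topic-mapping is not valid JSON: {}'.format(exc))
    if not isinstance(mapping, dict):
        raise ValueError('--topic-mapping must be a JSON object')
    return mapping


# Hypothetical topic names, for illustration only.
mapping = parse_topic_mapping('{"block": "blocks-topic", "transaction": "transactions-topic"}')
print(mapping['block'])  # blocks-topic
```

Checking the shape up front would surface a malformed mapping at startup rather than when the exporter first looks up a topic.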
18 changes: 18 additions & 0 deletions bitcoinetl/domain/block.py
@@ -24,6 +24,7 @@

 class BtcBlock(object):

+
     def __init__(self):
         self.hash = None
         self.size = None
@@ -36,8 +37,25 @@ def __init__(self):
         self.nonce = None
         self.bits = None
         self.coinbase_param = None
+        self.transaction_count = None

         self.transactions = []

+        # New fields added
+        self.transaction_ids = []
+
+        self.version_hex = None
+        self.median_timestamp = None
+        self.difficulty = None
+        self.chain_work = None
+        self.previous_block_hash = None
+        self.next_block_hash = None
+        self.input_value = None
+
+        self.block_reward = None
+        self.transaction_fees = None
+        self.coinbase_txid = None
+        self.coinbase_param_decoded = None
+
     def has_full_transactions(self):
         return len(self.transactions) > 0 and isinstance(self.transactions[0], BtcTransaction)
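`has_full_transactions` distinguishes blocks whose `transactions` list holds full transaction objects from blocks that only carry transaction ids. A small sketch of that check, using stand-in classes reduced to the fields involved (an assumption for illustration; the real classes live under `bitcoinetl/domain`):

```python
class BtcTransaction(object):
    """Stand-in with a single field; the real class has many more."""

    def __init__(self, transaction_id):
        self.transaction_id = transaction_id


class BtcBlock(object):
    """Stand-in limited to the transaction-related fields."""

    def __init__(self):
        self.transactions = []
        self.transaction_ids = []

    def has_full_transactions(self):
        # Same expression as in the diff: only the first element is inspected.
        return len(self.transactions) > 0 and isinstance(self.transactions[0], BtcTransaction)


ids_only = BtcBlock()
ids_only.transactions = ['aa', 'bb']  # plain id strings, no details

full = BtcBlock()
full.transactions = [BtcTransaction('aa')]

print(ids_only.has_full_transactions(), full.has_full_transactions())  # False True
```

An empty block also returns `False`, so callers cannot tell "no transactions" from "ids only" with this check alone.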
8 changes: 8 additions & 0 deletions bitcoinetl/domain/transaction.py
@@ -43,6 +43,14 @@ def __init__(self):
         self.join_splits = []
         self.value_balance = 0

+        # New fields
+        self.transaction_id = None
+        self.weight = None
+        self.input_count = None
+        self.input_value = None
+        self.output_count = None
+        self.output_value = None
+
     def add_input(self, input):
         if len(self.inputs) > 0:
             input.index = self.inputs[len(self.inputs) - 1].index + 1
9 changes: 6 additions & 3 deletions bitcoinetl/domain/transaction_input.py
@@ -23,9 +23,12 @@

 class BtcTransactionInput(object):
     def __init__(self):
+        self.create_transaction_id = None
+        self.create_output_index = None
+
+        self.spending_transaction_id = None
         self.index = None
-        self.spent_transaction_hash = None
-        self.spent_output_index = None
+
         self.script_asm = None
         self.script_hex = None
         self.coinbase_param = None
@@ -37,4 +40,4 @@ def __init__(self):
         self.value = None

     def is_coinbase(self):
-        return self.coinbase_param is not None or self.spent_transaction_hash is None
+        return self.coinbase_param is not None or self.create_transaction_id is None
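After the rename, `is_coinbase` treats an input as coinbase when it carries a `coinbase_param` or when `create_transaction_id` is unset. A reduced sketch of that behavior, keeping only the fields the check reads (the class below is a trimmed stand-in, not the full class from the PR):

```python
class BtcTransactionInput(object):
    """Trimmed stand-in: only the fields that is_coinbase depends on."""

    def __init__(self):
        self.create_transaction_id = None
        self.create_output_index = None
        self.coinbase_param = None

    def is_coinbase(self):
        return self.coinbase_param is not None or self.create_transaction_id is None


regular = BtcTransactionInput()
regular.create_transaction_id = 'ab' * 32  # placeholder 64-character id
regular.create_output_index = 0
print(regular.is_coinbase())  # False

fresh = BtcTransactionInput()
print(fresh.is_coinbase())  # True
```

A freshly constructed input therefore counts as coinbase until `create_transaction_id` is populated, so any mapper that builds inputs needs to set that field for regular inputs.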
4 changes: 4 additions & 0 deletions bitcoinetl/domain/transaction_output.py
@@ -31,3 +31,7 @@ def __init__(self):

         self.addresses = []
         self.value = None
+        self.witness = []
+
+        self.create_transaction_id = None
+        self.spending_transaction_id = None
24 changes: 23 additions & 1 deletion bitcoinetl/enumeration/chain.py
@@ -1,13 +1,35 @@
 class Chain:
     BITCOIN = 'bitcoin'
     BITCOIN_CASH = 'bitcoin_cash'
+    BITCOIN_CASH_SV = 'bitcoin_cash_sv'
     BITCOIN_GOLD = 'bitcoin_gold'
     DOGECOIN = 'dogecoin'
     LITECOIN = 'litecoin'
     DASH = 'dash'
     ZCASH = 'zcash'
     MONACOIN = 'monacoin'

-    ALL = [BITCOIN, BITCOIN_CASH, BITCOIN_GOLD, DOGECOIN, LITECOIN, DASH, ZCASH, MONACOIN]
+    ALL = [BITCOIN, BITCOIN_CASH, BITCOIN_CASH_SV, BITCOIN_GOLD, DOGECOIN, LITECOIN, DASH, ZCASH, MONACOIN]
     # Old API doesn't support verbosity for getblock which doesn't allow querying all transactions in a block in 1 go.
     HAVE_OLD_API = [BITCOIN_CASH, DOGECOIN, DASH, MONACOIN]
+
+    @classmethod
+    def ticker_symbol(cls, chain):
+        symbols = {
+            'bitcoin': 'BTC',
+            'bitcoin_cash': 'BCH',
+            'bitcoin_cash_sv': 'BSV',
+            'dogecoin': 'DOGE',
+            'litecoin': 'LTC',
+            'dash': 'DASH',
+            'zcash': 'ZEC',
+            'monacoin': 'MONA',
+        }
+        return symbols.get(chain, None)
+
+
+class CoinPriceType:
+
+    empty = 0
+    daily = 1
+    hourly = 2
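`ticker_symbol` falls back to `None` for any chain missing from its dictionary. The sketch below is a condensed copy of the `Chain` class from this diff (the `HAVE_OLD_API` list is omitted), used to list which entries of `Chain.ALL` have no symbol:

```python
class Chain:
    BITCOIN = 'bitcoin'
    BITCOIN_CASH = 'bitcoin_cash'
    BITCOIN_CASH_SV = 'bitcoin_cash_sv'
    BITCOIN_GOLD = 'bitcoin_gold'
    DOGECOIN = 'dogecoin'
    LITECOIN = 'litecoin'
    DASH = 'dash'
    ZCASH = 'zcash'
    MONACOIN = 'monacoin'

    ALL = [BITCOIN, BITCOIN_CASH, BITCOIN_CASH_SV, BITCOIN_GOLD, DOGECOIN, LITECOIN, DASH, ZCASH, MONACOIN]

    @classmethod
    def ticker_symbol(cls, chain):
        symbols = {
            'bitcoin': 'BTC',
            'bitcoin_cash': 'BCH',
            'bitcoin_cash_sv': 'BSV',
            'dogecoin': 'DOGE',
            'litecoin': 'LTC',
            'dash': 'DASH',
            'zcash': 'ZEC',
            'monacoin': 'MONA',
        }
        return symbols.get(chain, None)


# Chains accepted by the CLI that have no ticker symbol in the mapping.
missing = [chain for chain in Chain.ALL if Chain.ticker_symbol(chain) is None]
print(missing)  # ['bitcoin_gold']
```

Callers that pass the result to a price lookup would need to handle `None` for `bitcoin_gold`.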
22 changes: 11 additions & 11 deletions bitcoinetl/jobs/enrich_transactions.py
@@ -63,7 +63,7 @@ def _enrich_transactions(self, transactions):
             input_transactions_map = self._get_input_transactions_as_map(transaction_input_batch)
             for input in transaction_input_batch:
                 output = self._get_output_for_input(input, input_transactions_map) \
-                    if input.spent_transaction_hash is not None else None
+                    if input.create_transaction_id is not None else None
                 if output is not None:
                     input.required_signatures = output.required_signatures
                     input.type = output.type
@@ -74,29 +74,29 @@ def _enrich_transactions(self, transactions):
             self.item_exporter.export_item(self.transaction_mapper.transaction_to_dict(transaction))

     def _get_input_transactions_as_map(self, transaction_inputs):
-        transaction_hashes = [input.spent_transaction_hash for input in transaction_inputs
-                              if input.spent_transaction_hash is not None]
+        transaction_hashes = [input.create_transaction_id for input in transaction_inputs
+                              if input.create_transaction_id is not None]

         transaction_hashes = set(transaction_hashes)
         if len(transaction_hashes) > 0:
             transactions = self.btc_service.get_transactions_by_hashes(transaction_hashes)
-            return {transaction.hash: transaction for transaction in transactions}
+            return {transaction.transaction_id: transaction for transaction in transactions}
         else:
             return {}

     def _get_output_for_input(self, transaction_input, input_transactions_map):
-        spent_transaction_hash = transaction_input.spent_transaction_hash
-        input_transaction = input_transactions_map.get(spent_transaction_hash)
+        create_transaction_id = transaction_input.create_transaction_id
+        input_transaction = input_transactions_map.get(create_transaction_id)
         if input_transaction is None:
-            raise ValueError('Input transaction with hash {} not found'.format(spent_transaction_hash))
+            raise ValueError('Input transaction with hash {} not found'.format(create_transaction_id))

-        spent_output_index = transaction_input.spent_output_index
-        if input_transaction.outputs is None or len(input_transaction.outputs) < (spent_output_index + 1):
+        create_output_index = transaction_input.create_output_index
+        if input_transaction.outputs is None or len(input_transaction.outputs) < (create_output_index + 1):
             raise ValueError(
                 'There is no output with index {} in transaction with hash {}'.format(
-                    spent_output_index, spent_transaction_hash))
+                    create_output_index, create_transaction_id))

-        output = input_transaction.outputs[spent_output_index]
+        output = input_transaction.outputs[create_output_index]
         return output

     def _end(self):
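The enrichment step above resolves each input to the output it spends: it looks up the creating transaction by `create_transaction_id`, then indexes into that transaction's outputs with `create_output_index`. A standalone sketch of that lookup, assuming simple namedtuple stand-ins for the domain objects (the types and sample values are illustrative, not taken from the PR):

```python
from collections import namedtuple

# Stand-ins for the domain classes, reduced to the fields the lookup needs.
Output = namedtuple('Output', ['value', 'addresses'])
Transaction = namedtuple('Transaction', ['transaction_id', 'outputs'])
Input = namedtuple('Input', ['create_transaction_id', 'create_output_index'])


def get_output_for_input(tx_input, transactions_by_id):
    """Return the output that tx_input spends, or raise ValueError if it cannot be found."""
    tx = transactions_by_id.get(tx_input.create_transaction_id)
    if tx is None:
        raise ValueError('Input transaction with hash {} not found'.format(
            tx_input.create_transaction_id))
    index = tx_input.create_output_index
    if tx.outputs is None or len(tx.outputs) < index + 1:
        raise ValueError('There is no output with index {} in transaction with hash {}'.format(
            index, tx.transaction_id))
    return tx.outputs[index]


funding = Transaction('tx1', [Output(50, ['addr_a']), Output(25, ['addr_b'])])
by_id = {funding.transaction_id: funding}  # keyed by transaction_id, as in the diff

spent = get_output_for_input(Input('tx1', 1), by_id)
print(spent.value, spent.addresses)  # 25 ['addr_b']
```

Because the map is keyed by `transaction_id`, the rename only works if the transactions returned by the service populate `transaction_id` with the same value that inputs store in `create_transaction_id`.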
7 changes: 5 additions & 2 deletions bitcoinetl/jobs/export_all.py
@@ -40,7 +40,9 @@
 logger = logging.getLogger('export_all')


-def export_all(chain, partitions, output_dir, provider_uri, max_workers, batch_size, enrich):
+def export_all(
+    chain, partitions, output_dir, provider_uri, max_workers, batch_size, enrich,
+):
     for batch_start_block, batch_end_block, partition_dir, *args in partitions:
         # # # start # # #

@@ -101,7 +103,8 @@ def export_all(chain, partitions, output_dir, provider_uri, max_workers, batch_s
             max_workers=max_workers,
             item_exporter=blocks_and_transactions_item_exporter(blocks_file, transactions_file),
             export_blocks=blocks_file is not None,
-            export_transactions=transactions_file is not None)
+            export_transactions=transactions_file is not None,
+        )
         job.run()

         if enrich == True:
2 changes: 2 additions & 0 deletions bitcoinetl/jobs/export_blocks_job.py
@@ -27,6 +27,7 @@
 from blockchainetl.executors.batch_work_executor import BatchWorkExecutor
 from blockchainetl.jobs.base_job import BaseJob
 from blockchainetl.utils import validate_range
+from bitcoinetl.enumeration.chain import CoinPriceType


 # Exports blocks and transactions
@@ -43,6 +44,7 @@ def __init__(
             export_blocks=True,
             export_transactions=True):
         validate_range(start_block, end_block)
+
         self.start_block = start_block
         self.end_block = end_block
