Skip to content

Commit

Permalink
open interest & new archieve files are added
Browse files Browse the repository at this point in the history
  • Loading branch information
mkaanerkoc committed Mar 14, 2024
1 parent ada7d85 commit f06ffac
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 33 deletions.
7 changes: 4 additions & 3 deletions demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@ def analyze_premium_index_daily():

def download_agg_trades():
ofl = OfflineFileFetcher()
symbol = 'LOOMUSDT'
symbol = 'AVAXUSDT'

days = pd.date_range('2023-09-01', '2023-11-01', freq='1D')
days = pd.date_range('2023-11-01', '2023-11-04', freq='1D')

files = [AggregatedTradesFile(symbol, str(day.date()), 'daily', 'spot')
files = [AggregatedTradesFile(symbol, str(day.date()), 'daily', 'futures')
for symbol, day in product([symbol], days)]

res = ofl.download(files)

df = pd.concat(list(filter(lambda x: isinstance(x, pd.DataFrame), res)))
df.to_csv(f'{symbol}_agg_trades.csv')

Expand Down
9 changes: 7 additions & 2 deletions fast_binance/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
__version__ = "1.1.0"
__version__ = "1.1.1"

from fast_binance.online_fetcher import (
OnlinePriceFetcher,
OpenInterestFetcher
)

from fast_binance.online_fetcher import OnlinePriceFetcher
from fast_binance.offline_fetcher import OfflineFileFetcher

from fast_binance.archieve_files import(
PriceKlinesFile,
FuturesUsdtMetricsFile,
Expand Down
11 changes: 7 additions & 4 deletions fast_binance/archieve_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,8 @@ def __init__(self, symbol, date, span, market):
self.span = span
self.market = 'spot' if market == 'spot' else 'futures/um'
self.name = f'{self.symbol}-aggTrades-{self.date}' # ACAUSDT-aggTrades-2024-02-07
self.columns = ['aggTradeID', 'price', 'quantity', 'ftId', 'ltId', 'time', 'is_buyer_maker', 'ig'] \
if market == 'spot' \
else ['aggTradeID', 'price', 'quantity', 'ftId', 'ltId', 'time', 'is_buyer_maker', 'ig']
self.columns = ['aggTradeID', 'price', 'quantity', 'ftId', 'ltId', 'time', 'is_buyer_maker', 'ig'] if market == 'spot' \
else ['aggTradeID', 'price', 'quantity', 'ftId', 'ltId', 'time', 'is_buyer_maker']

@property
def source(self):
Expand All @@ -146,9 +145,13 @@ def local(self):
pass

def prepare_df(self, df):
df.columns = ['aggTradeID', 'price', 'quantity', 'ftId', 'ltId', 'time', 'is_buyer_maker', 'ig']
print('IM HERE 1', self.columns)
df.columns = self.columns
df = df.astype({"time":int})
print('IM HERE 1b')
df['time'] = pd.to_datetime(df['time'], unit='ms')
print('IM HERE 2')
df = df.drop(columns=['ftId', 'ltId', 'ig'])
df = df.set_index(df['time']).sort_index()
print('IM HERE 3')
return df
4 changes: 2 additions & 2 deletions fast_binance/offline_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ async def _fetch_file(self, session, file):
downloads zip file and extract .csv file and add column information
returns pandas dataframe
'''

print(file.source)
async with session.get(file.source) as resp:
assert resp.status == 200
data = await resp.read()
with zipfile.ZipFile(io.BytesIO(data)) as archive:
fname = archive.namelist()[0]
data = pd.read_csv(archive.open(fname), dtype=object)
data = pd.read_csv(archive.open(fname), dtype=object, header=None)
data = file.prepare_df(data)
return data

Expand Down
87 changes: 66 additions & 21 deletions fast_binance/online_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
'close_time', 'quote_volume', 'count',
'taker_buy_volume', 'taker_buy_quote_volume', 'ignore']

OI_STATS_COLUMNS = ['symbol', 'sumOpenInterest', 'sumOpenInterestValue', 'timestamp']


def convert_price_data_types(raw_prices:pd.DataFrame, convert_types=True):
Expand Down Expand Up @@ -39,6 +40,20 @@ def convert_price_data_types(raw_prices:pd.DataFrame, convert_types=True):
raw_prices = raw_prices.set_index('open_time')
return raw_prices


def convert_oi_data(oi_data:pd.DataFrame, convert_types=True):
oi_data.columns = OI_STATS_COLUMNS
if convert_types:
oi_data = oi_data.astype({
'timestamp': 'int64',
'sumOpenInterest': 'float64',
'sumOpenInterestValue':'float64',
})
oi_data['timestamp'] = pd.to_datetime(oi_data['timestamp'], unit='ms')
oi_data = oi_data.set_index('timestamp')
return oi_data


class HistoricalFundingRate:
def __init__(self):
self._client = Client()
Expand Down Expand Up @@ -73,12 +88,10 @@ def save(self):
def load(self):
pass

class OnlinePriceFetcher:
def __init__(self, market, worket_count = DEFAULT_WORKER_COUNT):
class MultiplexFetcher:
def __init__(self, worket_count = DEFAULT_WORKER_COUNT):
self._worker = worket_count # Parallel async request count
self._async_client = None
self._market = market


async def _fetch_symbols(self, symbols, **kwargs):
self._async_client = await AsyncClient.create()
Expand All @@ -91,10 +104,9 @@ async def _fetch_symbols(self, symbols, **kwargs):
finally:
await self._async_client.close_connection()

prices = { result[0]: result[1] for result in results }

return prices

result_dict = { result[0]: result[1] for result in results }
return result_dict

async def _fetch_chunk(self, symbols, **kwargs):
"""
creates a Future object for every symbol in the symbol_chunk.
Expand All @@ -108,6 +120,33 @@ async def _fetch_chunk(self, symbols, **kwargs):
return await asyncio.gather(*tasks, return_exceptions=True)

async def _fetch_symbol(self, symbol, **kwargs):
raise NotImplementedError

async def _get_function(self, symbol, **kwargs):
raise NotImplementedError

async def async_download(self, symbols, **kwargs):
return await self._fetch_symbols(symbols, **kwargs)

def download(self, symbols, **kwargs):
try:
loop = asyncio.get_event_loop()
except RuntimeError as e:
if str(e).startswith('There is no current event loop in thread'):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
else:
raise
res = loop.run_until_complete(self._fetch_symbols(symbols, **kwargs))
return res


class OnlinePriceFetcher(MultiplexFetcher):
def __init__(self, market, worket_count = DEFAULT_WORKER_COUNT):
super().__init__(worket_count)
self._market = market

async def _fetch_symbol(self, symbol, **kwargs):
"""
bottom-level function that sends HTTP request to Binance and awaits on response
"""
Expand All @@ -121,7 +160,7 @@ async def _fetch_symbol(self, symbol, **kwargs):
return symbol, str(e)
return symbol, price_df


async def _get_function(self, symbol, **kwargs):
# TODO symbol unavailable hatasi geldiginde onu sonuca eklememek lazim
if self._market == 'spot':
Expand All @@ -131,17 +170,23 @@ async def _get_function(self, symbol, **kwargs):
else:
raise ValueError(f"market type is invalid : {self._market}")

async def async_download(self, symbols, **kwargs):
return await self._fetch_symbols(symbols, **kwargs)
class OpenInterestFetcher(MultiplexFetcher):
async def _get_function(self, **kwargs):
# TODO symbol unavailable hatasi geldiginde onu sonuca eklememek lazim
return await self._async_client.futures_open_interest_hist(**kwargs)

def download(self, symbols, **kwargs):
async def _fetch_symbol(self, symbol, **kwargs):
"""
bottom-level function that sends HTTP request to Binance and awaits on response
"""
try:
loop = asyncio.get_event_loop()
except RuntimeError as e:
if str(e).startswith('There is no current event loop in thread'):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
else:
raise
res = loop.run_until_complete(self._fetch_symbols(symbols, **kwargs))
return res
kwargs['symbol'] = symbol
oi_raw = await self._get_function(**kwargs)
oi_df = pd.DataFrame(oi_raw)
oi_df = convert_oi_data(oi_df)
except Exception as e:
print(f'An error occured while fetching {symbol}. Error: {e}. {kwargs}')
# raise Exception(f'An error occured while fetching {symbol}. Error: {e}')
return symbol, str(e)
return symbol, oi_df

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
name='fast-binance',
version=version,
packages=['fast_binance'],
description='Asyncful wrapper around python-binance module.',
description='Asyncful wrapper around python-binance module and data.binance.vision .',
long_description=long_description,
long_description_content_type="text/x-rst",
url='https://github.com/mkaanerkoc/fast-binance',
Expand Down

0 comments on commit f06ffac

Please sign in to comment.