From f06ffaca12e1fbc285820df0094b3423f606df9e Mon Sep 17 00:00:00 2001 From: MKE Date: Thu, 14 Mar 2024 23:08:05 +0100 Subject: [PATCH] open interest & new archieve files are added --- demo.py | 7 +-- fast_binance/__init__.py | 9 +++- fast_binance/archieve_files.py | 11 +++-- fast_binance/offline_fetcher.py | 4 +- fast_binance/online_fetcher.py | 87 +++++++++++++++++++++++++-------- setup.py | 2 +- 6 files changed, 87 insertions(+), 33 deletions(-) diff --git a/demo.py b/demo.py index b582f93..83d0d6b 100644 --- a/demo.py +++ b/demo.py @@ -72,14 +72,15 @@ def analyze_premium_index_daily(): def download_agg_trades(): ofl = OfflineFileFetcher() - symbol = 'LOOMUSDT' + symbol = 'AVAXUSDT' - days = pd.date_range('2023-09-01', '2023-11-01', freq='1D') + days = pd.date_range('2023-11-01', '2023-11-04', freq='1D') - files = [AggregatedTradesFile(symbol, str(day.date()), 'daily', 'spot') + files = [AggregatedTradesFile(symbol, str(day.date()), 'daily', 'futures') for symbol, day in product([symbol], days)] res = ofl.download(files) + df = pd.concat(list(filter(lambda x: isinstance(x, pd.DataFrame), res))) df.to_csv(f'{symbol}_agg_trades.csv') diff --git a/fast_binance/__init__.py b/fast_binance/__init__.py index eeeff9c..4c50358 100644 --- a/fast_binance/__init__.py +++ b/fast_binance/__init__.py @@ -1,7 +1,12 @@ -__version__ = "1.1.0" +__version__ = "1.1.1" + +from fast_binance.online_fetcher import ( + OnlinePriceFetcher, + OpenInterestFetcher +) -from fast_binance.online_fetcher import OnlinePriceFetcher from fast_binance.offline_fetcher import OfflineFileFetcher + from fast_binance.archieve_files import( PriceKlinesFile, FuturesUsdtMetricsFile, diff --git a/fast_binance/archieve_files.py b/fast_binance/archieve_files.py index 49a60d1..bc51ad3 100644 --- a/fast_binance/archieve_files.py +++ b/fast_binance/archieve_files.py @@ -129,9 +129,8 @@ def __init__(self, symbol, date, span, market): self.span = span self.market = 'spot' if market == 'spot' else 'futures/um' self.name = f'{self.symbol}-aggTrades-{self.date}' # ACAUSDT-aggTrades-2024-02-07 - self.columns = ['aggTradeID', 'price', 'quantity', 'ftId', 'ltId', 'time', 'is_buyer_maker', 'ig'] \ - if market == 'spot' \ - else ['aggTradeID', 'price', 'quantity', 'ftId', 'ltId', 'time', 'is_buyer_maker', 'ig'] + self.columns = ['aggTradeID', 'price', 'quantity', 'ftId', 'ltId', 'time', 'is_buyer_maker', 'ig'] if market == 'spot' \ + else ['aggTradeID', 'price', 'quantity', 'ftId', 'ltId', 'time', 'is_buyer_maker'] @property def source(self): @@ -146,9 +145,13 @@ def local(self): pass def prepare_df(self, df): - df.columns = ['aggTradeID', 'price', 'quantity', 'ftId', 'ltId', 'time', 'is_buyer_maker', 'ig'] + print('IM HERE 1', self.columns) + df.columns = self.columns df = df.astype({"time":int}) + print('IM HERE 1b') df['time'] = pd.to_datetime(df['time'], unit='ms') + print('IM HERE 2') df = df.drop(columns=['ftId', 'ltId', 'ig']) df = df.set_index(df['time']).sort_index() + print('IM HERE 3') return df diff --git a/fast_binance/offline_fetcher.py b/fast_binance/offline_fetcher.py index 7e5b1d9..131e2b0 100644 --- a/fast_binance/offline_fetcher.py +++ b/fast_binance/offline_fetcher.py @@ -34,13 +34,13 @@ async def _fetch_file(self, session, file): downloads zip file and extract .csv file and add column information returns pandas dataframe ''' - + print(file.source) async with session.get(file.source) as resp: assert resp.status == 200 data = await resp.read() with zipfile.ZipFile(io.BytesIO(data)) as archive: fname = archive.namelist()[0] - data = pd.read_csv(archive.open(fname), dtype=object) + data = pd.read_csv(archive.open(fname), dtype=object, header=None) data = file.prepare_df(data) return data diff --git a/fast_binance/online_fetcher.py b/fast_binance/online_fetcher.py index e0cb943..11874c5 100644 --- a/fast_binance/online_fetcher.py +++ b/fast_binance/online_fetcher.py @@ -12,6 +12,7 @@ 'close_time', 'quote_volume', 'count', 'taker_buy_volume', 'taker_buy_quote_volume', 'ignore'] +OI_STATS_COLUMNS = ['symbol', 'sumOpenInterest', 'sumOpenInterestValue', 'timestamp'] def convert_price_data_types(raw_prices:pd.DataFrame, convert_types=True): @@ -39,6 +40,20 @@ def convert_price_data_types(raw_prices:pd.DataFrame, convert_types=True): raw_prices = raw_prices.set_index('open_time') return raw_prices + +def convert_oi_data(oi_data:pd.DataFrame, convert_types=True): + oi_data.columns = OI_STATS_COLUMNS + if convert_types: + oi_data = oi_data.astype({ + 'timestamp': 'int64', + 'sumOpenInterest': 'float64', + 'sumOpenInterestValue':'float64', + }) + oi_data['timestamp'] = pd.to_datetime(oi_data['timestamp'], unit='ms') + oi_data = oi_data.set_index('timestamp') + return oi_data + + class HistoricalFundingRate: def __init__(self): self._client = Client() @@ -73,12 +88,10 @@ def save(self): def load(self): pass -class OnlinePriceFetcher: - def __init__(self, market, worket_count = DEFAULT_WORKER_COUNT): +class MultiplexFetcher: + def __init__(self, worket_count = DEFAULT_WORKER_COUNT): self._worker = worket_count # Parallel async request count self._async_client = None - self._market = market - async def _fetch_symbols(self, symbols, **kwargs): self._async_client = await AsyncClient.create() @@ -91,10 +104,9 @@ async def _fetch_symbols(self, symbols, **kwargs): finally: await self._async_client.close_connection() - prices = { result[0]: result[1] for result in results } - - return prices - + result_dict = { result[0]: result[1] for result in results } + return result_dict + async def _fetch_chunk(self, symbols, **kwargs): """ creates a Future object for every symbol in the symbol_chunk. @@ -108,6 +120,33 @@ async def _fetch_chunk(self, symbols, **kwargs): return await asyncio.gather(*tasks, return_exceptions=True) async def _fetch_symbol(self, symbol, **kwargs): + raise NotImplementedError + + async def _get_function(self, symbol, **kwargs): + raise NotImplementedError + + async def async_download(self, symbols, **kwargs): + return await self._fetch_symbols(symbols, **kwargs) + + def download(self, symbols, **kwargs): + try: + loop = asyncio.get_event_loop() + except RuntimeError as e: + if str(e).startswith('There is no current event loop in thread'): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + else: + raise + res = loop.run_until_complete(self._fetch_symbols(symbols, **kwargs)) + return res + + +class OnlinePriceFetcher(MultiplexFetcher): + def __init__(self, market, worket_count = DEFAULT_WORKER_COUNT): + super().__init__(worket_count) + self._market = market + + async def _fetch_symbol(self, symbol, **kwargs): """ bottom-level function that sends HTTP request to Binance and awaits on response """ @@ -121,7 +160,7 @@ async def _fetch_symbol(self, symbol, **kwargs): return symbol, str(e) return symbol, price_df - + async def _get_function(self, symbol, **kwargs): # TODO symbol unavailable hatasi geldiginde onu sonuca eklememek lazim if self._market == 'spot': @@ -131,17 +170,23 @@ async def _get_function(self, symbol, **kwargs): else: raise ValueError(f"market type is invalid : {self._market}") - async def async_download(self, symbols, **kwargs): - return await self._fetch_symbols(symbols, **kwargs) +class OpenInterestFetcher(MultiplexFetcher): + async def _get_function(self, **kwargs): + # TODO symbol unavailable hatasi geldiginde onu sonuca eklememek lazim + return await self._async_client.futures_open_interest_hist(**kwargs) - def download(self, symbols, **kwargs): + async def _fetch_symbol(self, symbol, **kwargs): + """ + bottom-level function that sends HTTP request to Binance and awaits on response + """ try: - loop = asyncio.get_event_loop() - except RuntimeError as e: - if str(e).startswith('There is no current event loop in thread'): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - else: - raise - res = loop.run_until_complete(self._fetch_symbols(symbols, **kwargs)) - return res + kwargs['symbol'] = symbol + oi_raw = await self._get_function(**kwargs) + oi_df = pd.DataFrame(oi_raw) + oi_df = convert_oi_data(oi_df) + except Exception as e: + print(f'An error occured while fetching {symbol}. Error: {e}. {kwargs}') + # raise Exception(f'An error occured while fetching {symbol}. Error: {e}') + return symbol, str(e) + return symbol, oi_df + \ No newline at end of file diff --git a/setup.py b/setup.py index 3b40c5f..5b5d752 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,7 @@ name='fast-binance', version=version, packages=['fast_binance'], - description='Asyncful wrapper around python-binance module.', + description='Asyncful wrapper around python-binance module and data.binance.vision .', long_description=long_description, long_description_content_type="text/x-rst", url='https://github.com/mkaanerkoc/fast-binance',