From 30f39d09aaf5ac4a05e0a024731e6a7df7c1f1d1 Mon Sep 17 00:00:00 2001
From: Adir Haleli
Date: Tue, 6 Oct 2020 16:47:21 +0300
Subject: [PATCH] Version 1.0

---
 .gitignore                              |   2 +
 Dockerfile                              |   9 +
 README.md                               |  54 +++
 fast-reindex.py                         | 452 ------------------------
 fast_elasticsearch_reindex/__main__.py  | 196 ++++++++++
 fast_elasticsearch_reindex/reindexer.py | 213 +++++++++++
 setup.py                                |  26 ++
 7 files changed, 500 insertions(+), 452 deletions(-)
 create mode 100644 .gitignore
 create mode 100644 Dockerfile
 create mode 100644 README.md
 delete mode 100644 fast-reindex.py
 create mode 100644 fast_elasticsearch_reindex/__main__.py
 create mode 100644 fast_elasticsearch_reindex/reindexer.py
 create mode 100644 setup.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..614e141
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+__pycache__
+fast_elasticsearch_reindex.log
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..e8ab0c8
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,9 @@
+FROM python:3.8-slim
+
+WORKDIR /app
+
+COPY setup.py .
+COPY fast_elasticsearch_reindex fast_elasticsearch_reindex
+RUN pip install .
+
+ENTRYPOINT ["python", "-m", "fast_elasticsearch_reindex"]
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f726c08
--- /dev/null
+++ b/README.md
@@ -0,0 +1,54 @@
+# Fast Elasticsearch Reindex
+
+A Python-based alternative to the Elasticsearch Reindex API with multiprocessing
+support. Since the Elasticsearch Reindex API doesn't support slicing when reindexing
+from a remote cluster, the entire process can take many hours or even days,
+depending on the cluster size. Based on [Sliced
+Scroll](https://www.elastic.co/guide/en/elasticsearch/reference/master/paginate-search-results.html#slice-scroll)
+and [Bulk
+requests](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html),
+this utility can be used as a faster alternative.
+
+![fast_elasticsearch_reindex](https://i.ibb.co/Z6QKybN/fast-elasticsearch-reindex.gif)
+
+## Usage
+
+```
+usage: fast_elasticsearch_reindex [-h] [--src-hosts [SRC_HOSTS [SRC_HOSTS ...]]] [--dest-hosts [DEST_HOSTS [DEST_HOSTS ...]]]
+                                  [--query QUERY] [--workers WORKERS] [--size SIZE] [--scroll SCROLL] [--slice-field SLICE_FIELD]
+                                  [--indices [INDICES [INDICES ...]]]
+
+optional arguments:
+  -h, --help            show this help message and exit
+  --src-hosts [SRC_HOSTS [SRC_HOSTS ...]]
+                        Source Elasticsearch hosts to reindex from (default: ['127.0.0.1:9200'])
+  --dest-hosts [DEST_HOSTS [DEST_HOSTS ...]]
+                        Destination Elasticsearch hosts to reindex to (default: ['127.0.0.1:9201'])
+  --query QUERY         Search query (default: {})
+  --workers WORKERS     Number of parallel workers (default: 8)
+  --size SIZE           Search request size (default: 2000)
+  --scroll SCROLL       Scroll request duration (default: 5m)
+  --slice-field SLICE_FIELD
+                        Field to slice by (default: None)
+  --indices [INDICES [INDICES ...]]
+                        Indices to reindex (default: None)
+```
+
+## Installation
+
+Pip:
+```
+pip install fast_elasticsearch_reindex
+```
+
+Local:
+```
+$ pip install .
+$ python -m fast_elasticsearch_reindex --help
+```
+
+Docker:
+```
+$ docker build -t fast_elasticsearch_reindex .
+$ docker run fast_elasticsearch_reindex --help +``` diff --git a/fast-reindex.py b/fast-reindex.py deleted file mode 100644 index 8f94095..0000000 --- a/fast-reindex.py +++ /dev/null @@ -1,452 +0,0 @@ -import argparse -import asyncio -import ciso8601 -import datetime -import elasticsearch -import elasticsearch_async -import functools -import itertools -import json -import logging -import multiprocessing -import orjson -import tenacity -import tqdm - - -tenacity_logger = logging.getLogger('tenacity') - - -class ORJsonSerializer( - elasticsearch.JSONSerializer, -): - def dumps( - self, - data, - ): - if isinstance(data, (str, bytes)): - return data - - return orjson.dumps(data).decode() - - def loads( - self, - data, - ): - return orjson.loads(data) - - -def main(): - try: - args = parse_args() - except InvalidArg as exception: - print(f'Error: {exception}') - return - - logging.basicConfig( - filename='fast-reindex.log', - level=logging.INFO, - format='[%(asctime)s|%(name)s|%(levelname).4s] [PID %(process)d] %(message)s', - ) - - start( - args=args, - ) - - -def start( - args, -): - total = count( - host=args.src_hosts, - indices=args.indices, - query=args.query, - ) - - if total: - logging.info(f'Found {total} documents') - else: - logging.error('No documents were found, exiting.') - return - - manager = multiprocessing.Manager() - tqdm_queue = manager.Queue() - - progress_process = multiprocessing.Process( - target=progress_queue, - args=( - tqdm_queue, - total, - ), - ) - - progress_process.start() - - with multiprocessing.Pool(args.max_workers) as pool: - slices = list( - itertools.zip_longest( - *(iter(range(args.max_slices)),) * int(args.max_slices/args.max_workers) - ), - ) - - print(f'Slice Distribution: {slices}') - - func = functools.partial( - reindex, - query=args.query, - max_slices=args.max_slices, - src_hosts=args.src_hosts, - dest_hosts=args.dest_hosts, - indices=args.indices, - size=args.size, - scroll=args.scroll, - tqdm_queue=tqdm_queue, - ) - - logging.info(f'Starting {len(slices)} workers') - - pool.map( - func, - slices, - ) - - logging.info('DONE.') - - -def count( - host, - indices, - query, -): - logging.info(f'Counting total documents of {indices} with {query}') - - client = elasticsearch.Elasticsearch( - hosts=host, - timeout=120, - serializer=ORJsonSerializer(), - ) - - return client.count( - index=indices, - body=query, - )['count'] - - -def progress_queue( - queue, - total, -): - last_speed_log = datetime.datetime.now() - created = 0 - errors = 0 - skipped = 0 - - progress_bar = tqdm.tqdm( - unit='doc', - mininterval=0.5, - total=total, - ) - - while True: - try: - stats = queue.get() - except EOFError: - return - - created += stats['created'] - errors += stats['errors'] - skipped += stats['skipped'] - total = stats['created'] + stats['errors'] + stats['skipped'] - - progress_bar.update(total) - - progress_bar.set_postfix( - created=created, - errors=errors, - skipped=skipped, - ) - - now = datetime.datetime.now() - - if now - last_speed_log >= datetime.timedelta(seconds=5): - logging.info(progress_bar) - - last_speed_log = now - - -def reindex( - slices, - query, - max_slices, - src_hosts, - dest_hosts, - indices, - size, - scroll, - tqdm_queue, -): - reindexer = Reindexer( - query=query, - src_hosts=src_hosts, - dest_hosts=dest_hosts, - indices=indices, - slices=slices, - max_slices=max_slices, - size=size, - scroll=scroll, - tqdm_queue=tqdm_queue, - ) - - try: - asyncio.get_event_loop().run_until_complete(reindexer.start()) - finally: - 
asyncio.get_event_loop().run_until_complete(reindexer.close()) - - -def parse_args(): - parser = argparse.ArgumentParser() - - parser.add_argument( - '--src-hosts', - nargs='*', - default=[ - ], - ) - - parser.add_argument( - '--dest-hosts', - nargs='*', - default=[ - ], - ) - - parser.add_argument( - '--query', - default='{"query":{"range":{"created_date":{"gte":"now-1w", "lte": "now"}}}}', - ) - - parser.add_argument( - '--max-slices', - type=int, - default=8, - ) - - parser.add_argument( - '--max-workers', - type=int, - default=8, - ) - - parser.add_argument( - '--size', - type=int, - default=2000, - ) - - parser.add_argument( - '--scroll', - default='5m', - ) - - parser.add_argument( - '--indices', - nargs='*', - default=[ - 'domains', - 'subdomains', - ], - ) - - args = parser.parse_args() - - args.query = json.loads(args.query) - - - if args.max_slices < args.max_workers: - raise InvalidArg('Max slices has to be greater than or equals to max workers') - - return args - - -class Reindexer: - def __init__( - self, - query, - src_hosts, - dest_hosts, - indices, - max_slices, - size, - scroll, - slices=None, - tqdm_queue=None, - ): - self.src_client = elasticsearch_async.AsyncElasticsearch( - hosts=src_hosts, - timeout=120, - dead_timeout=0, - timeout_cutoff=0, - serializer=ORJsonSerializer(), - ) - - self.dest_client = elasticsearch_async.AsyncElasticsearch( - hosts=dest_hosts, - timeout=120, - dead_timeout=0, - timeout_cutoff=0, - serializer=ORJsonSerializer(), - ) - - self.query = query - self.indices = indices - self.slices = slices - self.max_slices = max_slices - self.size = size - self.scroll = scroll - self.tqdm_queue = tqdm_queue - - async def close( - self, - ): - await self.dest_client.transport.close() - await self.src_client.transport.close() - - async def start( - self, - ): - logging.info('Starting worker') - - tasks = [] - - loop = asyncio.get_event_loop() - - for index in self.indices: - tasks += [ - self.reindex( - index=index, - slice_id=i, - ) for i in self.slices - ] - - await asyncio.gather(*tasks) - - logging.info('Worker done') - - async def reindex( - self, - index, - slice_id, - ): - logging.info(f'Indexing {index} (slice ID {slice_id})') - - if self.max_slices > 1: - _slice = { - 'slice': { - 'field': 'created_date', - 'id': slice_id, - 'max': self.max_slices, - }, - } - else: - _slice = {} - - for attempt in tenacity.Retrying( - wait=tenacity.wait_exponential(multiplier=1, min=1, max=3), - before_sleep=tenacity.before_sleep_log(tenacity_logger, logging.WARN) - ): - with attempt: - scroll_response = await self.src_client.search( - index=index, - scroll=self.scroll, - size=self.size, - body={ - **_slice, - **self.query, - }, - ) - - queue = scroll_response['hits']['hits'] - - while queue: - created = 0 - skipped = 0 - errors = 0 - body = [] - - for hit in queue: - body += [ - { - 'create': { - '_index': index, - '_id': hit['_id'], - }, - }, - hit['_source'], - ] - - for attempt in tenacity.Retrying( - wait=tenacity.wait_exponential(multiplier=1, min=1, max=3), - before_sleep=tenacity.before_sleep_log(tenacity_logger, logging.WARN) - ): - with attempt: - bulk_response = await self.dest_client.bulk( - body=body, - ) - - too_many_requests = False - - retry_queue = [] - for i, item in enumerate(bulk_response['items']): - if 'error' in item['create']: - if item['create']['status'] == 429: - too_many_requests = True - - print('Too many requests', item, queue[i]) - retry_queue.append(queue[i]) - elif item['create']['error']['type'] == 
'version_conflict_engine_exception': - skipped += 1 - else: - logging.error(f'Failed to create: {item}') - - errors += 1 - else: - created += 1 - - self.tqdm_queue.put({ - 'created': created, - 'skipped': skipped, - 'errors': errors, - }) - - if too_many_requests: - logging.warning('Too many requests') - - await asyncio.sleep(0.250) - - if retry_queue: - queue = retry_queue - - continue - - for attempt in tenacity.Retrying( - wait=tenacity.wait_exponential(multiplier=1, min=1, max=3), - before_sleep=tenacity.before_sleep_log(tenacity_logger, logging.WARN) - ): - with attempt: - scroll_response = await self.src_client.scroll( - body={ - 'scroll_id': scroll_response['_scroll_id'], - 'scroll': self.scroll, - }, - ) - - queue = scroll_response['hits']['hits'] - - logging.info(f'Done indexing {index} (slice ID {slice_id})') - - -class InvalidArg(Exception): pass - - -if __name__ == '__main__': - main() diff --git a/fast_elasticsearch_reindex/__main__.py b/fast_elasticsearch_reindex/__main__.py new file mode 100644 index 0000000..d9310bd --- /dev/null +++ b/fast_elasticsearch_reindex/__main__.py @@ -0,0 +1,196 @@ +import argparse +import functools +import json +import logging +import multiprocessing +import tqdm + +from . import reindexer + + +def main(): + args = parse_args() + + logging.basicConfig( + filename='fast_elasticsearch_reindex.log', + level=logging.INFO, + format='[%(asctime)s|%(name)s|%(levelname).4s] [PID %(process)d] %(message)s', + ) + + start( + args=args, + ) + + +def parse_args(): + parser = argparse.ArgumentParser( + prog='fast_elasticsearch_reindex', + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + parser.add_argument( + '--src-hosts', + help='Source Elasticsearch hosts to reindex from', + nargs='*', + default=['127.0.0.1:9200'], + ) + + parser.add_argument( + '--dest-hosts', + help='Destination Elasticsearch hosts to reindex to', + nargs='*', + default=['127.0.0.1:9201'], + ) + + parser.add_argument( + '--query', + help='Search query', + default='{}', + ) + + parser.add_argument( + '--workers', + help='Number of parallel workers', + type=int, + default=8, + ) + + parser.add_argument( + '--size', + help='Search request size', + type=int, + default=2000, + ) + + parser.add_argument( + '--scroll', + help='Scroll request duration', + default='5m', + ) + + parser.add_argument( + '--slice-field', + help='Field to slice by', + ) + + parser.add_argument( + '--indices', + help='Indices to reindex', + nargs='*', + ) + + args = parser.parse_args() + + args.query = json.loads(args.query) + + return args + + +def start( + args, +): + total = reindexer.count( + host=args.src_hosts, + indices=args.indices, + query=args.query, + ) + + if not total: + print('No documents were found, exiting.') + + return + + options = reindexer.Options( + src_hosts=args.src_hosts, + dest_hosts=args.dest_hosts, + indices=args.indices, + query=args.query, + workers=args.workers, + slice_field=args.slice_field, + size=args.size, + scroll=args.scroll, + ) + + manager = multiprocessing.Manager() + progress_queue = manager.Queue() + + progress_process = multiprocessing.Process( + target=update_progress_bar, + args=( + progress_queue, + total, + ), + ) + + progress_process.start() + + with multiprocessing.Pool(args.workers) as pool: + logging.info(f'Starting {args.workers} workers') + + func = functools.partial( + reindex, + options=options, + progress_queue=progress_queue, + ) + + pool.map( + func, + range(args.workers), + ) + + logging.info('DONE.') + + +def reindex( + worker_id: int, + 
options: reindexer.Options, + progress_queue, +): + reindexer_instance = reindexer.Reindexer( + worker_id=worker_id, + options=options, + progress_queue=progress_queue, + ) + + for index in options.indices: + reindexer_instance.reindex( + index=index, + ) + + +def update_progress_bar( + queue, + total, +): + progress_bar = tqdm.tqdm( + unit='doc', + mininterval=0.5, + total=total, + ) + + created = 0 + errors = 0 + skipped = 0 + + while True: + try: + stats = queue.get() + except EOFError: + return + + created += stats.created + errors += stats.errors + skipped += stats.skipped + total = stats.created + stats.errors + stats.skipped + + progress_bar.update(total) + + progress_bar.set_postfix( + created=created, + errors=errors, + skipped=skipped, + ) + + +if __name__ == '__main__': + main() diff --git a/fast_elasticsearch_reindex/reindexer.py b/fast_elasticsearch_reindex/reindexer.py new file mode 100644 index 0000000..cd5176f --- /dev/null +++ b/fast_elasticsearch_reindex/reindexer.py @@ -0,0 +1,213 @@ +import dataclasses +import elasticsearch +import logging +import orjson +import tenacity +import time +import typing + + +tenacity_logger = logging.getLogger('tenacity') + + +def count( + host: str, + indices: typing.List[str], + query, +): + client = elasticsearch.Elasticsearch( + hosts=host, + timeout=120, + ) + + return client.count( + index=indices, + body=query, + )['count'] + + +@dataclasses.dataclass +class Options: + src_hosts: typing.List[str] + dest_hosts: typing.List[str] + indices: typing.List[str] + query: dict + workers: int + slice_field: str + size: int + scroll: str + + +@dataclasses.dataclass +class ProgressUpdate: + created: int + errors: int + skipped: int + + +class Reindexer: + def __init__( + self, + worker_id: int, + options: Options, + progress_queue=None, + ): + self.src_client = elasticsearch.Elasticsearch( + hosts=options.src_hosts, + timeout=120, + dead_timeout=0, + timeout_cutoff=0, + serializer=ORJsonSerializer(), + ) + + self.dest_client = elasticsearch.Elasticsearch( + hosts=options.dest_hosts, + timeout=120, + dead_timeout=0, + timeout_cutoff=0, + serializer=ORJsonSerializer(), + ) + + self.worker_id = worker_id + self.options = options + self.progress_queue = progress_queue + + def __del__( + self, + ): + self.dest_client.transport.close() + self.src_client.transport.close() + + def reindex( + self, + index: str, + ): + logging.info(f'Indexing {index} (worker ID {self.worker_id})') + + if self.options.workers > 1: + _slice = { + 'slice': { + 'id': self.worker_id, + 'max': self.options.workers, + }, + } + + if self.options.slice_field: + _slice['field'] = self.options.slice_field + else: + _slice = {} + + for attempt in tenacity.Retrying( + wait=tenacity.wait_exponential(multiplier=1, min=1, max=3), + before_sleep=tenacity.before_sleep_log(tenacity_logger, logging.WARN) + ): + with attempt: + scroll_response = self.src_client.search( + index=index, + scroll=self.options.scroll, + size=self.options.size, + body={ + **_slice, + **self.options.query, + }, + ) + + queue = scroll_response['hits']['hits'] + + while queue: + created = 0 + skipped = 0 + errors = 0 + body = [] + + for hit in queue: + body += [ + { + 'create': { + '_index': hit['_index'], + '_id': hit['_id'], + }, + }, + hit['_source'], + ] + + for attempt in tenacity.Retrying( + wait=tenacity.wait_exponential(multiplier=1, min=1, max=3), + before_sleep=tenacity.before_sleep_log(tenacity_logger, logging.WARN) + ): + with attempt: + bulk_response = self.dest_client.bulk( + body=body, + ) + + 
+            too_many_requests = False
+
+            retry_queue = []
+            for i, item in enumerate(bulk_response['items']):
+                if 'error' in item['create']:
+                    if item['create']['status'] == 429:
+                        too_many_requests = True
+
+                        print('Too many requests', item, queue[i])
+                        retry_queue.append(queue[i])
+                    elif item['create']['error']['type'] == 'version_conflict_engine_exception':
+                        skipped += 1
+                    else:
+                        logging.error(f'Failed to create: {item}')
+
+                        errors += 1
+                else:
+                    created += 1
+
+            self.progress_queue.put(
+                ProgressUpdate(
+                    created=created,
+                    skipped=skipped,
+                    errors=errors,
+                ),
+            )
+
+            if too_many_requests:
+                logging.warning('Too many requests')
+
+                time.sleep(0.250)
+
+                if retry_queue:
+                    queue = retry_queue
+
+                    continue
+
+            for attempt in tenacity.Retrying(
+                wait=tenacity.wait_exponential(multiplier=1, min=1, max=3),
+                before_sleep=tenacity.before_sleep_log(tenacity_logger, logging.WARN)
+            ):
+                with attempt:
+                    scroll_response = self.src_client.scroll(
+                        body={
+                            'scroll_id': scroll_response['_scroll_id'],
+                            'scroll': self.options.scroll,
+                        },
+                    )
+
+            queue = scroll_response['hits']['hits']
+
+        logging.info(f'Done indexing {index} (worker ID {self.worker_id})')
+
+
+class ORJsonSerializer(
+    elasticsearch.JSONSerializer,
+):
+    def dumps(
+        self,
+        data,
+    ):
+        if isinstance(data, (str, bytes)):
+            return data
+
+        return orjson.dumps(data).decode()
+
+    def loads(
+        self,
+        data,
+    ):
+        return orjson.loads(data)
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..e759487
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,26 @@
+import setuptools
+
+
+setuptools.setup(
+    name='fast_elasticsearch_reindex',
+    description="A Python-based alternative to the Elasticsearch Reindex API with multiprocessing support",
+    version='1.0.0',
+    author="Adir Haleli",
+    author_email="adir544@gmail.com",
+    long_description=open('README.md').read(),
+    long_description_content_type="text/markdown",
+    packages=setuptools.find_packages(),
+    install_requires=[
+        'elasticsearch',
+        'orjson',
+        'tenacity',
+        'tqdm',
+    ],
+    url="https://github.com/Intsights/fast_elasticsearch_reindex",
+    classifiers=[
+        "Programming Language :: Python :: 3",
+        "License :: OSI Approved :: MIT License",
+        "Operating System :: OS Independent",
+    ],
+    python_requires='>=3.7',
+)
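
For a quick sense of what the new package does without reading the whole diff, the sketch below condenses the core technique — a sliced scroll over the source cluster feeding bulk `create` requests into the destination — into one function. It is a minimal, single-process illustration assuming elasticsearch-py 7.x; the host addresses, the `my-index` name, and the `reindex_slice` helper are placeholders rather than code from this patch, and it omits the multiprocessing pool, tenacity retries, 429 back-off, and progress reporting that the real `Reindexer` adds.

```python
# Illustrative sketch only: hosts, index name, and reindex_slice are
# placeholders, not part of the patch. Assumes elasticsearch-py 7.x.
import elasticsearch

SRC = elasticsearch.Elasticsearch(hosts=['127.0.0.1:9200'], timeout=120)
DEST = elasticsearch.Elasticsearch(hosts=['127.0.0.1:9201'], timeout=120)


def reindex_slice(index, slice_id, max_slices, slice_field=None, size=2000, scroll='5m'):
    # Each worker owns one slice of the scroll. The optional slicing field
    # (the --slice-field option) goes inside the "slice" object of the body.
    slice_body = {'id': slice_id, 'max': max_slices}
    if slice_field:
        slice_body['field'] = slice_field

    response = SRC.search(
        index=index,
        scroll=scroll,
        size=size,
        body={'slice': slice_body} if max_slices > 1 else {},
    )
    hits = response['hits']['hits']

    while hits:
        # Pair every hit with a 'create' action so documents that already
        # exist on the destination fail with a version conflict instead of
        # being overwritten.
        bulk_body = []
        for hit in hits:
            bulk_body.append({'create': {'_index': hit['_index'], '_id': hit['_id']}})
            bulk_body.append(hit['_source'])

        DEST.bulk(body=bulk_body)

        # Fetch the next page of this slice's scroll.
        response = SRC.scroll(body={
            'scroll_id': response['_scroll_id'],
            'scroll': scroll,
        })
        hits = response['hits']['hits']


if __name__ == '__main__':
    # The real tool runs one of these per worker, with slice IDs 0..workers-1.
    reindex_slice(index='my-index', slice_id=0, max_slices=8)
```

Because documents are written with the `create` action, rerunning a slice skips anything that already exists (it surfaces as a version-conflict error, counted as "skipped"), which is what makes interrupted runs cheap to resume.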