fixes for cbs process files to support importing old data #16

Open · wants to merge 3 commits into main
1 change: 1 addition & 0 deletions .gitignore
@@ -8,3 +8,4 @@ __pycache__
.airflow.env
.etl.env
.vscode
.ipynb_checkpoints
6 changes: 6 additions & 0 deletions README.md
@@ -17,3 +17,9 @@ For more advanced documentation see the [docs](docs) directory.
* Every [release](https://github.com/hasadna/anyway-etl/releases) causes deployment
to the Kubernetes cluster's `anyway` environment (the production environment)

## Testing on dev environment before merging

To test changes on the dev environment before merging them to the main branch,
edit the `airflow-scheduler` deployment in the `anyway-dev` namespace and set
the `ANYWAY_ETL_BRANCH` env var to the name of the branch you want to test.
Once testing is done, revert it back to `main`.
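
One way to do this, assuming you have `kubectl` access to the cluster, is
`kubectl -n anyway-dev set env deployment/airflow-scheduler ANYWAY_ETL_BRANCH=<branch-name>`,
then the same command with `ANYWAY_ETL_BRANCH=main` to revert.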
5 changes: 2 additions & 3 deletions anyway_etl/cbs/cli.py
@@ -14,11 +14,10 @@ def import_emails():


@cbs.command()
@click.option('--limit-rows')
def process_files(**kwargs):
def process_files():
"""Extract and process the cbs files"""
from . import process_files
process_files.main(**kwargs)
process_files.main()


@cbs.command()
3 changes: 1 addition & 2 deletions anyway_etl/cbs/config.py
@@ -6,6 +6,7 @@
CBS_DATA_ROOT_PATH = os.path.join(ANYWAY_ETL_DATA_ROOT_PATH, 'cbs')
CBS_EMAILS_DATA_ROOT_PATH = os.path.join(CBS_DATA_ROOT_PATH, 'emails')
CBS_FILES_ROOT_PATH = os.path.join(CBS_DATA_ROOT_PATH, 'files')
CBS_PROCESSED_FILES_ROOT_PATH = os.path.join(CBS_DATA_ROOT_PATH, 'processed_files')
CBS_YEARLY_DIRECTORIES_ROOT_PATH = os.path.join(CBS_DATA_ROOT_PATH, 'yearly')
CBS_ACCIDENT_MARKERS_ROOT_PATH = os.path.join(CBS_DATA_ROOT_PATH, 'accident_markers')
CBS_INVOLVED_ROOT_PATH = os.path.join(CBS_DATA_ROOT_PATH, 'involved')
@@ -15,5 +16,3 @@
IMAP_MAIL_USER = os.environ.get('IMAP_MAIL_USER')
IMAP_MAIL_PASSWORD = os.environ.get('IMAP_MAIL_PASSWORD')
IMAP_MAIL_HOST = os.environ.get('IMAP_MAIL_HOST', "imap.gmail.com")

PREPROCESS_FILES_DEFAULT_LIMIT_ROWS = int(os.environ.get('PREPROCESS_FILES_DEFAULT_LIMIT_ROWS', '2'))
2 changes: 2 additions & 0 deletions anyway_etl/cbs/parse_common.py
@@ -30,6 +30,7 @@ def common_get_data_iterator(load_start_year, stats, get_data_iterator):
'accidents_type_{}'.format(provider_code),
str(year)
)
print("cbs_files_dir={}".format(cbs_files_dir))
files_from_cbs = get_files(cbs_files_dir)
if len(files_from_cbs) == 0:
stats['invalid_directories_without_cbs_files'] += 1
@@ -46,6 +47,7 @@

def common_main(load_start_year, output_path, get_data_iterator):
load_start_year = int(load_start_year) if load_start_year else datetime.datetime.now().year - 1
print("load_start_year={} output_path={}".format(load_start_year, output_path))
stats = defaultdict(int)
_, df_stats = DF.Flow(
common_get_data_iterator(load_start_year, stats, get_data_iterator),
75 changes: 35 additions & 40 deletions anyway_etl/cbs/process_files.py
@@ -9,8 +9,8 @@

from .config import (
CBS_EMAILS_DATA_ROOT_PATH, CBS_FILES_ROOT_PATH,
PREPROCESS_FILES_DEFAULT_LIMIT_ROWS,
CBS_YEARLY_DIRECTORIES_ROOT_PATH
CBS_YEARLY_DIRECTORIES_ROOT_PATH,
CBS_PROCESSED_FILES_ROOT_PATH
)


@@ -26,21 +26,11 @@
}


def limit_last_rows(limit_rows):

def _iterator(rows):
for i, row in enumerate(rows):
if i < limit_rows:
yield row

return _iterator


def extract_zip_files(row):
zip_filepath = os.path.join(CBS_FILES_ROOT_PATH, row['filename'])
row['extracted_path'] = row['filename'].replace('.zip', '')
extracted_path = os.path.join(CBS_FILES_ROOT_PATH, row['extracted_path'])
print("Extracting {} -> {}".format(zip_filepath, extracted_path))
# print("Extracting {} -> {}".format(zip_filepath, extracted_path))
shutil.rmtree(extracted_path, ignore_errors=True)
os.makedirs(extracted_path)
with zipfile.ZipFile(zip_filepath, "r") as zf:
@@ -49,7 +39,7 @@ def extract_zip_files(row):

def update_cbs_files_names(row):
extracted_path = os.path.join(CBS_FILES_ROOT_PATH, row['extracted_path'])
print('updating cbs file names {}'.format(extracted_path))
# print('updating cbs file names {}'.format(extracted_path))
files = sorted([path for path in os.listdir(extracted_path)])
for file in files:
file_path = os.path.join(extracted_path, file)
@@ -74,41 +64,46 @@ def get_provider_code_and_year(row):
row['year'] = int(year)


def save_to_directory_structure(row):
extracted_path = os.path.join(CBS_FILES_ROOT_PATH, row['extracted_path'])
provider_code = row['provider_code']
year = row['year']
base_file_path = os.path.join(
CBS_YEARLY_DIRECTORIES_ROOT_PATH,
'accidents_type_{}'.format(provider_code),
str(year)
)
shutil.rmtree(base_file_path, ignore_errors=True)
os.makedirs(base_file_path)
row['num_files'] = 0
for file in os.scandir(extracted_path):
row['num_files'] += 1
target_file_path = os.path.join(base_file_path, os.path.basename(file.path))
shutil.copy(file.path, target_file_path)



def main(limit_rows=None):
limit_rows = int(limit_rows) if limit_rows else PREPROCESS_FILES_DEFAULT_LIMIT_ROWS
stats = defaultdict
def save_to_directory_structure(rows):
updated_year_provider_codes = set()
for row in rows:
row['num_saved_files'] = 0
provider_code = row['provider_code']
year = row['year']
if '{}|{}'.format(year, provider_code) not in updated_year_provider_codes:
updated_year_provider_codes.add('{}|{}'.format(year, provider_code))
extracted_path = os.path.join(CBS_FILES_ROOT_PATH, row['extracted_path'])
base_file_path = os.path.join(
CBS_YEARLY_DIRECTORIES_ROOT_PATH,
'accidents_type_{}'.format(provider_code),
str(year)
)
print('Saving msgId {} mtime {} to {}'.format(row['msgId'], row['mtime'], base_file_path))
shutil.rmtree(base_file_path, ignore_errors=True)
os.makedirs(base_file_path)
file: os.DirEntry
for file in os.scandir(extracted_path):
row['num_saved_files'] += 1
target_file_path = os.path.join(base_file_path, os.path.basename(file.path))
shutil.copy(file.path, target_file_path)
yield row


def main():
stats = defaultdict(int)
_, df_stats = DF.Flow(
DF.load(os.path.join(CBS_EMAILS_DATA_ROOT_PATH, 'datapackage.json')),
DF.sort_rows('mtime', reverse=True),
limit_last_rows(limit_rows),
DF.add_field('extracted_path', 'string'),
extract_zip_files,
update_cbs_files_names,
DF.add_field('provider_code', 'integer'),
DF.add_field('year', 'integer'),
get_provider_code_and_year,
DF.add_field('num_files', 'integer'),
DF.sort_rows('{year}|{provider_code}', reverse=True),
DF.add_field('num_saved_files', 'integer'),
save_to_directory_structure,
DF.dump_to_path(CBS_PROCESSED_FILES_ROOT_PATH),
DF.printer()
).process()
pprint(df_stats)
pprint(stats)
pprint(dict(stats))
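
For context on the `save_to_directory_structure` rewrite above: it changed from a per-row function into a generator that receives the whole row stream, which lets it remember which year/provider_code directory it has already cleared while still passing every row onward. A minimal, self-contained sketch of that dataflows pattern, using made-up field names rather than this PR's actual data, might look like:

```python
import dataflows as DF


def mark_first_per_year(rows):
    # A generator step receives the full stream of rows and must yield each row
    # it wants to pass on, so it can keep state across rows (the same shape as
    # the new save_to_directory_structure(rows) above).
    seen_years = set()
    for row in rows:
        row['first_for_year'] = row['year'] not in seen_years
        seen_years.add(row['year'])
        yield row


if __name__ == '__main__':
    rows = [
        {'year': 2019, 'filename': 'a.csv'},
        {'year': 2019, 'filename': 'b.csv'},
        {'year': 2020, 'filename': 'c.csv'},
    ]
    DF.Flow(
        rows,
        DF.add_field('first_for_year', 'boolean'),
        mark_first_per_year,
        DF.printer(),
    ).process()
```

dataflows treats a generator step like this as a full-stream processor, whereas a plain function (the old implementation) is called once per row; that is why the rewrite can avoid re-clearing the same year/provider_code directory for every file.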