Skip to content

Commit

Permalink
cập nhật logic xử lý dữ liệu BCTC VCI
Browse files Browse the repository at this point in the history
  • Loading branch information
thinh-vu committed Nov 1, 2024
1 parent 0327ae7 commit 6281d6e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 47 deletions.
Binary file modified .DS_Store
Binary file not shown.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ vnstock3/.DS_Store
docs/.DS_Store
dev/check_MSN_quarter_reports_df.xlsx
dev/MSN_quarter_reports_df.xlsx
dev/vci_financial_test_notebook_Nov_1.ipynb
vnstock3/.DS_Store
.DS_Store
73 changes: 26 additions & 47 deletions vnstock3/explorer/vci/financial.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,12 @@ def _ratio_mapping (self, ratio_df:pd.DataFrame, lang:Optional[str]='en', show_l

if lang == 'vi':
ratio_df = ratio_df.rename(columns={'ticker': 'CP', 'yearReport': 'Năm', 'lengthReport': 'Kỳ'})
index_part = ratio_df[['CP', 'Năm', 'Kỳ']]
index_cols = ['CP', 'Năm', 'Kỳ']
index_part = ratio_df[index_cols]
target_col_name = 'name'
elif lang == 'en':
index_part = ratio_df[['ticker', 'yearReport', 'lengthReport']]
index_cols = ['ticker', 'yearReport', 'lengthReport']
index_part = ratio_df[index_cols]
target_col_name = 'en_name'

# Create a dictionary to map field_name to report type
Expand All @@ -200,16 +202,10 @@ def _ratio_mapping (self, ratio_df:pd.DataFrame, lang:Optional[str]='en', show_l
mapping_specific = pd.DataFrame()
mapping_ct = mapping_df[mapping_df['com_type_code'] == 'CT']
all_columns_mapping = pd.concat([mapping_specific, mapping_ct]).drop_duplicates(subset='field_name', keep='first')
all_columns_mapping = self.duplicated_columns_handling(all_columns_mapping, target_col_name=target_col_name)

# remove all values that com_type_code is not 'CT' or self.com_type_code
all_columns_mapping = all_columns_mapping[all_columns_mapping['com_type_code'].isin(['CT', self.com_type_code])].copy()

# # Filter the mapping DataFrame based on company type code. Split mapping into two parts: 'CT' and company-specific mapping
# mapping_specific = mapping_df[mapping_df['com_type_code'] == self.com_type_code]
# mapping_ct = mapping_df[mapping_df['com_type_code'] != self.com_type_code]
# all_columns_mapping = pd.concat([mapping_specific, mapping_ct]).drop_duplicates(subset='field_name', keep='first')

original_columns = ratio_df.columns
# There are many columns in ratio_df that are not in mapping_df, they are not tied to the company specific mapping or the 'CT' mapping
orphan_columns = [col for col in original_columns if col not in mapping_df['field_name'].values and col not in index_part.columns]
Expand All @@ -219,46 +215,18 @@ def _ratio_mapping (self, ratio_df:pd.DataFrame, lang:Optional[str]='en', show_l
columns_translation = all_columns_mapping.set_index('field_name')[target_col_name].to_dict()
# Create a dictionary to map field_name to order
column_order = all_columns_mapping.set_index(target_col_name)['order'].to_dict()

# Drop the orphan columns which are not in the mapping DataFrame
ratio_df = ratio_df.drop(columns=orphan_columns)
# apply sorting to the columns of ratio_df by the order in the column_order dictionary
ratio_df = ratio_df[sorted(ratio_df.columns, key=lambda x: column_order.get(x, 0))]

translated_ratio_df = ratio_df.copy()
# look up the value in translated_ratio_df and replace it with the value in column_translation
translated_ratio_df = translated_ratio_df.rename(columns=columns_translation)
# Remove orphan columns
translated_ratio_df = translated_ratio_df.drop(columns=orphan_columns)

# # Get the list of duplicated columns names in translated_ratio_df.columns
# duplicated_columns = translated_ratio_df.columns[translated_ratio_df.columns.duplicated()].tolist()

type_mapping = dict(zip(mapping_df[target_col_name], mapping_df['type']))

# Organize columns in ratio_df into different reports based on type
reports = {}
for field, report_type in type_mapping.items():
if report_type not in reports:
reports[report_type] = [field]
else:
reports[report_type].append(field)

# Rename columns in ratio_df based on mapping_df's 'name'
name_mapping = dict(zip(mapping_df['field_name'], mapping_df[target_col_name]))
if show_log:
logger.debug(f"Name mapping: {name_mapping}")
ratio_df.rename(columns=name_mapping, inplace=True)
# Reorder columns based on 'order' in mapping_df
ratio_df = ratio_df[sorted(ratio_df.columns, key=lambda x: int(column_order.get(x, 0)))]

## Handle duplicate columns
# ratio_df = self.handle_duplicate_columns(original_columns, ratio_df)

# Create DataFrames for each report type with exception handling
report_type_field_dict = all_columns_mapping.groupby('type')['field_name'].apply(list).to_dict()

# Create DataFrames for each report type, using columns name as code names without translation
report_dfs = {}
for report_type, columns in reports.items():
try:
# Filter the DataFrame only for columns that exist in ratio_df
filtered_columns = [col for col in columns if col in ratio_df.columns]
report_dfs[report_type] = ratio_df[filtered_columns]
except KeyError as e:
print(f"Failed to create DataFrame for {report_type} due to missing columns: {e}")
for report_type, fields in report_type_field_dict.items():
report_dfs[report_type] = ratio_df[index_cols + fields]

# Define the primary report types
primary_reports = [
Expand All @@ -271,6 +239,17 @@ def _ratio_mapping (self, ratio_df:pd.DataFrame, lang:Optional[str]='en', show_l
primary_dfs = {key: report_dfs[key] for key in primary_reports}
other_reports = {key: report_dfs[key] for key in report_dfs if key not in primary_reports}


# Create a dictionary to map field_name to report type
name_mapping = dict(zip(mapping_df['field_name'], mapping_df[target_col_name]))

# Translate the column names
for key in primary_dfs:
primary_dfs[key].columns = [name_mapping.get(col, col) for col in primary_dfs[key].columns]

for key in other_reports:
other_reports[key].columns = [name_mapping.get(col, col) for col in other_reports[key].columns]

# Merge all other reports into a single DataFrame with MultiIndex columns
merged_other_reports = pd.concat(other_reports.values(), axis=1, keys=other_reports.keys())

Expand All @@ -290,7 +269,7 @@ def multi_level_columns_translate(col, translation_dict):
if lang == 'en':
# Apply the columns_translate function to all columns
merged_other_reports.columns = pd.MultiIndex.from_tuples(
[multi_level_columns_translate(col, columns_translate) for col in merged_other_reports.columns]
[multi_level_columns_translate(col, columns_translation) for col in merged_other_reports.columns]
)

return primary_dfs, merged_other_reports
Expand Down

0 comments on commit 6281d6e

Please sign in to comment.