forked from akankshadhar2/Company-10K-Analyzer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.py
293 lines (224 loc) · 13 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
import streamlit as st
import matplotlib.pyplot as plt
from tabulate import tabulate
import os
import re
from bs4 import BeautifulSoup
from sec_edgar_downloader import Downloader
import google.generativeai as genai
# Read the Gemini API key from Streamlit's secrets store (entry "api_key")
# and register it with the google.generativeai client once, at import time.
api_key = st.secrets["api_key"]
genai.configure(api_key=api_key)
def download_10k_filings(ticker, after_date, before_date):
    """Download the 10-K filings of ``ticker`` filed within a date range.

    Args:
        ticker: Company ticker symbol to request from the downloader.
        after_date: Start of the filing date range; forwarded as ``after``.
        before_date: End of the filing date range; forwarded as ``before``.

    Returns:
        True when the download call completed without raising. False when
        ``ticker`` is blank or the downloader raised an exception.
    """
    # A blank ticker cannot identify a company; skip building a downloader
    # and hitting the network for it.
    if not ticker or not ticker.strip():
        return False

    # Initialize the downloader
    # NOTE(review): the e-mail literal looks like a redacted placeholder --
    # confirm a real contact address is configured before deploying.
    dl = Downloader("VIT", "[email protected]")

    # Announce the download before it starts, not after it has finished.
    print(f"Downloading 10-K filings for {ticker} between {after_date} and {before_date}...")
    try:
        # Download the 10-K filings for the given ticker within the date range
        dl.get("10-K", ticker, after=after_date, before=before_date)
    except Exception as e:
        print(f"Error downloading 10-K filings for {ticker}: {e}")
        return False  # Return False if download fails

    print(f"Successfully downloaded 10-K filings for {ticker} between {after_date} and {before_date}")
    return True  # Return True if download is successful
def answer_question(prompt):
    """Ask the Gemini text-generation API a question.

    Args:
        prompt: Text sent to ``genai.generate_text`` as the prompt.

    Returns:
        The ``result`` attribute of the API response.
    """
    return genai.generate_text(prompt=prompt).result
def clean_text(text):
    """Strip HTML markup and special characters from ``text``.

    The markup is parsed with ``html.parser`` first and with ``lxml`` if
    that raises; when both parsers fail the text is treated as empty.

    Args:
        text: Raw document content, possibly containing HTML tags.

    Returns:
        The extracted text with every character other than word characters,
        whitespace, ``.`` and ``,`` removed, and surrounding whitespace
        trimmed.
    """
    plain = None
    for parser_name in ('html.parser', 'lxml'):
        try:
            plain = BeautifulSoup(text, parser_name).get_text()
        except Exception:
            # This parser could not handle the input; try the next one.
            continue
        break

    if plain is None:
        plain = ''  # Neither parser succeeded

    return re.sub(r'[^\w\s.,]', '', plain).strip()
def extract_financial_info(text):
    """Pull headline financial figures out of a filing's text.

    The text is cleaned with ``clean_text`` and then searched,
    case-insensitively, for the first occurrence of each metric label
    followed by a number.

    Args:
        text: Raw filing content.

    Returns:
        A dict with the keys ``revenue``, ``net_income``, ``total_assets``
        and ``total_liabilities``. Each value is a float, or None when the
        metric was not found or the matched text is not a valid number.
    """
    # Regex patterns for common financial metrics
    metric_patterns = {
        'revenue': r'(?:total\s+)?revenue(?:\s+in\s+millions)?\s+([\d,.]+)',
        'net_income': r'net\s+income(?:\s+in\s+millions)?\s+([\d,.]+)',
        'total_assets': r'total\s+assets(?:\s+in\s+millions)?\s+([\d,.]+)',
        'total_liabilities': r'total\s+liabilities(?:\s+in\s+millions)?\s+([\d,.]+)'
        # Add more patterns for other metrics as needed
    }
    body = clean_text(text)

    def parse_metric(pattern):
        # First match only; thousands separators are dropped before parsing.
        found = re.search(pattern, body, re.IGNORECASE)
        if found is None:
            return None
        try:
            return float(found.group(1).replace(',', ''))
        except ValueError:
            # e.g. the capture was only punctuation such as "." or ","
            return None

    return {name: parse_metric(pattern) for name, pattern in metric_patterns.items()}
def analyze_financial_data(financial_data):
    """Ask the Gemini API for insights on a set of financial figures.

    Args:
        financial_data: Extracted metrics; interpolated into the prompt
            using its string representation.

    Returns:
        The response object returned by ``genai.generate_text`` (callers
        read its ``result`` attribute for the generated text).
    """
    request = f"Analyze financial data: {financial_data}. Give insights based on the financial data."
    return genai.generate_text(prompt=request)
def visualize_financial_metrics(ticker, processed_financial_data):
    """Plot the extracted financial metrics of ``ticker`` over the years.

    Draws one row of three charts per metric: a line plot, a scatter plot
    whose x positions are shifted by a fixed per-row offset, and a stacked
    bar chart of total assets and total liabilities (the same chart in
    every row). The figure is rendered through ``st.pyplot``.

    Args:
        ticker: Key to look up in ``processed_financial_data``.
        processed_financial_data: Mapping ``ticker -> year -> entry`` where
            each entry holds a ``'financial_info'`` dict of metric values.

    Returns:
        None. Writes a "No data found" message to the page when ``ticker``
        is not present in ``processed_financial_data``.
    """
    # Define the list of financial metrics to include in the visualization
    financial_metrics = ['revenue', 'net_income', 'total_assets', 'total_liabilities']

    if ticker not in processed_financial_data:
        st.write(f"No data found for ticker: {ticker}")
        return

    years_data = processed_financial_data[ticker]
    years = sorted(years_data.keys())

    # The assets/liabilities bars do not depend on the metric being plotted,
    # so build them once instead of once per row. Missing values count as 0.
    assets = [years_data[year]['financial_info'].get('total_assets', 0) or 0 for year in years]
    liabilities = [years_data[year]['financial_info'].get('total_liabilities', 0) or 0 for year in years]

    # Initialize subplots: one row per metric, three chart types per row
    fig, axes = plt.subplots(len(financial_metrics), 3, figsize=(18, 12))
    try:
        fig.suptitle(f"Financial Metrics Over Years for {ticker}", fontsize=16)

        for i, metric in enumerate(financial_metrics):
            # Metrics that could not be extracted are stored as None. Map
            # them to NaN so they show up as gaps rather than relying on how
            # each plotting call coerces None.
            metric_values = [
                float('nan') if value is None else value
                for value in (years_data[year]['financial_info'].get(metric, None) for year in years)
            ]

            # Line plot
            axes[i, 0].plot(years, metric_values, marker='o', linestyle='-', color='b')
            axes[i, 0].set_ylabel(metric.capitalize())
            axes[i, 0].set_xlabel("Year")
            axes[i, 0].grid(True)

            # Scatter plot; the x offset is deterministic and depends only
            # on the row index (-0.3, -0.1, +0.1, +0.3 for the four metrics).
            jitter_years = [year + 0.2 * (i - 1.5) for year in years]
            axes[i, 1].scatter(jitter_years, metric_values, color='g', alpha=0.7)
            axes[i, 1].set_ylabel(metric.capitalize())
            axes[i, 1].set_xlabel("Year (with jitter)")
            axes[i, 1].grid(True)

            # Stacked bar chart (comparing assets and liabilities)
            axes[i, 2].bar(years, assets, color='b', label='Total Assets')
            axes[i, 2].bar(years, liabilities, color='r', label='Total Liabilities', bottom=assets)
            axes[i, 2].set_ylabel("Amount")
            axes[i, 2].set_xlabel("Year")
            axes[i, 2].legend()
            axes[i, 2].grid(True)

        # Adjust layout and display plot
        plt.tight_layout(rect=[0, 0, 1, 0.96])  # Leave room at the top for the suptitle
        st.pyplot(fig)  # Display the plot in Streamlit
    finally:
        # pyplot keeps every figure it created alive until it is closed;
        # release this one so repeated script reruns do not pile them up.
        plt.close(fig)
# Two-digit accession years at or above this value are read as 19xx, lower
# ones as 20xx. NOTE(review): 93 assumes the earliest EDGAR electronic
# filings date from 1993 -- confirm against SEC documentation.
_FIRST_EDGAR_YEAR_SUFFIX = 93

_TABLE_METRICS = ['revenue', 'net_income', 'total_assets', 'total_liabilities']


def _filing_year(accession_dir):
    """Return the four-digit year encoded in an accession directory name, or None.

    The year is the two-digit group between dashes, e.g.
    '0000320193-22-000108' => 2022.
    """
    year_match = re.search(r'-(\d{2})-', accession_dir)
    if year_match is None:
        return None
    suffix = int(year_match.group(1))
    century = 1900 if suffix >= _FIRST_EDGAR_YEAR_SUFFIX else 2000
    return century + suffix


def _read_first_txt(filing_path):
    """Return the content of the first .txt file in ``filing_path``, or None if there is none."""
    for file_name in os.listdir(filing_path):
        if file_name.endswith('.txt'):
            # Undecodable bytes are replaced rather than aborting the whole
            # run with a UnicodeDecodeError.
            with open(os.path.join(filing_path, file_name), 'r',
                      encoding='utf-8', errors='replace') as f:
                return f.read()
    return None


def _collect_financial_data(filings_dir):
    """Extract metrics and insights for every filing directory under ``filings_dir``, keyed by year."""
    years_data = {}
    # The directory is absent when nothing was downloaded for the range.
    if not os.path.isdir(filings_dir):
        return years_data

    for accession_dir in os.listdir(filings_dir):
        filing_path = os.path.join(filings_dir, accession_dir)
        if not os.path.isdir(filing_path):
            continue
        year = _filing_year(accession_dir)
        if year is None:
            continue
        text_content = _read_first_txt(filing_path)
        if text_content is None:
            continue
        financial_info = extract_financial_info(text_content)
        years_data[year] = {
            'financial_info': financial_info,
            'financial_insights': analyze_financial_data(financial_info),
        }
    return years_data


def _build_table(years_data):
    """Build the rows (header first) of the per-year financial details table."""
    table_data = [["Year"] + _TABLE_METRICS + ["Financial Insights"]]
    for year, data in sorted(years_data.items()):
        financial_info = data['financial_info']
        financial_insights = data.get('financial_insights', None)
        row_data = [year] + [financial_info.get(metric, None) for metric in _TABLE_METRICS]
        # Use None if insights are not available
        row_data.append(financial_insights.result if financial_insights else None)
        table_data.append(row_data)
    return table_data


def main():
    """Run the Streamlit page: collect inputs, download filings, show the analysis.

    Flow: read the ticker and date range, download the matching 10-K
    filings, extract metrics and Gemini insights per filing year, then
    render a table, the charts, two generated summaries and a free-form
    question box. Nothing is downloaded until a ticker has been entered.
    """
    st.title("Company 10K Analyzer")
    # User input for company ticker
    ticker = st.text_input("Enter company ticker: (Press Enter once done)")
    # Date range, entered as text
    after_date = st.text_input("Enter After Date (YYYY-MM-DD)", "1995-01-01")
    before_date = st.text_input("Enter Before Date (YYYY-MM-DD)", "1998-01-01")
    # Display the entered dates
    st.write(f"You entered After Date: {after_date}")
    st.write(f"You entered Before Date: {before_date}")

    # The text box is empty on first render; do not start a download or any
    # analysis until the user has supplied a ticker.
    if not ticker:
        return

    st.write(f"Download started for {ticker} from {after_date} to {before_date}")
    # Download 10-K filings for the specified ticker and date range
    if not download_10k_filings(ticker, after_date, before_date):
        print(f"Failed to download 10-K filings for {ticker}")
        # Surface the failure on the page too, not only on the server console.
        st.error(f"Failed to download 10-K filings for {ticker}")
        return
    st.write("Download successful!")

    # Filings are expected under <cwd>/sec-edgar-filings/<ticker>/10-K/<accession>/
    longdir = os.path.join(os.getcwd(), "sec-edgar-filings", ticker, '10-K')
    st.write("Extracting financial data for analysis and generating insights...")
    years_data = _collect_financial_data(longdir)
    processed_financial_data = {ticker: years_data} if years_data else {}

    if ticker in processed_financial_data:
        # Display the table using Streamlit
        st.header(f"Financial Details for Ticker: {ticker}")
        st.table(_build_table(years_data))
    else:
        st.write(f"No data found for ticker: {ticker}")

    st.header(f"Financial Visualisations for Ticker: {ticker}")
    visualize_financial_metrics(ticker, processed_financial_data)

    st.header("Interesting Observations in Financial Analysis")
    answer_obs = answer_question(prompt=f"Is there any sharp decline or increase in any year between 1995-2023 in the financial metrics of {ticker}? What insight do we get from that? ")
    st.write(answer_obs)

    answer = answer_question(prompt=f"What insights do you get from the financial metrics in the 10k filings of {ticker} from 1995 to 2023. ")
    st.header(f"Overall Insights and Analysis of {ticker}")
    st.write(answer)

    st.header("Ask a question")
    # Ask for user input
    prompt_ask = st.text_input("Enter your question related to the financial analysis")
    # Check if the user has submitted a question
    if prompt_ask:
        st.write(f"Processing answer for the question: {prompt_ask}")
        answer_ask = answer_question(prompt_ask)
        st.write(answer_ask)
# Start the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()