-
Notifications
You must be signed in to change notification settings - Fork 0
/
flixbus_stations_loader.py
61 lines (51 loc) · 2.05 KB
/
flixbus_stations_loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
"""
Load all cities available in Flixbus.
"""
import logging
import uuid
import nest_asyncio
from pipelines.flixbus.bus_stations_pipeline import (
FlixbusBusStationsDataLoader,
FlixbusBusStationsDataProcessor,
)
from scrapers.flixbus.bus_stations_scraper import (
FlixbusBusStationsParser,
FlixbusBusStationsScraper,
)
from settings import APP_NAME
# Patch asyncio so that asyncio.run and loop.run_until_complete
# may be used in a nested fashion.
nest_asyncio.apply()
# Module logger, named after the APP_NAME setting; its level is set to DEBUG.
logger = logging.getLogger(APP_NAME)
logger.setLevel(logging.DEBUG)
async def load_flixbus_cities(region: str = "EU"):
    """
    Refresh the Flixbus bus stations data for one region.

    Runs the pipeline stages in order: scrape the stations, parse the
    scraped data, process the parsed items, then await the data loader
    with the processed result. If the scraper returns nothing truthy,
    an error is logged and the remaining stages are skipped.

    :param region: (str, optional) The region whose stations should be loaded
        (default is "EU").
    """
    # One id per invocation, prefixed to every log line of this run.
    trace_uuid = str(uuid.uuid4())
    logger.info(f"[{trace_uuid}] Trying to update Flixbus bus stations graph.")

    scraped_stations = FlixbusBusStationsScraper(region=region).get_data(method="POST")

    # Guard clause: nothing scraped means nothing to parse, process or load.
    if not scraped_stations:
        logger.error(f"[{trace_uuid}] No new bus stations scraped.")
        return

    logger.info(f"[{trace_uuid}] Successfully scraped {scraped_stations[0]['hits']['total']}.")

    # Parse -> process -> load, each stage consuming the previous stage's output.
    parsed = FlixbusBusStationsParser(
        region=region, scraped_data=scraped_stations
    ).parse_data()
    processed = FlixbusBusStationsDataProcessor(parsed_data=parsed).process_items()
    stations_loader = FlixbusBusStationsDataLoader(
        processed_data=processed, region=region
    )
    await stations_loader.load_items()