Skip to content

Commit

Permalink
feat: update code/sources (only sources/radio.yaml working)
Browse files Browse the repository at this point in the history
  • Loading branch information
jnk22 committed Jan 20, 2024
1 parent ca7b0c5 commit 8c3c3d2
Show file tree
Hide file tree
Showing 14 changed files with 3,332 additions and 2,799 deletions.
198 changes: 76 additions & 122 deletions kodinerds_iptv/check_availability.py
Original file line number Diff line number Diff line change
@@ -1,154 +1,108 @@
"""TODO."""

import asyncio
from collections import defaultdict
from collections.abc import Coroutine
from http import HTTPStatus
from itertools import chain
import functools
from collections.abc import Sequence
from pathlib import Path
from typing import Any, Protocol
from ssl import SSLError

from httpx import AsyncClient, AsyncHTTPTransport, Response
import aiometer
from httpx import URL, AsyncClient, AsyncHTTPTransport, Response, TransportError
from loguru import logger
from tabulate import tabulate

from .enums import ReportFormat, StreamState
from .io_utils import read_streams
from .stream import Stream, StreamCategory, StreamCheck


class ReportOutput(Protocol):
    """Structural interface for availability-report renderers.

    Implementations produce the textual pieces of a report; presumably one
    implementation exists per ReportFormat (text/markdown) — TODO confirm.
    """

    def write_header(self, state: StreamState) -> str:
        """Return the report section header for the given stream state."""
        ...

    def write_table(self, stream_checks: list[StreamCheck]) -> str:
        """Return a formatted table for the given stream check results."""
        ...


def check_sources(
    sources: list[Path],
    output_dir: Path,
    timeout: int,
    retries: int,
    report_format: ReportFormat,
) -> None:
    """Run the asynchronous availability check for *sources* and block until done.

    Thin synchronous entry point: builds the ``check_availability`` coroutine
    with the given settings and drives it to completion via ``asyncio.run``.
    """
    coro = check_availability(sources, output_dir, timeout, retries, report_format)
    asyncio.run(coro)
from .enums import StreamState
from .stream import Stream, StreamCheck


async def check_availability(
sources: list[Path],
streams: Sequence[Stream],
report_dir: Path,
*,
timeout: int,
retries: int,
report_format: ReportFormat,
max_retries: int,
max_parallel: int,
) -> None:
"""TODO."""
source_content: dict[str, list[StreamCategory]] = {}
for source in sources:
print(f"Reading source: {source}")
source_content[source.stem] = read_streams(source)
transport = AsyncHTTPTransport(retries=max_retries)
client = AsyncClient(transport=transport, follow_redirects=True, timeout=timeout)
params = functools.partial(__check_stream, client)

transport = AsyncHTTPTransport(retries=retries)
results: dict[str, list[StreamCheck]] = {}
async with aiometer.amap(params, streams, max_at_once=max_parallel) as results:
stream_checks = [
__stream_check_response(stream, response)
async for stream, response in results
]

async with AsyncClient(transport=transport) as client:
for source_name, stream_groups in source_content.items():
print(f"Checking streams from source '{source_name}'")
__write_results(stream_checks, report_dir / "report.txt")

streams = list(chain.from_iterable(sg.streams for sg in stream_groups))
tasks = [await check(stream, client, timeout) for stream in streams]
gather_results = await asyncio.gather(*tasks, return_exceptions=True)

results[source_name] = [
match_stream_check_response(*stream_result)
for stream_result in zip(streams, gather_results, strict=True)
]
async def __check_stream(
client: AsyncClient, stream: Stream
) -> tuple[Stream, Response | Exception | None]:
logger.debug(f"Checking stream: {stream.name}")
# TODO: Enable caching for already checked stream URLs.

file_extension = "md" if report_format == ReportFormat.MARKDOWN else "txt"
for source_name, stream_checks in results.items():
report_file = report_dir / f"{source_name}.{file_extension}"
write_results(stream_checks, report_file)
if stream.skip_check:
return stream, None

print("Finished")
try:
return stream, await client.send(client.build_request("HEAD", stream.url))
except (TransportError, SSLError) as e:
return stream, e


def match_stream_check_response(
def __stream_check_response(
stream: Stream, response: Response | Exception | None
) -> StreamCheck:
"""TODO."""
match response:
case None:
return StreamCheck(stream, StreamState.SKIPPED)

case Exception() as e:
return StreamCheck(stream, StreamState.ERROR, reason=str(e))

case _:
# TODO: Check for redirects.
# TODO: Match against more status codes explicitly.
status = response.status_code
check = (
StreamCheck(stream, StreamState.SUCCESS)
if status == HTTPStatus.OK
else StreamCheck(stream, StreamState.ERROR, reason=f"HTTP: {status}")
)

if check.state == StreamState.SUCCESS and stream.url.startswith("http://"):
# Stream URL is HTTP, but it works.
check.state = StreamState.WARNING
check.reason = "Try using https:// instead of http://"

return check


async def check(
    stream: Stream, client: AsyncClient, timeout: int
) -> Coroutine[Any, Any, Response | None]:
    """Build (but do not await) the availability probe for *stream*.

    Returns the ``client.head`` coroutine for the stream URL, or a ``noop()``
    coroutine when the stream is flagged to skip checking. The caller awaits
    this function only to obtain the inner coroutine, which it is expected to
    gather/await itself (hence the ``Coroutine`` return annotation).
    """
    return noop() if stream.skip_check else client.head(stream.url, timeout=timeout)
if response is None:
# Happens if 'stream.skip_check' is True, therefore no response.
return StreamCheck(stream, StreamState.SKIPPED)

if isinstance(response, Exception):
return StreamCheck(stream, StreamState.ERROR, reason=str(response))

async def noop() -> None:
    """Do nothing; awaitable placeholder used for skipped stream checks."""
    return None
if response.is_error:
return StreamCheck(
stream, StreamState.ERROR, reason=f"HTTP status: {response.status_code}"
)

if response.is_redirect:
return StreamCheck(
stream, StreamState.WARNING, reason="Redirected to: {response.url}"
)

def write_results(results: list[StreamCheck], output_file: Path) -> None:
"""TODO."""
print(f"Writing file: {output_file}")
sorted_results: defaultdict[StreamState, list[StreamCheck]] = defaultdict(list)
for result in results:
sorted_results[result.state].append(result)

final_result: str = "\n\n".join(
f"{'='*29} Results for: {state} {'='*29}\n{result_lines(sorted_results[state])}"
for state in StreamState
if response.is_success and URL(stream.url).scheme == "http":
return StreamCheck(
stream, StreamState.WARNING, reason="Using HTTP instead of HTTPS"
)

if response.is_success:
return StreamCheck(stream, StreamState.SUCCESS)

# All other cases, including HTTP 1xx status codes.
return StreamCheck(
stream,
StreamState.UNKNOWN,
reason=f"Unknown (HTTP status: {response.status_code})",
)


def __write_results(results: Sequence[StreamCheck], output_file: Path) -> None:
logger.info(f"Writing report file: {output_file}")

output_file.parent.mkdir(parents=True, exist_ok=True)
output_file.write_text(final_result)
output_file.write_text(__generate_result_lines(sorted(results)))


def result_lines(results: list[StreamCheck]) -> str:
    """Render stream-check results as a GitHub-flavoured Markdown table.

    Each row carries the stream name, the check state value, and the reason.
    Returns the placeholder text when *results* is empty.
    """
    if not results:
        return "Nothing here..."

    rows = [
        {
            "name": check.stream.name,
            "state": check.state.value,
            "reason": check.reason,
        }
        for check in results
    ]
    return tabulate(rows, headers="keys", tablefmt="github")
def __generate_result_lines(stream_checks: Sequence[StreamCheck]) -> str:
    """Format *stream_checks* as a plain-text table via ``tabulate``."""

    def as_row(check: StreamCheck) -> dict[str, object]:
        # One table row per check: stream identity plus check outcome.
        return {
            "name": check.stream.name,
            "source": check.stream.source_file,
            "state": check.state.name,
            "reason": check.reason,
        }

    return tabulate([as_row(check) for check in stream_checks], headers="keys")
2 changes: 1 addition & 1 deletion kodinerds_iptv/cli_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ def version_callback(*, version: bool) -> None:

from typer import Exit

print(f"Kodinerds IPTV {metadata.version(__package__)}")
print(f"Kodinerds IPTV CLI {metadata.version(__package__)}") # noqa: T201
raise Exit
17 changes: 7 additions & 10 deletions kodinerds_iptv/enums.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""TODO."""

from enum import Enum
from enum import Enum, IntEnum


class ListType(Enum):
Expand All @@ -11,27 +11,24 @@ class ListType(Enum):
CLEAN: Stream uses raw URL.
KODI: Stream has full Kodi compatibility with extended details.
PIPE: Stream is piped through FFmpeg with special header.
DASH: Stream has special header to be used with Kodi's adaptive inputstream.
"""

CLEAN = "clean"
KODI = "kodi"
PIPE = "pipe"
DASH = "dash"


class StreamState(Enum):
class StreamState(IntEnum):
"""TODO."""

SUCCESS = "success"
WARNING = "warning"
SKIPPED = "skipped"
ERROR = "error"
UNKNOWN = "unknown"
SUCCESS = 1
WARNING = 2
SKIPPED = 3
ERROR = 4
UNKNOWN = 5


class ReportFormat(Enum):
    """Output format for availability report files.

    TEXT: plain-text report (written with a ``.txt`` extension).
    MARKDOWN: Markdown report (written with a ``.md`` extension).
    """

    TEXT = "text"
    MARKDOWN = "markdown"
84 changes: 18 additions & 66 deletions kodinerds_iptv/generate_lists.py
Original file line number Diff line number Diff line change
@@ -1,83 +1,35 @@
"""Module for generating IPTV m3u files based on YAML source files."""

import itertools
from collections import defaultdict
from collections.abc import Sequence
from pathlib import Path

from loguru import logger

from .enums import ListType
from .io_utils import read_streams
from .line_writer import AutoLineWriter
from .stream import StreamCategory

EXTM3U_HEADER = "#EXTM3U"
from .stream import Stream


def generate_sources(
sources: list[Path],
list_types: list[ListType],
def generate_lists(
streams: Sequence[Stream],
list_types: Sequence[ListType],
output_dir: Path,
output_extension: str,
logo_base_path: str | None,
logo_base_path: str,
) -> None:
"""TODO."""
source_content: dict[str, list[StreamCategory]] = {}
for source in sources:
print(f"Reading source: {source}")
source_content[source.stem] = read_streams(source)

stream_lists: dict[str, list[str]] = {}
for list_type in set(list_types):
print(f"Generating stream lines for type '{list_type.value}'")
stream_lists |= generate_stream_lines(source_content, list_type, logo_base_path)

for file_name, streams in stream_lists.items():
output_file = Path(f"{output_dir}/{file_name}.{output_extension}")

print(f"Writing file: {output_file}")
output_file.parent.mkdir(parents=True, exist_ok=True)
output_file.write_text("\n".join((EXTM3U_HEADER, *streams, "")))

print("Finished")
line_writer = AutoLineWriter.from_list_type(
list_type, logo_base_path=logo_base_path
)

lines = itertools.chain.from_iterable(
line_writer.get_lines(stream) for stream in streams
)

def generate_stream_lines(
content: dict[str, list[StreamCategory]],
list_type: ListType,
logo_base_path: str | None,
) -> dict[str, list[str]]:
"""Generate stream lines for all categories based on type.
output_file = Path(f"{output_dir}/{list_type.value}{output_extension}")
logger.info(f"Writing streams file: {output_file}")

Given an input of streams and a list type, this function generates a
nested dictionary of lists containing final output lines.
Parameters
----------
content
A dictionary of stream groups with respective source file name.
list_type
The type of required output format.
logo_base_path
Base path for images that 'tvg_logo' will be appended to.
Returns
-------
dict[str, list[str]]
A nested dictionary of lists containing parsed lines of content.
"""
stream_lines: defaultdict[str, list[str]] = defaultdict(list)
line_writer = AutoLineWriter.from_list_type(
list_type, logo_base_path=logo_base_path
)
full_path = f"{list_type.value.lower()}/{list_type.value.lower()}"

for source_name, stream_groups in content.items():
source_path = f"{full_path}_{source_name}"

for stream_group in stream_groups:
category_path = f"{source_path}_{stream_group.name}"
all_paths = (full_path, source_path, category_path)

for stream, path in itertools.product(stream_group.streams, all_paths):
stream_lines[path].extend(line_writer.get_lines(stream))

return stream_lines
output_file.parent.mkdir(parents=True, exist_ok=True)
output_file.write_text("\n".join(("#EXTM3U", *lines, "")))
Loading

0 comments on commit 8c3c3d2

Please sign in to comment.