diff --git a/ovos_stt_plugin_vosk/__init__.py b/ovos_stt_plugin_vosk/__init__.py index 1933d36..cb7e167 100644 --- a/ovos_stt_plugin_vosk/__init__.py +++ b/ovos_stt_plugin_vosk/__init__.py @@ -1,12 +1,19 @@ import json +import os +import shutil +import tarfile +import zipfile +from os import makedirs +from os.path import join, isdir, exists +from queue import Queue +from tempfile import mkstemp from time import sleep -from os.path import join, exists + +import requests from ovos_plugin_manager.templates.stt import STT, StreamThread, StreamingSTT -from ovos_skill_installer import download_extract_zip, download_extract_tar from ovos_utils.log import LOG from ovos_utils.network_utils import is_connected from ovos_utils.xdg_utils import xdg_data_home -from queue import Queue from speech_recognition import AudioData from vosk import Model as KaldiModel, KaldiRecognizer @@ -238,3 +245,103 @@ def create_streaming_thread(self): return VoskKaldiStreamThread( self.queue, self.lang, self.model, self.verbose ) +def download(url, file=None, session=None): + """ + Pass file as a filename, open file object, or None to return the request bytes + + Args: + url (str): URL of file to download + file (Union[str, io, None]): One of the following: + - Filename of output file + - File opened in binary write mode + - None: Return raw bytes instead + + Returns: + Union[bytes, None]: Bytes of file if file is None + """ + + if isinstance(file, str): + file = open(file, 'wb') + try: + if session: + content = session.get(url).content + else: + content = requests.get(url).content + if file: + file.write(content) + else: + return content + finally: + if file: + file.close() + + +def download_extract_tar(tar_url, folder, tar_filename='', + skill_folder_name=None, session=None): + """ + Download and extract the tar at the url to the given folder + + Args: + tar_url (str): URL of tar file to download + folder (str): Location of parent directory to extract to. Doesn't have to exist + tar_filename (str): Location to download tar. Default is to a temp file + skill_folder_name (str): rename extracted skill folder to this + """ + try: + makedirs(folder) + except OSError: + if not isdir(folder): + raise + if not tar_filename: + fd, tar_filename = mkstemp('.tar.gz') + download(tar_url, os.fdopen(fd, 'wb'), session=session) + else: + download(tar_url, tar_filename, session=session) + + with tarfile.open(tar_filename) as tar: + tar.extractall(path=folder) + + if skill_folder_name: + with tarfile.open(tar_filename) as tar: + for p in tar.getnames(): + original_folder = p.split("/")[0] + break + original_folder = join(folder, original_folder) + final_folder = join(folder, skill_folder_name) + shutil.move(original_folder, final_folder) + + +def download_extract_zip(zip_url, folder, zip_filename="", + skill_folder_name=None, session=None): + """ + Download and extract the zip at the url to the given folder + + Args: + zip_url (str): URL of zip file to download + folder (str): Location of parent directory to extract to. Doesn't have to exist + zip_filename (str): Location to download zip. Default is to a temp file + skill_folder_name (str): rename extracted skill folder to this + """ + try: + makedirs(folder) + except OSError: + if not isdir(folder): + raise + if not zip_filename: + fd, zip_filename = mkstemp('.tar.gz') + download(zip_url, os.fdopen(fd, 'wb'), session=session) + else: + download(zip_url, zip_filename, session=session) + + with zipfile.ZipFile(zip_filename, 'r') as zip_ref: + zip_ref.extractall(folder) + + if skill_folder_name: + with zipfile.ZipFile(zip_filename, 'r') as zip_ref: + for p in zip_ref.namelist(): + original_folder = p.split("/")[0] + break + + original_folder = join(folder, original_folder) + final_folder = join(folder, skill_folder_name) + shutil.move(original_folder, final_folder)