"Job catalog services"


def insert_job_catalog_entry(
    self,
    name: str,
    queries: list,
    start_time: datetime.datetime,
    end_time: datetime.datetime,
    repeat_interval: int,
    active: bool,
    next_schedule_run: datetime.datetime,
) -> JobCatalogEntry:
    """Persist a new entry in the job catalog.

    Args:
        name: job name
        queries: the job's queries (JSON-encoded by the catalog service)
        start_time: job start time
        end_time: job end time
        repeat_interval: job repeat interval
        active: job status
        next_schedule_run: next run time as per schedule

    Returns:
        JobCatalogEntry describing the inserted job
    """
    return self._job_catalog_service.insert_entry(
        name,
        queries,
        start_time,
        end_time,
        repeat_interval,
        active,
        next_schedule_run,
    )


def get_job_catalog_entry(self, job_name: str) -> JobCatalogEntry:
    """Return the job catalog entry for the given job name.

    Arguments:
        job_name (str): name of the job

    Returns:
        JobCatalogEntry, or None when no job with that name exists
    """
    return self._job_catalog_service.get_entry_by_name(job_name)


def drop_job_catalog_entry(self, job_entry: JobCatalogEntry) -> bool:
    """Delete the job from the catalog.

    Arguments:
        job_entry: job catalog entry to remove

    Returns:
        True if successfully deleted; the catalog service raises
        CatalogError when the delete fails.
    """
    return self._job_catalog_service.delete_entry(job_entry)


def get_next_executable_job(self, only_past_jobs: bool = False) -> JobCatalogEntry:
    """Get the active job with the earliest scheduled trigger time.

    Arguments:
        only_past_jobs: when True, only jobs whose trigger time is already
            in the past are considered

    Returns:
        The first job to be triggered, or None if there is no such job
    """
    return self._job_catalog_service.get_next_executable_job(only_past_jobs)


def update_job_catalog_entry(
    self, job_name: str, next_scheduled_run: datetime.datetime, active: bool
):
    """Update the next_scheduled_run and active columns of a job.

    Arguments:
        job_name (str): job which should be updated
        next_scheduled_run (datetime): the next trigger time for the job
        active (bool): the active status for the job
    """
    self._job_catalog_service.update_next_scheduled_run(
        job_name, next_scheduled_run, active
    )


"Job history catalog services"


def insert_job_history_catalog_entry(
    self,
    job_id: int,
    job_name: str,
    execution_start_time: datetime.datetime,
    execution_end_time: datetime.datetime,
) -> JobHistoryCatalogEntry:
    """Persist a new entry in the job history catalog.

    Args:
        job_id: job id for the execution entry
        job_name: job name for the execution entry
        execution_start_time: job execution start time
        execution_end_time: job execution end time

    Returns:
        JobHistoryCatalogEntry describing the inserted execution record
    """
    return self._job_history_catalog_service.insert_entry(
        job_id, job_name, execution_start_time, execution_end_time
    )


def get_job_history_by_job_id(self, job_id: int) -> List[JobHistoryCatalogEntry]:
    """Return all the history entries recorded for this job_id.

    Args:
        job_id: the id of job whose history should be fetched
    """
    return self._job_history_catalog_service.get_entry_by_job_id(job_id)


def update_job_history_end_time(
    self,
    job_id: int,
    execution_start_time: datetime.datetime,
    execution_end_time: datetime.datetime,
) -> None:
    """Update execution_end_time of the history entry matching job_id and
    execution_start_time.

    Args:
        job_id: id of the job whose history entry should be updated
        execution_start_time: the start time identifying the history entry
        execution_end_time: the end time to store on that entry
    """
    # The service performs the update in place and returns nothing.
    self._job_history_catalog_service.update_entry_end_time(
        job_id, execution_start_time, execution_end_time
    )
class JobCatalog(BaseModel):
    """The `JobCatalog` catalog stores information about all the created Jobs.
    `_row_id:` an autogenerated unique identifier.
    `_name:` the job name.
    `_queries:` the queries to run as part of this job, stored as a JSON string
    `_start_time:` the job's start time
    `_end_time:` the job's end time
    `_repeat_interval:` the job's repeat interval
    `_active:` is the job active/deleted
    `_next_scheduled_run:` the next trigger time for the job as per the schedule
    `_created_at:` entry creation time
    `_updated_at:` entry last update time
    """

    __tablename__ = "job_catalog"

    _name = Column("name", String(100), unique=True)
    _queries = Column("queries", String, nullable=False)
    _start_time = Column("start_time", DateTime, default=datetime.datetime.now)
    # NOTE: persisted under the column name "end_ts" (not "end_time");
    # renaming it would change the stored schema.
    _end_time = Column("end_ts", DateTime)
    _repeat_interval = Column("repeat_interval", Integer)
    _active = Column("active", Boolean, default=True)
    _next_scheduled_run = Column("next_scheduled_run", DateTime)

    _created_at = Column("created_at", DateTime, default=datetime.datetime.now)
    _updated_at = Column(
        "updated_at",
        DateTime,
        default=datetime.datetime.now,
        onupdate=datetime.datetime.now,
    )

    # Index on the next trigger time, which the scheduler queries order by.
    _next_run_index = Index("_next_run_index", _next_scheduled_run)
    # Deleting a job also deletes its execution history rows.
    _job_history_catalog = relationship("JobHistoryCatalog", cascade="all, delete")

    def __init__(
        self,
        name: str,
        queries: str,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        repeat_interval: int,
        active: bool,
        next_schedule_run: datetime.datetime,
    ):
        """Build a job row.

        Args:
            name: job name
            queries: JSON-encoded list of the job's queries
            start_time: job start time
            end_time: job end time
            repeat_interval: job repeat interval
            active: whether the job is active
            next_schedule_run: next trigger time for the job
        """
        self._name = name
        self._queries = queries
        self._start_time = start_time
        self._end_time = end_time
        self._repeat_interval = repeat_interval
        self._active = active
        self._next_scheduled_run = next_schedule_run

    def as_dataclass(self) -> "JobCatalogEntry":
        """Return this row as a `JobCatalogEntry`, decoding the JSON queries."""
        return JobCatalogEntry(
            row_id=self._row_id,
            name=self._name,
            queries=json.loads(self._queries),
            start_time=self._start_time,
            end_time=self._end_time,
            repeat_interval=self._repeat_interval,
            active=self._active,
            next_scheduled_run=self._next_scheduled_run,
            created_at=self._created_at,
            updated_at=self._updated_at,
        )
class JobHistoryCatalog(BaseModel):
    """The `JobHistoryCatalog` stores the execution history of jobs.
    `_row_id:` an autogenerated unique identifier.
    `_job_id:` job id.
    `_job_name:` job name.
    `_execution_start_time:` start time of this run
    `_execution_end_time:` end time for this run
    `_created_at:` entry creation time
    `_updated_at:` entry last update time
    """

    __tablename__ = "job_history_catalog"

    # References the owning job row; the foreign key is declared with
    # ON DELETE CASCADE.
    _job_id = Column(
        "job_id", Integer, ForeignKey("job_catalog._row_id", ondelete="CASCADE")
    )
    _job_name = Column("job_name", String(100))
    _execution_start_time = Column("execution_start_time", DateTime)
    _execution_end_time = Column("execution_end_time", DateTime)
    _created_at = Column("created_at", DateTime, default=datetime.datetime.now)
    _updated_at = Column(
        "updated_at",
        DateTime,
        default=datetime.datetime.now,
        onupdate=datetime.datetime.now,
    )

    # At most one history row per (job, execution start time) pair.
    __table_args__ = (UniqueConstraint("job_id", "execution_start_time"), {})

    def __init__(
        self,
        job_id: int,
        job_name: str,
        execution_start_time: datetime.datetime,
        execution_end_time: datetime.datetime,
    ):
        """Build a history row for one execution of a job.

        Args:
            job_id: id of the job that was executed
            job_name: name of the job that was executed
            execution_start_time: when this run started
            execution_end_time: when this run ended
        """
        self._job_id = job_id
        self._job_name = job_name
        self._execution_start_time = execution_start_time
        self._execution_end_time = execution_end_time

    def as_dataclass(self) -> "JobHistoryCatalogEntry":
        """Return this row as a `JobHistoryCatalogEntry`."""
        return JobHistoryCatalogEntry(
            row_id=self._row_id,
            job_id=self._job_id,
            job_name=self._job_name,
            execution_start_time=self._execution_start_time,
            execution_end_time=self._execution_end_time,
            created_at=self._created_at,
            updated_at=self._updated_at,
        )
@dataclass(unsafe_hash=True)
class JobCatalogEntry:
    """Dataclass representing an entry in the `JobCatalog`."""

    name: str
    queries: list
    start_time: datetime.datetime
    end_time: datetime.datetime
    repeat_interval: int
    active: bool
    next_scheduled_run: datetime.datetime
    created_at: datetime.datetime
    updated_at: datetime.datetime
    row_id: int = None

    def display_format(self):
        """Return the entry as a plain dict keyed by display column name."""
        return {
            "row_id": self.row_id,
            "name": self.name,
            "queries": self.queries,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "repeat_interval": self.repeat_interval,
            "active": self.active,
            # Key spelling ("schedule", not "scheduled") is kept as-is so
            # existing consumers of this dict keep working.
            "next_schedule_run": self.next_scheduled_run,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }


@dataclass(unsafe_hash=True)
class JobHistoryCatalogEntry:
    """Dataclass representing an entry in the `JobHistoryCatalog`."""

    job_id: int
    job_name: str
    execution_start_time: datetime.datetime
    execution_end_time: datetime.datetime
    created_at: datetime.datetime
    updated_at: datetime.datetime
    row_id: int = None

    def display_format(self):
        """Return the entry as a plain dict keyed by display column name."""
        return {
            "row_id": self.row_id,
            # Report the job's id here (previously the job name was emitted
            # under this key by mistake).
            "job_id": self.job_id,
            "job_name": self.job_name,
            "execution_start_time": self.execution_start_time,
            "execution_end_time": self.execution_end_time,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }
class JobCatalogService(BaseService):
    """Read/write access to the rows of the `JobCatalog` table."""

    def __init__(self, db_session: Session):
        super().__init__(JobCatalog, db_session)

    def insert_entry(
        self,
        name: str,
        queries: list,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        repeat_interval: int,
        active: bool,
        next_schedule_run: datetime.datetime,
    ) -> JobCatalogEntry:
        """Insert a new job into the catalog.

        Arguments:
            name (str): job name
            queries (list): the job's queries; stored JSON-encoded
            start_time (datetime): job start time
            end_time (datetime): job end time
            repeat_interval (int): job repeat interval
            active (bool): job status
            next_schedule_run (datetime): next trigger time for the job
        Returns:
            JobCatalogEntry for the inserted row
        Raises:
            CatalogError: if the row cannot be created or saved
        """
        try:
            job_catalog_obj = self.model(
                name=name,
                queries=json.dumps(queries),
                start_time=start_time,
                end_time=end_time,
                repeat_interval=repeat_interval,
                active=active,
                next_schedule_run=next_schedule_run,
            )
            job_catalog_obj = job_catalog_obj.save(self.session)

        except Exception as e:
            logger.exception(
                f"Failed to insert entry into job catalog with exception {str(e)}"
            )
            raise CatalogError(e) from e

        return job_catalog_obj.as_dataclass()

    def get_entry_by_name(self, job_name: str) -> JobCatalogEntry:
        """
        Get the job catalog entry with given job name.
        Arguments:
            job_name (str): Job name
        Returns:
            JobCatalogEntry - catalog entry for given job name, or None if
            no such job exists
        """
        entry = self.session.execute(
            select(self.model).filter(self.model._name == job_name)
        ).scalar_one_or_none()
        if entry:
            return entry.as_dataclass()
        return entry

    def delete_entry(self, job_entry: JobCatalogEntry):
        """Delete Job from the catalog
        Arguments:
            job_entry (JobCatalogEntry): job to delete
        Returns:
            True if successfully removed
        Raises:
            CatalogError: if the job is not in the catalog or the delete fails
        """
        try:
            job_catalog_obj = self.session.execute(
                select(self.model).filter(self.model._row_id == job_entry.row_id)
            ).scalar_one_or_none()
            if job_catalog_obj is None:
                # Report a missing row explicitly instead of failing with an
                # AttributeError on None; it is still surfaced as CatalogError.
                raise ValueError("no matching job found in the catalog")
            job_catalog_obj.delete(self.session)
            return True
        except Exception as e:
            err_msg = f"Delete Job failed for {job_entry} with error {str(e)}."
            logger.exception(err_msg)
            raise CatalogError(err_msg) from e

    def get_all_overdue_jobs(self) -> list:
        """Get the list of jobs that are overdue to be triggered
        Arguments:
            None
        Returns:
            Returns the list of all active overdue jobs
        """
        entries = (
            self.session.execute(
                select(self.model).filter(
                    and_(
                        self.model._next_scheduled_run <= datetime.datetime.now(),
                        self.model._active == true(),
                    )
                )
            )
            .scalars()
            .all()
        )
        return [row.as_dataclass() for row in entries]

    def get_next_executable_job(self, only_past_jobs: bool) -> JobCatalogEntry:
        """Get the active job with the earliest next_scheduled_run
        Arguments:
            only_past_jobs (bool): boolean flag to denote if only jobs with trigger time in
            past should be considered
        Returns:
            Returns the first job to be triggered, or None if there is none
        """
        criteria = self.model._active == true()
        if only_past_jobs:
            criteria = and_(
                self.model._next_scheduled_run <= datetime.datetime.now(),
                criteria,
            )

        entry = self.session.execute(
            select(self.model)
            .filter(criteria)
            .order_by(self.model._next_scheduled_run.asc())
            .limit(1)
        ).scalar_one_or_none()
        if entry:
            return entry.as_dataclass()
        return entry

    def update_next_scheduled_run(
        self, job_name: str, next_scheduled_run: datetime.datetime, active: bool
    ):
        """Update the next_scheduled_run and active column as per the provided values
        Arguments:
            job_name (str): job which should be updated

            next_scheduled_run (datetime): the next trigger time for the job

            active (bool): the active status for the job
        Returns:
            void
        """
        job = (
            self.session.query(self.model).filter(self.model._name == job_name).first()
        )
        # An unknown job name is silently ignored.
        if job:
            job._next_scheduled_run = next_scheduled_run
            job._active = active
            self.session.commit()
class JobHistoryCatalogService(BaseService):
    """Read/write access to the rows of the `JobHistoryCatalog` table."""

    def __init__(self, db_session: Session):
        super().__init__(JobHistoryCatalog, db_session)

    def insert_entry(
        self,
        job_id: int,
        job_name: str,
        execution_start_time: datetime.datetime,
        execution_end_time: datetime.datetime,
    ) -> JobHistoryCatalogEntry:
        """Record one execution of a job.

        Arguments:
            job_id (int): id of the executed job
            job_name (str): name of the executed job
            execution_start_time (datetime): when the run started
            execution_end_time (datetime): when the run ended
        Returns:
            JobHistoryCatalogEntry for the inserted row
        Raises:
            CatalogError: if the row cannot be created or saved
        """
        try:
            job_history_catalog_obj = self.model(
                job_id=job_id,
                job_name=job_name,
                execution_start_time=execution_start_time,
                execution_end_time=execution_end_time,
            )
            job_history_catalog_obj = job_history_catalog_obj.save(self.session)

        except Exception as e:
            logger.exception(
                f"Failed to insert entry into job history catalog with exception {str(e)}"
            )
            raise CatalogError(e) from e

        return job_history_catalog_obj.as_dataclass()

    def get_entry_by_job_id(self, job_id: int) -> list[JobHistoryCatalogEntry]:
        """
        Get all the job history catalog entries with given job id.
        Arguments:
            job_id (int): Job id
        Returns:
            list[JobHistoryCatalogEntry]: all history catalog entries for given job id
        """
        rows = (
            self.session.execute(
                select(self.model).filter(self.model._job_id == job_id)
            )
            .scalars()
            .all()
        )
        return [row.as_dataclass() for row in rows]

    def update_entry_end_time(
        self,
        job_id: int,
        execution_start_time: datetime.datetime,
        execution_end_time: datetime.datetime,
    ):
        """Update the execution_end_time of the entry as per the provided values
        Arguments:
            job_id (int): id of the job whose history entry should be updated

            execution_start_time (datetime): the start time for the job history entry

            execution_end_time (datetime): the end time for the job history entry
        Returns:
            void
        """
        job_history_entry = (
            self.session.query(self.model)
            .filter(
                and_(
                    self.model._job_id == job_id,
                    self.model._execution_start_time == execution_start_time,
                )
            )
            .first()
        )
        # When no entry matches (job_id, execution_start_time) nothing changes.
        if job_history_entry:
            job_history_entry._execution_end_time = execution_end_time
            self.session.commit()
class CreateJobExecutor(AbstractExecutor):
    """Executor for CREATE JOB: validates the job and stores it in the job catalog."""

    def __init__(self, db: EvaDBDatabase, node: CreateJobStatement):
        super().__init__(db, node)

    def _parse_datetime_str(self, datetime_str: str) -> datetime:
        """Parse a ``YYYY-MM-DD HH:MM:SS`` or ``YYYY-MM-DD`` string.

        Raises:
            ExecutorError: the string has the right shape but is not a valid
                date/datetime (e.g. month 13).
            ValueError: the string matches neither shape.
        """
        datetime_format = "%Y-%m-%d %H:%M:%S"
        date_format = "%Y-%m-%d"

        if re.match(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}", datetime_str):
            try:
                return datetime.strptime(datetime_str, datetime_format)
            except ValueError:
                raise ExecutorError(
                    f"{datetime_str} is not in the correct datetime format. expected format: {datetime_format}."
                )
        elif re.match(r"\d{4}-\d{2}-\d{2}", datetime_str):
            try:
                return datetime.strptime(datetime_str, date_format)
            except ValueError:
                raise ExecutorError(
                    f"{datetime_str} is not in the correct date format. expected format: {date_format}."
                )
        else:
            raise ValueError(
                f"{datetime_str} does not match the expected date or datetime format"
            )

    def _get_repeat_time_interval_seconds(
        self, repeat_interval: int, repeat_period: str
    ) -> int:
        """Convert ``EVERY <repeat_interval> <repeat_period>`` into seconds.

        A missing interval counts as 1; a missing period yields 0.

        Raises:
            AssertionError: repeat_period is given but is not a known unit.
        """
        unit_to_seconds = {
            "seconds": 1,
            "minute": 60,
            "minutes": 60,
            "min": 60,
            "hour": 3600,
            "hours": 3600,
            "day": 86400,
            "days": 86400,
            "week": 604800,
            "weeks": 604800,
            # A month is approximated as 30 days.
            "month": 2592000,
            "months": 2592000,
        }
        # Raised explicitly (rather than via `assert`) so the validation is
        # not stripped when Python runs with -O; the exception type and
        # message are unchanged.
        if repeat_period is not None and repeat_period not in unit_to_seconds:
            raise AssertionError(
                "repeat period should be one of these values: seconds | minute | minutes | min | hour | hours | day | days | week | weeks | month | months"
            )

        repeat_interval = 1 if repeat_interval is None else repeat_interval
        return repeat_interval * unit_to_seconds.get(repeat_period, 0)

    def exec(self, *args, **kwargs):
        """Create the job described by the statement node.

        Yields a single Batch holding a status message.

        Raises:
            ExecutorError: the job already exists (without IF NOT EXISTS) or
                one of its queries does not parse.
        """
        # Check if the job already exists.
        job_catalog_entry = self.catalog().get_job_catalog_entry(self.node.job_name)

        if job_catalog_entry is not None:
            if self.node.if_not_exists:
                msg = f"A job with name {self.node.job_name} already exists, nothing added."
                yield Batch(pd.DataFrame([msg]))
                return
            else:
                raise ExecutorError(
                    f"A job with name {self.node.job_name} already exists."
                )

        logger.debug(f"Creating job {self.node}")

        job_name = self.node.job_name
        queries = []
        parser = Parser()

        for q in self.node.queries:
            # Bound before the try block so the error message below can
            # always reference it.
            curr_query = str(q)
            try:
                parser.parse(curr_query)
                queries.append(curr_query)
            except Exception as e:
                error_msg = f"Failed to parse the job query: {curr_query}"
                logger.exception(error_msg)
                raise ExecutorError(error_msg) from e

        # `datetime` is the class (from datetime import datetime), so the
        # current time is datetime.now(), not datetime.datetime.now().
        start_time = (
            self._parse_datetime_str(self.node.start_time)
            if self.node.start_time is not None
            else datetime.now()
        )
        end_time = (
            self._parse_datetime_str(self.node.end_time)
            if self.node.end_time is not None
            else None
        )
        repeat_interval = self._get_repeat_time_interval_seconds(
            self.node.repeat_interval, self.node.repeat_period
        )
        active = True
        # The first run is scheduled at the job's start time.
        next_schedule_run = start_time

        self.catalog().insert_job_catalog_entry(
            job_name,
            queries,
            start_time,
            end_time,
            repeat_interval,
            active,
            next_schedule_run,
        )

        yield Batch(
            pd.DataFrame(
                [f"The job {self.node.job_name} has been successfully created."]
            )
        )
job_catalog_entry: + err_msg = f"Job {job_name} does not exist, therefore cannot be dropped." + if if_exists: + logger.warning(err_msg) + return Batch(pd.DataFrame([err_msg])) + else: + raise RuntimeError(err_msg) + + logger.debug(f"Dropping Job {job_name}") + + self.catalog().drop_job_catalog_entry(job_catalog_entry) + + return Batch( + pd.DataFrame( + {f"Job {job_name} successfully dropped"}, + index=[0], + ) + ) diff --git a/evadb/executor/plan_executor.py b/evadb/executor/plan_executor.py index 94d290bdb3..a2921f3cbb 100644 --- a/evadb/executor/plan_executor.py +++ b/evadb/executor/plan_executor.py @@ -21,6 +21,7 @@ from evadb.executor.create_executor import CreateExecutor from evadb.executor.create_function_executor import CreateFunctionExecutor from evadb.executor.create_index_executor import CreateIndexExecutor +from evadb.executor.create_job_executor import CreateJobExecutor from evadb.executor.delete_executor import DeleteExecutor from evadb.executor.drop_object_executor import DropObjectExecutor from evadb.executor.exchange_executor import ExchangeExecutor @@ -48,7 +49,7 @@ from evadb.executor.use_executor import UseExecutor from evadb.executor.vector_index_scan_executor import VectorIndexScanExecutor from evadb.models.storage.batch import Batch -from evadb.parser.create_statement import CreateDatabaseStatement +from evadb.parser.create_statement import CreateDatabaseStatement, CreateJobStatement from evadb.parser.set_statement import SetStatement from evadb.parser.statement import AbstractStatement from evadb.parser.use_statement import UseStatement @@ -93,6 +94,8 @@ def _build_execution_tree( return UseExecutor(db=self._db, node=plan) elif isinstance(plan, SetStatement): return SetExecutor(db=self._db, node=plan) + elif isinstance(plan, CreateJobStatement): + return CreateJobExecutor(db=self._db, node=plan) # Get plan node type plan_opr_type = plan.opr_type diff --git a/evadb/interfaces/relational/db.py b/evadb/interfaces/relational/db.py index 
def start_jobs(self):
    """Launch the job scheduler in a daemon process unless one is already alive."""
    current = self._jobs_process
    if current and current.is_alive():
        logger.debug("The job scheduler is already running")
        return

    scheduler = JobScheduler(self._evadb)
    self._jobs_process = multiprocessing.Process(target=scheduler.execute)
    # Daemon flag must be set before the process is started.
    self._jobs_process.daemon = True
    self._jobs_process.start()
    logger.debug("Job scheduler process started")


def stop_jobs(self):
    """Terminate the job scheduler process, if there is a live one, and wait for it."""
    worker = self._jobs_process
    if worker is None or not worker.is_alive():
        return

    worker.terminate()
    worker.join()
    logger.debug("Job scheduler process stopped")
@dataclass
class CreateJobStatement(AbstractStatement):
    """Parsed form of a CREATE JOB statement.

    Holds the job name, the queries the job runs, the IF NOT EXISTS flag and
    the optional START / END / EVERY clauses.
    """

    job_name: str
    queries: list
    if_not_exists: bool
    start_time: Optional[str] = None
    end_time: Optional[str] = None
    repeat_interval: Optional[int] = None
    repeat_period: Optional[str] = None

    def __hash__(self):
        return hash(
            (
                super().__hash__(),
                self.job_name,
                # queries is a list; convert it so it can be hashed.
                tuple(self.queries),
                self.start_time,
                self.end_time,
                self.repeat_interval,
                self.repeat_period,
            )
        )

    def __post_init__(self):
        super().__init__(StatementType.CREATE_JOB)

    def __str__(self):
        start_str = f"\nSTART {self.start_time}" if self.start_time is not None else ""
        end_str = f"\nEND {self.end_time}" if self.end_time is not None else ""
        repeat_str = (
            f"\nEVERY {self.repeat_interval} {self.repeat_period}"
            if self.repeat_interval is not None
            else ""
        )
        # Join the queries into text; interpolating a generator expression
        # directly would print "<generator object ...>" instead.
        queries_str = "; ".join(str(q) for q in self.queries)
        return (
            f"CREATE JOB {self.job_name} AS\n"
            f"({queries_str})"
            f"{start_str} {end_str} {repeat_str}"
        )
";")+ | (create_job ";") sql_statement: ddl_statement | dml_statement | utility_statement | context_statement - -ddl_statement: create_database | create_table | create_index | create_function - | drop_database | drop_table | drop_function | drop_index | rename_table + +ddl_statement: create_database | create_table | create_index | create_function | drop_database + | drop_table | drop_function | drop_index | drop_job | rename_table dml_statement: select_statement | insert_statement | update_statement | delete_statement | load_statement | set_statement @@ -14,6 +18,9 @@ utility_statement: describe_statement | show_statement | help_statement | explai context_statement: use_statement +job_sql_statements: query_string (";" query_string)* ";"? + + // Data Definition Language // Create statements @@ -29,7 +36,18 @@ create_database_engine_clause: WITH ENGINE "=" string_literal "," PARAMETERS "=" create_index: CREATE INDEX if_not_exists? uid ON table_name index_elem vector_store_type? create_table: CREATE TABLE if_not_exists? table_name (create_definitions | (AS select_statement)) - + +create_job: CREATE JOB if_not_exists? uid AS "{" job_sql_statements "}" (start_time)? (end_time)? (repeat_clause)? + +start_time: START string_literal + +end_time: END string_literal + +repeat_clause: EVERY decimal_literal simple_id + + + + // Rename statements rename_table: RENAME TABLE table_name TO table_name @@ -78,6 +96,8 @@ drop_index: DROP INDEX if_exists? uid drop_table: DROP TABLE if_exists? uid drop_function: DROP FUNCTION if_exists? uid + +drop_job: DROP JOB if_exists? 
def job_sql_statements(self, tree):
    """Visit every ``query_string`` subtree of *tree* and return the results.

    Children that are not ``Tree`` instances, or that belong to another
    rule, are ignored. The order of the children is preserved.
    """
    return [
        self.visit(node)
        for node in tree.children
        if isinstance(node, Tree) and node.data == "query_string"
    ]
class CreateJob:
    """Visitor mixin that turns a ``create_job`` parse tree into a statement."""

    def create_job(self, tree):
        """Build a ``CreateJobStatement`` from the children of *tree*."""
        if_not_exists = False
        repeat_interval = None
        repeat_period = None
        # rules whose visited result is stored as-is, keyed by rule name
        visited = {
            "uid": None,
            "job_sql_statements": [],
            "start_time": None,
            "end_time": None,
        }

        for node in tree.children:
            if not isinstance(node, Tree):
                continue
            rule = node.data
            if rule == "if_not_exists":
                # presence of the clause is all that matters
                if_not_exists = True
            elif rule == "repeat_clause":
                repeat_interval, repeat_period = self.visit(node)
            elif rule in visited:
                visited[rule] = self.visit(node)

        return CreateJobStatement(
            visited["uid"],
            visited["job_sql_statements"],
            if_not_exists,
            visited["start_time"],
            visited["end_time"],
            repeat_interval,
            repeat_period,
        )

    def start_time(self, tree):
        """Return the ``value`` of the literal that follows ``START``."""
        literal = self.visit(tree.children[1])
        return literal.value

    def end_time(self, tree):
        """Return the ``value`` of the literal that follows ``END``."""
        literal = self.visit(tree.children[1])
        return literal.value

    def repeat_clause(self, tree):
        """Return the visited ``(interval, period)`` pair of an ``EVERY`` clause."""
        interval = self.visit(tree.children[1])
        period = self.visit(tree.children[2])
        return interval, period


# Drop Job
# Visitor method added to evadb/parser/lark_visitor/_drop_statement.py,
# next to drop_database.
def drop_job(self, tree):
    """Build a ``DropObjectStatement`` of type JOB from a ``drop_job`` tree."""
    job_name = None
    if_exists = False

    for node in tree.children:
        if not isinstance(node, Tree):
            continue
        if node.data == "if_exists":
            if_exists = True
        elif node.data == "uid":
            job_name = self.visit(node)

    return DropObjectStatement(ObjectType.JOB, job_name, if_exists)
b/evadb/parser/select_statement.py index b04c7148bb..69270a6b84 100644 --- a/evadb/parser/select_statement.py +++ b/evadb/parser/select_statement.py @@ -141,7 +141,11 @@ def __str__(self) -> str: orderby_list_str += str(expr[0]) + " " + sort_str + ", " orderby_list_str = orderby_list_str.rstrip(", ") - select_str = f"SELECT {target_list_str} FROM {str(self._from_table)}" + select_str = f"SELECT {target_list_str}" + + if self._from_table is not None: + select_str += " FROM " + str(self._from_table) + if self._where_clause is not None: select_str += " WHERE " + str(self._where_clause) diff --git a/evadb/parser/types.py b/evadb/parser/types.py index 751d2b5f31..227a768c7b 100644 --- a/evadb/parser/types.py +++ b/evadb/parser/types.py @@ -42,6 +42,7 @@ class StatementType(EvaDBEnum): CREATE_DATABASE # noqa: F821 USE # noqa: F821 SET # noqa: F821 + CREATE_JOB # noqa: F821 # add other types @@ -83,3 +84,4 @@ class ObjectType(EvaDBEnum): FUNCTION # noqa: F821 INDEX # noqa: F821 DATABASE # noqa: F821 + JOB # noqa: F821 diff --git a/evadb/parser/utils.py b/evadb/parser/utils.py index a2be06ec16..dd4567cf4b 100644 --- a/evadb/parser/utils.py +++ b/evadb/parser/utils.py @@ -13,7 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. from evadb.parser.create_function_statement import CreateFunctionStatement -from evadb.parser.create_statement import CreateDatabaseStatement, CreateTableStatement +from evadb.parser.create_statement import ( + CreateDatabaseStatement, + CreateJobStatement, + CreateTableStatement, +) from evadb.parser.drop_object_statement import DropObjectStatement from evadb.parser.explain_statement import ExplainStatement from evadb.parser.insert_statement import InsertTableStatement @@ -30,6 +34,7 @@ # directly to the executor. 
class JobScheduler:
    """Runs the jobs registered in the catalog when they become due.

    The scheduler repeatedly asks the catalog for the next due job, executes
    its queries, records the run in the job history and advances the job's
    next scheduled run.
    """

    def __init__(self, evadb: EvaDBDatabase) -> None:
        # upper bound, in seconds, on how long the loop sleeps between polls
        self.poll_interval_seconds = 30
        self._evadb = evadb

    def _update_next_schedule_run(self, job_catalog_entry: JobCatalogEntry) -> tuple:
        """Compute and persist the next trigger time of a job.

        A job stays active only if it has a positive ``repeat_interval``
        (seconds) and the next trigger time, counted from now, falls before
        its ``end_time`` (or it has no ``end_time``). Otherwise the job is
        marked inactive and its current ``next_scheduled_run`` is kept.

        Returns:
            A ``(active_status, next_trigger_time)`` tuple holding the values
            written to the catalog.
        """
        job_end_time = job_catalog_entry.end_time
        repeat_interval = job_catalog_entry.repeat_interval
        active_status = False
        next_trigger_time = job_catalog_entry.next_scheduled_run

        if repeat_interval and repeat_interval > 0:
            candidate = datetime.datetime.now() + datetime.timedelta(
                seconds=repeat_interval
            )
            if not job_end_time or candidate < job_end_time:
                active_status = True
                next_trigger_time = candidate

        self._evadb.catalog().update_job_catalog_entry(
            job_catalog_entry.name,
            next_trigger_time,
            active_status,
        )
        return active_status, next_trigger_time

    def _get_sleep_time(self, next_job_entry: JobCatalogEntry) -> float:
        """Return how many seconds to sleep before polling the catalog again.

        The result is the time left until ``next_job_entry`` is due, clamped
        to the range ``[0, poll_interval_seconds]``. Without a next job the
        full poll interval is returned.
        """
        sleep_time = self.poll_interval_seconds
        if next_job_entry:
            seconds_until_due = (
                next_job_entry.next_scheduled_run - datetime.datetime.now()
            ).total_seconds()
            sleep_time = min(sleep_time, seconds_until_due)
        return max(0, sleep_time)

    def _run_job(self, job: JobCatalogEntry) -> None:
        """Execute the queries of one due job and record the run."""
        execution_time = datetime.datetime.now()

        # insert a job history record to mark start of this execution
        self._evadb.catalog().insert_job_history_catalog_entry(
            job.row_id,
            job.name,
            execution_time,
            None,
        )

        try:
            # execute the queries of the job
            execution_results = [
                execute_query(self._evadb, query) for query in job.queries
            ]
            logger.debug(
                f"Exection result for job: {job.name} results: {execution_results}"
            )
        finally:
            # Always advance the schedule and close the history record, even
            # when a query raised. Otherwise the failed job stays due, is
            # picked again on every pass and keeps later jobs from running.
            self._update_next_schedule_run(job)

            # update the previously inserted job history record with end time
            self._evadb.catalog().update_job_history_end_time(
                job.row_id,
                execution_time,
                datetime.datetime.now(),
            )

    def _scan_and_execute_jobs(self):
        """Run due jobs forever, sleeping until the next job or poll is due."""
        while True:
            try:
                # iter(callable, None) keeps fetching until no job is due
                for due_job in iter(
                    lambda: self._evadb.catalog().get_next_executable_job(
                        only_past_jobs=True
                    ),
                    None,
                ):
                    self._run_job(due_job)

                upcoming_job = self._evadb.catalog().get_next_executable_job(
                    only_past_jobs=False
                )

                sleep_time = self._get_sleep_time(upcoming_job)
                if sleep_time > 0:
                    logger.debug(
                        f"Job scheduler process sleeping for {sleep_time} seconds"
                    )
                    time.sleep(sleep_time)
            except Exception as e:
                # keep the scheduler alive; back off briefly before retrying
                logger.error(f"Got an exception in job scheduler: {str(e)}")
                time.sleep(self.poll_interval_seconds * 0.2)

    def execute(self):
        """Entry point of the scheduler loop; exits on keyboard interrupt."""
        try:
            self._scan_and_execute_jobs()
        except KeyboardInterrupt:
            logger.debug("Exiting the job scheduler process due to interrupt")
            sys.exit()
class JobSchedulerIntegrationTests(unittest.TestCase):
    """End-to-end check that the background scheduler runs recurring jobs."""

    @classmethod
    def setUpClass(cls):
        cls.evadb = get_evadb_for_testing()
        # start the test class from a freshly reset catalog
        cls.evadb.catalog().reset()
        cls.job_name_1 = "test_async_job_1"
        cls.job_name_2 = "test_async_job_2"

    def setUp(self):
        for job_name in (self.job_name_1, self.job_name_2):
            execute_query_fetch_all(self.evadb, f"DROP JOB IF EXISTS {job_name};")

    @classmethod
    def tearDownClass(cls):
        shutdown_ray()
        for job_name in (cls.job_name_1, cls.job_name_2):
            execute_query_fetch_all(cls.evadb, f"DROP JOB IF EXISTS {job_name};")

    def create_jobs(self):
        """Create the backing table and two recurring jobs that are already due."""
        datetime_format = "%Y-%m-%d %H:%M:%S"
        # start slightly in the past so both jobs are due immediately
        start_time = (datetime.now() - timedelta(seconds=10)).strftime(datetime_format)
        end_time = (datetime.now() + timedelta(seconds=60)).strftime(datetime_format)

        create_csv_query = """CREATE TABLE IF NOT EXISTS MyCSV (
                id INTEGER UNIQUE,
                frame_id INTEGER,
                video_id INTEGER
            );
            """
        job_1_query = f"""CREATE JOB IF NOT EXISTS {self.job_name_1} AS {{
                SELECT * FROM MyCSV;
            }}
            START '{start_time}'
            END '{end_time}'
            EVERY 4 seconds;
            """
        job_2_query = f"""CREATE JOB IF NOT EXISTS {self.job_name_2} AS {{
                SHOW FUNCTIONS;
            }}
            START '{start_time}'
            END '{end_time}'
            EVERY 2 seconds;
            """

        for statement in (create_csv_query, job_1_query, job_2_query):
            execute_query_fetch_all(self.evadb, statement)

    def test_should_execute_the_scheduled_jobs(self):
        self.create_jobs()
        connection = EvaDBConnection(self.evadb, MagicMock(), MagicMock())

        # start the job scheduler
        connection.start_jobs()

        # let the job scheduler run for 15 seconds
        time.sleep(15)
        connection.stop_jobs()

        # NOTE(review): ids 1 and 2 are presumably the ids the catalog gave
        # to the two jobs created above -- confirm against the catalog.
        job_1_execution_count, job_2_execution_count = (
            len(self.evadb.catalog().get_job_history_by_job_id(job_id))
            for job_id in (1, 2)
        )

        # job 2 repeats every 2 seconds, job 1 every 4 seconds
        self.assertGreater(job_2_execution_count, job_1_execution_count)
        self.assertGreater(job_2_execution_count, 2)
        self.assertGreater(job_1_execution_count, 2)
class CreateJobTest(unittest.TestCase):
    """Integration tests for executing ``CREATE JOB`` statements."""

    @classmethod
    def setUpClass(cls):
        cls.evadb = get_evadb_for_testing()
        # start the test class from a freshly reset catalog
        cls.evadb.catalog().reset()
        cls.job_name = "test_async_job"

    def setUp(self):
        execute_query_fetch_all(self.evadb, f"DROP JOB IF EXISTS {self.job_name};")

    @classmethod
    def tearDownClass(cls):
        shutdown_ray()
        execute_query_fetch_all(cls.evadb, f"DROP JOB IF EXISTS {cls.job_name};")

    def test_invalid_query_in_job_should_raise_exception(self):
        # missing closing parenthesis in the query
        query = f"""CREATE JOB {self.job_name} AS {{
                CREATE OR REPLACE FUNCTION HomeSalesForecast FROM
                    ( SELECT * FROM postgres_data.home_sales
                TYPE Forecasting
                PREDICT 'price';
            }}
            START '2023-04-01 01:10:00'
            END '2023-05-01'
            EVERY 2 week;
        """
        with self.assertRaisesRegex(Exception, "Failed to parse the job query"):
            execute_query_fetch_all(self.evadb, query)

    def test_create_job_should_add_the_entry(self):
        queries = [
            """CREATE OR REPLACE FUNCTION HomeSalesForecast FROM
                ( SELECT * FROM postgres_data.home_sales )
                TYPE Forecasting
                PREDICT 'price';""",
            "Select HomeSalesForecast(10);",
        ]
        start = "2023-04-01 01:10:00"
        end = "2023-05-01"
        repeat_interval = 2
        repeat_period = "week"

        all_queries = "".join(queries)
        query = f"""CREATE JOB {self.job_name} AS {{
                {all_queries}
            }}
            START '{start}'
            END '{end}'
            EVERY {repeat_interval} {repeat_period};"""

        execute_query_fetch_all(self.evadb, query)

        expected_start = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
        expected_end = datetime.strptime(end, "%Y-%m-%d")

        job_entry = self.evadb.catalog().get_job_catalog_entry(self.job_name)
        self.assertEqual(job_entry.name, self.job_name)
        self.assertEqual(job_entry.start_time, expected_start)
        self.assertEqual(job_entry.end_time, expected_end)
        # two weeks expressed in seconds
        self.assertEqual(job_entry.repeat_interval, 2 * 7 * 24 * 60 * 60)
        self.assertEqual(job_entry.active, True)
        self.assertEqual(len(job_entry.queries), len(queries))

    def test_should_create_job_with_if_not_exists(self):
        if_not_exists = "IF NOT EXISTS"

        queries = [
            """CREATE OR REPLACE FUNCTION HomeSalesForecast FROM
                ( SELECT * FROM postgres_data.home_sales )
                TYPE Forecasting
                PREDICT 'price';""",
            "Select HomeSalesForecast(10);",
        ]

        query = """CREATE JOB {} {} AS {{
                {}
            }}
            START '2023-04-01'
            END '2023-05-01'
            EVERY 2 week;
        """
        job_body = "".join(queries)
        guarded_create = query.format(if_not_exists, self.job_name, job_body)
        plain_create = query.format("", self.job_name, job_body)

        # Create the job.
        execute_query_fetch_all(self.evadb, guarded_create)

        # Creating the same job again without the guard should raise.
        with self.assertRaises(ExecutorError):
            execute_query_fetch_all(self.evadb, plain_create)

        # Creating the same job again with "IF NOT EXISTS" should not raise.
        execute_query_fetch_all(self.evadb, guarded_create)


if __name__ == "__main__":
    unittest.main()


# Test method added to the test case in
# test/integration_tests/short/test_drop_executor.py.
def test_should_drop_job(self):
    """Exercise DROP JOB with and without IF EXISTS."""
    job_name = "test_async_job"
    drop_if_exists = f"DROP JOB IF EXISTS {job_name}"

    # Create the job.
    query = f"""CREATE JOB {job_name} AS {{
            SELECT * from job_catalog;
        }}
        START '2023-04-01'
        END '2023-05-01'
        EVERY 2 week;"""

    execute_query_fetch_all(self.evadb, query)
    self.assertIsNotNone(self.evadb.catalog().get_job_catalog_entry(job_name))

    # DROP JOB
    execute_query_fetch_all(self.evadb, f"DROP JOB {job_name}")
    self.assertIsNone(self.evadb.catalog().get_job_catalog_entry(job_name))

    # DROP should pass with warning
    result = execute_query_fetch_all(self.evadb, drop_if_exists)
    self.assertTrue("does not exist" in result.frames.to_string())

    # DROP should throw error
    with self.assertRaises(ExecutorError):
        execute_query_fetch_all(
            self.evadb,
            f"DROP JOB {job_name}",
            do_not_print_exceptions=True,
        )

    # We should be able to add the job again
    execute_query_fetch_all(self.evadb, query)

    # clean up
    result = execute_query_fetch_all(self.evadb, drop_if_exists)
def test_create_job(self):
    """Parse a CREATE JOB statement and check every field of the result."""
    queries = [
        """CREATE OR REPLACE FUNCTION HomeSalesForecast FROM
            ( SELECT * FROM postgres_data.home_sales )
            TYPE Forecasting
            PREDICT 'price';""",
        "Select HomeSalesForecast(10);",
    ]
    job_query = f"""CREATE JOB my_job AS {{
        {''.join(queries)}
    }}
    START '2023-04-01'
    END '2023-05-01'
    EVERY 2 hour
    """

    job_stmt = Parser().parse(job_query)[0]

    self.assertEqual(job_stmt.job_name, "my_job")
    self.assertEqual(len(job_stmt.queries), 2)
    # each parsed query renders as the original text minus the trailing ";"
    for written, parsed in zip(queries, job_stmt.queries):
        self.assertTrue(written.rstrip(";") == str(parsed))
    self.assertEqual(job_stmt.start_time, "2023-04-01")
    self.assertEqual(job_stmt.end_time, "2023-05-01")
    self.assertEqual(job_stmt.repeat_interval, 2)
    self.assertEqual(job_stmt.repeat_period, "hour")
class JobSchedulerTests(unittest.TestCase):
    """Unit tests for the scheduling arithmetic of ``JobScheduler``."""

    def get_dummy_job_catalog_entry(self, active, job_name, next_run):
        """Return a catalog entry with only name, status and next run set."""
        return JobCatalogEntry(
            name=job_name,
            queries=None,
            start_time=None,
            end_time=None,
            repeat_interval=None,
            active=active,
            next_scheduled_run=next_run,
            created_at=None,
            updated_at=None,
        )

    def test_sleep_time_calculation(self):
        past_job = self.get_dummy_job_catalog_entry(
            True, "past_job", datetime.now() - timedelta(seconds=10)
        )
        future_job = self.get_dummy_job_catalog_entry(
            True, "future_job", datetime.now() + timedelta(seconds=20)
        )

        job_scheduler = JobScheduler(MagicMock())

        # an overdue job must be run right away
        self.assertEqual(job_scheduler._get_sleep_time(past_job), 0)
        self.assertGreaterEqual(job_scheduler._get_sleep_time(future_job), 10)
        # without any job the scheduler sleeps for its full poll interval
        self.assertEqual(job_scheduler._get_sleep_time(None), 30)

    def test_update_next_schedule_run(self):
        future_time = datetime.now() + timedelta(seconds=1000)
        job_scheduler = JobScheduler(MagicMock())
        job_entry = self.get_dummy_job_catalog_entry(True, "job", datetime.now())

        # job which runs just once
        job_entry.end_time = future_time
        status, next_run = job_scheduler._update_next_schedule_run(job_entry)
        self.assertEqual(status, False, "status for one time job should be false")

        # recurring job with valid end date
        job_entry.end_time = future_time
        job_entry.repeat_interval = 120
        expected_next_run = datetime.now() + timedelta(seconds=120)
        status, next_run = job_scheduler._update_next_schedule_run(job_entry)
        self.assertEqual(status, True, "status for recurring time job should be true")
        self.assertGreaterEqual(next_run, expected_next_run)

        # recurring job whose next run would fall after its end date
        job_entry.end_time = datetime.now() + timedelta(seconds=60)
        job_entry.repeat_interval = 120
        status, next_run = job_scheduler._update_next_schedule_run(job_entry)
        self.assertEqual(
            status, False, "status for expired recurring time job should be false"
        )
        # the previous (already elapsed) scheduled run is kept unchanged
        self.assertLessEqual(next_run, datetime.now())