Skip to content

Commit

Permalink
Job scheduler implementation (#1308)
Browse files Browse the repository at this point in the history
This PR adds support for creating and dropping jobs in evadb based on
this [task](#1248).

1.  Jobs can be created using the create job query:

   
> CREATE JOB {job_name} AS {
>             {job_queries; ...}
>     }
>     START {start_time}
>     END {end_time}
>     EVERY {repeat_period} {repeat_unit}

2. Created jobs can be dropped using:

> DROP JOB {job_name}

3. The scheduled jobs will only be triggered if the job scheduler
process is started explicitly using:

> EvaDBConnection.start_jobs()

4. The job scheduler process can be stopped using:

> EvaDBConnection.stop_jobs()

---------

Co-authored-by: Gaurav Tarlok Kakkar <[email protected]>
  • Loading branch information
dungnmaster and gaurav274 authored Nov 14, 2023
1 parent 995f7fe commit 1fbb74f
Show file tree
Hide file tree
Showing 25 changed files with 1,452 additions and 10 deletions.
140 changes: 140 additions & 0 deletions evadb/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import shutil
from pathlib import Path
from typing import Any, List
Expand Down Expand Up @@ -39,6 +40,8 @@
FunctionIOCatalogEntry,
FunctionMetadataCatalogEntry,
IndexCatalogEntry,
JobCatalogEntry,
JobHistoryCatalogEntry,
TableCatalogEntry,
drop_all_tables_except_catalog,
init_db,
Expand All @@ -61,6 +64,8 @@
FunctionMetadataCatalogService,
)
from evadb.catalog.services.index_catalog_service import IndexCatalogService
from evadb.catalog.services.job_catalog_service import JobCatalogService
from evadb.catalog.services.job_history_catalog_service import JobHistoryCatalogService
from evadb.catalog.services.table_catalog_service import TableCatalogService
from evadb.catalog.sql_config import IDENTIFIER_COLUMN, SQLConfig
from evadb.expression.function_expression import FunctionExpression
Expand All @@ -85,6 +90,10 @@ def __init__(self, db_uri: str):
self._config_catalog_service = ConfigurationCatalogService(
self._sql_config.session
)
self._job_catalog_service = JobCatalogService(self._sql_config.session)
self._job_history_catalog_service = JobHistoryCatalogService(
self._sql_config.session
)
self._table_catalog_service = TableCatalogService(self._sql_config.session)
self._column_service = ColumnCatalogService(self._sql_config.session)
self._function_service = FunctionCatalogService(self._sql_config.session)
Expand Down Expand Up @@ -215,6 +224,137 @@ def check_native_table_exists(self, table_name: str, database_name: str):

return True

"Job catalog services"

def insert_job_catalog_entry(
    self,
    name: str,
    queries: str,
    start_time: datetime.datetime,
    end_time: datetime.datetime,
    repeat_interval: int,
    active: bool,
    next_schedule_run: datetime.datetime,
) -> JobCatalogEntry:
    """Persist a new entry in the job catalog.

    Args:
        name: job name
        queries: job's queries
        start_time: job start time
        end_time: job end time
        repeat_interval: job repeat interval
        active: job status
        next_schedule_run: next run time as per schedule
    Returns:
        Whatever the job catalog service's `insert_entry` returns for the
        newly created job.
    """
    # Arguments are forwarded positionally, in the order the job catalog
    # service expects them.
    return self._job_catalog_service.insert_entry(
        name,
        queries,
        start_time,
        end_time,
        repeat_interval,
        active,
        next_schedule_run,
    )

def get_job_catalog_entry(self, job_name: str) -> JobCatalogEntry:
    """
    Returns the job catalog entry for the given job name
    Arguments:
        job_name (str): name of the job
    Returns:
        JobCatalogEntry
    """
    job_entry = self._job_catalog_service.get_entry_by_name(job_name)
    return job_entry

def drop_job_catalog_entry(self, job_entry: JobCatalogEntry) -> bool:
    """
    Remove a job from the catalog.
    Arguments:
        job_entry: the job catalog entry that should be deleted
    Returns:
        The result reported by the job catalog service's `delete_entry`
        (True if successfully deleted else False)
    """
    was_deleted = self._job_catalog_service.delete_entry(job_entry)
    return was_deleted

def get_next_executable_job(self, only_past_jobs: bool = False) -> JobCatalogEntry:
    """Fetch the oldest job, ordered by trigger time, that is ready to run.
    Arguments:
        only_past_jobs: when set, only jobs whose trigger time lies in the
            past are considered
    Returns:
        The first job to be triggered, as reported by the job catalog service
    """
    job_service = self._job_catalog_service
    next_job = job_service.get_next_executable_job(only_past_jobs)
    return next_job

def update_job_catalog_entry(
    self, job_name: str, next_scheduled_run: datetime.datetime, active: bool
):
    """Update the next_scheduled_run and active column as per the provided values
    Arguments:
        job_name (str): job which should be updated
        next_scheduled_run (datetime.datetime): the next trigger time for the job
        active (bool): the active status for the job
    """
    # Nothing is returned; the update is delegated to the job catalog service.
    self._job_catalog_service.update_next_scheduled_run(
        job_name, next_scheduled_run, active
    )

"Job history catalog services"

def insert_job_history_catalog_entry(
    self,
    job_id: int,
    job_name: str,
    execution_start_time: datetime.datetime,
    execution_end_time: datetime.datetime,
) -> JobHistoryCatalogEntry:
    """Persist a new entry in the job history catalog.

    Args:
        job_id: job id for the execution entry
        job_name: job name for the execution entry
        execution_start_time: job execution start time
        execution_end_time: job execution end time
    Returns:
        Whatever the job history catalog service's `insert_entry` returns
        for the new execution record.
    """
    return self._job_history_catalog_service.insert_entry(
        job_id, job_name, execution_start_time, execution_end_time
    )

def get_job_history_by_job_id(self, job_id: int) -> List[JobHistoryCatalogEntry]:
    """Returns all the entries present for this job_id in the history.

    Args:
        job_id: the id of job whose history should be fetched
    Returns:
        The entries reported by the job history catalog service for job_id.
    """
    # typing.List (already imported by this module) is used instead of the
    # builtin `list[...]`, which cannot be subscripted in a signature on
    # Python versions before 3.9.
    return self._job_history_catalog_service.get_entry_by_job_id(job_id)

def update_job_history_end_time(
    self,
    job_id: int,
    execution_start_time: datetime.datetime,
    execution_end_time: datetime.datetime,
) -> List[JobHistoryCatalogEntry]:
    """Updates the execution_end_time for the job history entry matching job_id and execution_start_time.

    Args:
        job_id: id of the job whose history entry should be updated
        execution_start_time: the start time for the job history entry
        execution_end_time: the end time for the job history entry
    Returns:
        Whatever the job history catalog service's `update_entry_end_time`
        returns.
        NOTE(review): the annotated list return type is carried over from the
        original signature; confirm against the service implementation.
    """
    return self._job_history_catalog_service.update_entry_end_time(
        job_id, execution_start_time, execution_end_time
    )

"Table catalog services"

def insert_table_catalog_entry(
Expand Down
92 changes: 92 additions & 0 deletions evadb/catalog/models/job_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import json

from sqlalchemy import Boolean, Column, DateTime, Index, Integer, String
from sqlalchemy.orm import relationship

from evadb.catalog.models.base_model import BaseModel
from evadb.catalog.models.utils import JobCatalogEntry


class JobCatalog(BaseModel):
    """The `JobCatalog` catalog stores information about all the created Jobs.
    `_row_id:` an autogenerated unique identifier.
    `_name:` the job name.
    `_queries:` the queries to run as part of this job
    `_start_time:` the job's start time
    `_end_time:` the job's end time
    `_repeat_interval:` the job's repeat interval
    `_active:` is the job active/deleted
    `_next_scheduled_run:` the next trigger time for the job as per the schedule
    `_created_at:` entry creation time
    `_updated_at:` entry last update time
    """

    __tablename__ = "job_catalog"

    _name = Column("name", String(100), unique=True)
    _queries = Column("queries", String, nullable=False)
    _start_time = Column("start_time", DateTime, default=datetime.datetime.now)
    # NOTE: the underlying column is named "end_ts", not "end_time".
    _end_time = Column("end_ts", DateTime)
    _repeat_interval = Column("repeat_interval", Integer)
    _active = Column("active", Boolean, default=True)
    _next_scheduled_run = Column("next_scheduled_run", DateTime)

    _created_at = Column("created_at", DateTime, default=datetime.datetime.now)
    _updated_at = Column(
        "updated_at",
        DateTime,
        default=datetime.datetime.now,
        onupdate=datetime.datetime.now,
    )

    # Index on the next trigger time column.
    _next_run_index = Index("_next_run_index", _next_scheduled_run)
    # Deleting a job cascades to its JobHistoryCatalog rows.
    _job_history_catalog = relationship("JobHistoryCatalog", cascade="all, delete")

    def __init__(
        self,
        name: str,
        queries: str,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        repeat_interval: int,
        active: bool,
        next_schedule_run: datetime.datetime,
    ):
        """Create a job catalog row.

        Args:
            name: the job name
            queries: the job's queries; `as_dataclass` decodes this value
                with `json.loads`, so it must be JSON text
            start_time: the job's start time
            end_time: the job's end time
            repeat_interval: the job's repeat interval
            active: whether the job is active
            next_schedule_run: the next trigger time as per the schedule
        """
        self._name = name
        self._queries = queries
        self._start_time = start_time
        self._end_time = end_time
        self._repeat_interval = repeat_interval
        self._active = active
        self._next_scheduled_run = next_schedule_run

    def as_dataclass(self) -> "JobCatalogEntry":
        """Convert this row into a `JobCatalogEntry`."""
        return JobCatalogEntry(
            row_id=self._row_id,
            name=self._name,
            # Stored as JSON text; decoded for the dataclass.
            queries=json.loads(self._queries),
            start_time=self._start_time,
            end_time=self._end_time,
            repeat_interval=self._repeat_interval,
            active=self._active,
            next_scheduled_run=self._next_scheduled_run,
            created_at=self._created_at,
            updated_at=self._updated_at,
        )
73 changes: 73 additions & 0 deletions evadb/catalog/models/job_history_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime

from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, UniqueConstraint

from evadb.catalog.models.base_model import BaseModel
from evadb.catalog.models.utils import JobHistoryCatalogEntry


class JobHistoryCatalog(BaseModel):
    """The `JobHistoryCatalog` stores the execution history of jobs.
    `_row_id:` an autogenerated unique identifier.
    `_job_id:` job id.
    `_job_name:` job name.
    `_execution_start_time:` start time of this run
    `_execution_end_time:` end time for this run
    `_created_at:` entry creation time
    `_updated_at:` entry last update time
    """

    __tablename__ = "job_history_catalog"

    # References the owning job row; declared with ON DELETE CASCADE.
    _job_id = Column(
        "job_id", Integer, ForeignKey("job_catalog._row_id", ondelete="CASCADE")
    )
    _job_name = Column("job_name", String(100))
    _execution_start_time = Column("execution_start_time", DateTime)
    _execution_end_time = Column("execution_end_time", DateTime)
    _created_at = Column("created_at", DateTime, default=datetime.datetime.now)
    _updated_at = Column(
        "updated_at",
        DateTime,
        default=datetime.datetime.now,
        onupdate=datetime.datetime.now,
    )

    # At most one history row per (job, execution start time).
    __table_args__ = (UniqueConstraint("job_id", "execution_start_time"), {})

    def __init__(
        self,
        job_id: int,
        job_name: str,
        execution_start_time: datetime.datetime,
        execution_end_time: datetime.datetime,
    ):
        """Create a job history row.

        Args:
            job_id: id of the job this execution belongs to
            job_name: name of the job this execution belongs to
            execution_start_time: start time of this run
            execution_end_time: end time of this run
        """
        self._job_id = job_id
        self._job_name = job_name
        self._execution_start_time = execution_start_time
        self._execution_end_time = execution_end_time

    def as_dataclass(self) -> "JobHistoryCatalogEntry":
        """Convert this row into a `JobHistoryCatalogEntry`."""
        return JobHistoryCatalogEntry(
            row_id=self._row_id,
            job_id=self._job_id,
            job_name=self._job_name,
            execution_start_time=self._execution_start_time,
            execution_end_time=self._execution_end_time,
            created_at=self._created_at,
            updated_at=self._updated_at,
        )
Loading

0 comments on commit 1fbb74f

Please sign in to comment.