Skip to content

Commit

Permalink
Merge pull request #361 from datayoga-io/360-build-connections-jsonsc…
Browse files Browse the repository at this point in the history
…hema-and-use-it-in-schemastore-catalog

build connections schema and rename to `connections.dy.yaml`
  • Loading branch information
spicy-sauce authored Mar 13, 2024
2 parents c702877 + 926fe46 commit 00cde4a
Show file tree
Hide file tree
Showing 31 changed files with 1,216 additions and 705 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Generate JSON Schema
name: Generate JSON Schemas

on:
workflow_dispatch:
Expand Down Expand Up @@ -36,9 +36,10 @@ jobs:
poetry build
pip install dist/*.whl
- name: Generate DataYoga Job Schema
- name: Generate DataYoga Job and Connections Schemas
run: |
python -c "import json; from datayoga_core.job import Job; schema = Job.get_json_schema(); open('schemas/job.schema.json', 'w').write(json.dumps(schema))"
python -c "import json; from datayoga_core.connection import Connection; schema = Connection.get_json_schema(); open('schemas/connections.schema.json', 'w').write(json.dumps(schema))"
- name: Prettify JSON Schema files
run: prettier --write "schemas/**/*.json"
Expand All @@ -47,6 +48,7 @@ jobs:
run: |
git config user.name github-actions
git config user.email github-actions@github.com
git pull
git add .
if ! git diff --cached --exit-code; then
git commit -m "update json schemas"
Expand Down
5 changes: 3 additions & 2 deletions cli/src/datayoga/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import datayoga_core as dy
import jsonschema
from datayoga_core import prometheus, utils
from datayoga_core.connection import Connection
from pkg_resources import DistributionNotFound, get_distribution

from datayoga import cli_helpers
Expand Down Expand Up @@ -96,11 +97,11 @@ def run(
logger.info("Runner started...")

# validate the connections
connections_file = path.join(directory, "connections.yaml")
connections_file = path.join(directory, "connections.dy.yaml")
try:
connections = utils.read_yaml(connections_file)
logger.debug(f"connections: {connections}")
connections_schema = dy.get_connections_json_schema()
connections_schema = Connection.get_json_schema()
logger.debug(f"connections_schema: {connections_schema}")
jsonschema.validate(instance=connections, schema=connections_schema)
except jsonschema.exceptions.ValidationError as schema_error:
Expand Down
37 changes: 0 additions & 37 deletions core/src/datayoga_core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional

from datayoga_core import utils
from datayoga_core.context import Context
from datayoga_core.job import Job
from datayoga_core.result import JobResult
Expand Down Expand Up @@ -66,37 +63,3 @@ def transform(
job.init(context)
logger.debug("Transforming data")
return job.transform(data)


def get_connections_json_schema() -> Dict[str, Any]:
# get the folder of all connection-specific schemas
connection_schemas_folder = utils.get_resource_path(os.path.join("schemas", "connections"))
# we traverse the json schemas for connection types
schema_paths = Path(connection_schemas_folder).rglob("**/*.schema.json")
connection_types = []
connection_schemas = []
for schema_path in schema_paths:
connection_type = schema_path.name.split(".")[0]
connection_types.append(connection_type)

schema = utils.read_json(f"{schema_path}")
# append to the array of allOf for the full schema
connection_schemas.append({
"if": {
"properties": {
"type": {
"const": connection_type
},
},
},
"then": schema
})

connections_general_schema = utils.read_json(
os.path.join(
utils.get_bundled_dir() if utils.is_bundled() else os.path.dirname(os.path.realpath(__file__)),
"resources", "schemas", "connections.schema.json"))
connections_general_schema["patternProperties"]["."]["allOf"] = connection_schemas
connections_general_schema["patternProperties"]["."]["properties"]["type"]["enum"] = connection_types

return connections_general_schema
5 changes: 3 additions & 2 deletions core/src/datayoga_core/blocks/cassandra/write/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

import cassandra.auth
from cassandra.cluster import NoHostAvailable, PreparedStatement
from datayoga_core import utils, write_utils
from datayoga_core import write_utils
from datayoga_core.block import Block as DyBlock
from datayoga_core.connection import Connection
from datayoga_core.context import Context
from datayoga_core.opcode import OpCode
from datayoga_core.result import BlockResult, Result, Status
Expand All @@ -19,7 +20,7 @@ def init(self, context: Optional[Context] = None):
logger.debug(f"Initializing {self.get_block_name()}")

connection_name = self.properties["connection"]
connection_details = utils.get_connection_details(connection_name, context)
connection_details = Connection.get_connection_details(connection_name, context)
if connection_details.get("type") != "cassandra":
raise ValueError(f"{connection_name} is not a cassandra connection")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"connection": {
"type": "string",
"title": "The connection to use for loading",
"description": "Logical connection name as defined in the connections.yaml",
"description": "Logical connection name as defined in the connections.dy.yaml",
"examples": ["europe_db", "target", "eu_dwh"]
},
"keyspace": {
Expand Down
3 changes: 2 additions & 1 deletion core/src/datayoga_core/blocks/http/write/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import aiohttp
from datayoga_core import expression, utils
from datayoga_core.block import Block as DyBlock
from datayoga_core.connection import Connection
from datayoga_core.context import Context
from datayoga_core.expression import Expression
from datayoga_core.result import BlockResult, Result, Status
Expand All @@ -24,7 +25,7 @@ def init(self, context: Optional[Context] = None):
logger.debug(f"Initializing {self.get_block_name()}")

connection_name = self.properties["connection"]
connection_details = utils.get_connection_details(connection_name, context)
connection_details = Connection.get_connection_details(connection_name, context)
if connection_details["type"] != "http":
raise ValueError(f"{connection_name} is not an HTTP connection")

Expand Down
2 changes: 1 addition & 1 deletion core/src/datayoga_core/blocks/http/write/block.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"connection": {
"type": "string",
"title": "The connection to use for the HTTP request",
"description": "Logical connection name as defined in the connections.yaml",
"description": "Logical connection name as defined in the connections.dy.yaml",
"examples": ["api_connection", "external_service"]
},
"endpoint": {
Expand Down
3 changes: 2 additions & 1 deletion core/src/datayoga_core/blocks/redis/lookup/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import redis
from datayoga_core import expression, utils
from datayoga_core.block import Block as DyBlock
from datayoga_core.connection import Connection
from datayoga_core.context import Context
from datayoga_core.result import BlockResult, Result, Status

Expand All @@ -17,7 +18,7 @@ class Block(DyBlock, metaclass=ABCMeta):
def init(self, context: Optional[Context] = None):
logger.debug(f"Initializing {self.get_block_name()}")

connection_details = utils.get_connection_details(self.properties.get("connection"), context)
connection_details = Connection.get_connection_details(self.properties.get("connection"), context)

# Dry mode is internal and used for validate the block without establishing a connection.
# This behavior should be implemented in a common way, see this issue: https://lnk.pw/eklj
Expand Down
4 changes: 2 additions & 2 deletions core/src/datayoga_core/blocks/redis/read_stream/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import datayoga_core.blocks.redis.utils as redis_utils
import orjson
from datayoga_core import utils
from datayoga_core.connection import Connection
from datayoga_core.context import Context
from datayoga_core.producer import Message
from datayoga_core.producer import Producer as DyProducer
Expand All @@ -15,7 +15,7 @@ class Block(DyProducer):
def init(self, context: Optional[Context] = None):
logger.debug(f"Initializing {self.get_block_name()}")

connection_details = utils.get_connection_details(self.properties["connection"], context)
connection_details = Connection.get_connection_details(self.properties["connection"], context)
self.redis_client = redis_utils.get_client(connection_details)

self.stream = self.properties["stream_name"]
Expand Down
5 changes: 3 additions & 2 deletions core/src/datayoga_core/blocks/redis/write/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

import datayoga_core.blocks.redis.utils as redis_utils
import redis
from datayoga_core import expression, utils
from datayoga_core import expression
from datayoga_core.block import Block as DyBlock
from datayoga_core.connection import Connection
from datayoga_core.context import Context
from datayoga_core.result import BlockResult, Result, Status

Expand All @@ -17,7 +18,7 @@ class Block(DyBlock, metaclass=ABCMeta):
def init(self, context: Optional[Context] = None):
logger.debug(f"Initializing {self.get_block_name()}")

connection_details = utils.get_connection_details(self.properties["connection"], context)
connection_details = Connection.get_connection_details(self.properties["connection"], context)
self.redis_client = redis_utils.get_client(connection_details)

self.command = self.properties.get("command", "HSET")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
"connection": {
"type": "string",
"title": "The connection to use for loading",
"description": "Logical connection name as defined in the connections.yaml",
"description": "Logical connection name as defined in the connections.dy.yaml",
"examples": ["europe_db", "target", "eu_dwh"]
},
"schema": {
"type": "string",
"title": "The table schema of the table",
"description": "If left blank, the default schema of this connection will be used as defined in the connections.yaml",
"description": "If left blank, the default schema of this connection will be used as defined in the connections.dy.yaml",
"examples": ["dbo"]
},
"table": {
Expand Down
4 changes: 2 additions & 2 deletions core/src/datayoga_core/blocks/relational/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Tuple

import sqlalchemy as sa
from datayoga_core import utils
from datayoga_core.connection import Connection
from datayoga_core.context import Context

logger = logging.getLogger("dy")
Expand All @@ -28,7 +28,7 @@ class DbType(str, Enum):


def get_engine(connection_name: str, context: Context, autocommit: bool = True) -> Tuple[sa.engine.Engine, DbType]:
connection_details = utils.get_connection_details(connection_name, context)
connection_details = Connection.get_connection_details(connection_name, context)

db_type = DbType(connection_details.get("type", "").lower())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"connection": {
"type": "string",
"title": "The connection to use for loading",
"description": "Logical connection name as defined in the connections.yaml",
"description": "Logical connection name as defined in the connections.dy.yaml",
"examples": ["europe_db", "target", "eu_dwh"]
},
"schema": {
Expand Down
39 changes: 39 additions & 0 deletions core/src/datayoga_core/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import os
from pathlib import Path
from typing import Any, Dict

from datayoga_core import utils
from datayoga_core.context import Context


class Connection:
    """Helpers for resolving connection definitions and building their JSON schema."""

    @staticmethod
    def get_connection_details(connection_name: str, context: Context) -> Dict[str, Any]:
        """Gets connection details from the context.

        Looks up `connection_name` under the context's "connections" mapping;
        raises ValueError when the context is missing or the name is unknown.
        """
        properties = context.properties if context else None
        if properties:
            details = properties.get("connections", {}).get(connection_name)
            if details:
                return details

        raise ValueError(f"{connection_name} connection not found")

    @staticmethod
    def get_json_schema() -> Dict[str, Any]:
        """Compiles a complete JSON schema of the connection with all possible types"""
        # gather the per-type connection schemas in a deterministic (stem-sorted) order
        schemas_root = utils.get_resource_path(os.path.join("schemas", "connections"))
        schema_files = sorted(Path(schemas_root).rglob("**/*.schema.json"), key=lambda path: path.stem)
        per_type_schemas = [utils.read_json(str(schema_file)) for schema_file in schema_files]

        # the general schema lives next to this module (or in the bundle dir when frozen)
        base_dir = (utils.get_bundled_dir() if utils.is_bundled()
                    else os.path.dirname(os.path.realpath(__file__)))
        general_schema = utils.read_json(
            os.path.join(base_dir, "resources", "schemas", "connections.schema.json"))

        # a connection entry must match exactly one of the type-specific schemas
        general_schema["definitions"]["connection"]["oneOf"] = per_type_schemas

        return general_schema
6 changes: 3 additions & 3 deletions core/src/datayoga_core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,10 @@ def get_json_schema(whitelisted_blocks: Optional[List[str]] = None) -> Dict[str,
block_schemas = []
# we traverse the json schemas directly instead of 'walk_packages'
# to avoid importing all of the block classes
schema_paths = Path(os.path.join(utils.get_bundled_dir(), "blocks") if utils.is_bundled() else os.path.dirname(
os.path.realpath(blocks.__file__))).rglob("**/block.schema.json")
blocks_dir = (os.path.join(utils.get_bundled_dir(), "blocks") if utils.is_bundled() else
os.path.dirname(os.path.realpath(blocks.__file__)))
block_types = []
for schema_path in schema_paths:
for schema_path in Path(blocks_dir).rglob("**/block.schema.json"):
block_type = os.path.relpath(
os.path.dirname(schema_path),
os.path.dirname(os.path.realpath(blocks.__file__))
Expand Down
4 changes: 2 additions & 2 deletions core/src/datayoga_core/resources/scaffold/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
├── .gitignore
├── data
│ └── sample.csv
├── connections.yaml
├── connections.dy.yaml
└── jobs
└── sample
└── hello.dy.yaml
```

- `.gitignore`: For convenience, this is used to ignore the data folder.
- `data`: Folder to store data input files or output. This folder can be located anywhere as long as the runner has access to it.
- `connections.yaml`: Contains definitions of source and target connectors and other general settings.
- `connections.dy.yaml`: Contains definitions of source and target connectors and other general settings.
- `jobs`: Source job YAMLs. These can be nested and referenced as modules using a dot notation. e.g. `jobs/sample/hello.dy.yaml` is referenced as `sample.hello` when running the job.

## Run a Job
Expand Down
Loading

0 comments on commit 00cde4a

Please sign in to comment.