Format code with black (#100)
s-vitaliy authored Mar 20, 2023
1 parent bf24285 commit 1f570c6
Showing 24 changed files with 890 additions and 800 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/build.yaml
@@ -31,6 +31,11 @@ jobs:
         set -e
         /github/home/.local/bin/poetry install
+    - name: Black
+      shell: bash
+      run: |
+        set -e
+        /github/home/.local/bin/poetry run black . --check --diff
     - name: Lint
       run: |
         set -e
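The new step fails the build when any file would be reformatted; --diff prints the proposed changes without writing them. The rewrites black applies are the same ones visible throughout this commit. A hypothetical before/after sketch (not from the repo):

    # Before black: single quotes, backslash line continuations
    df = spark_session \
        .read \
        .format('json') \
        .load('/tmp/example')

    # After black with line-length = 120: double quotes, one fluent chain
    df = spark_session.read.format("json").load("/tmp/example")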
1 change: 1 addition & 0 deletions README.md
@@ -1,4 +1,5 @@
 # Introduction
+[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
 
 Utility functions and classes for working with Dataframes, provisioning SparkSession and much more.
 
880 changes: 485 additions & 395 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pyproject.toml
@@ -25,7 +25,11 @@ k8s = [
 pytest = "^7.0"
 pytest-cov = "^2.12"
 pylint = "^2.12"
+black = "~22.12.0"
 
 [build-system]
 requires = ["poetry-core>=1.2.0"]
 build-backend = "poetry.core.masonry.api"
+
+[tool.black]
+line-length = 120
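The 120-character limit is why the chained Spark calls later in this diff collapse onto single lines; at black's default limit of 88 they would be split across lines. An illustrative line (not from the repo) that fits in 120 characters but not 88:

    df = spark_session.read.options(inferSchema="true").format("csv").load("abfss://container@account/path/to/data")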
2 changes: 1 addition & 1 deletion spark_utils/_version.py
@@ -24,4 +24,4 @@
 Version file.
 """
 
-__version__ = '0.0.0' # replaced by git tag on deploy
+__version__ = "0.0.0"  # replaced by git tag on deploy
28 changes: 9 additions & 19 deletions spark_utils/common/functions.py
@@ -50,14 +50,14 @@ def decrypt_sensitive(sensitive_content: Optional[str]) -> Optional[str]:
     :param sensitive_content: payload to decrypt
     :return: Decrypted payload
     """
-    encryption_key = os.environ.get('RUNTIME_ENCRYPTION_KEY', '').encode('utf-8')
+    encryption_key = os.environ.get("RUNTIME_ENCRYPTION_KEY", "").encode("utf-8")
 
     if not encryption_key:
-        print('Encryption key not set - skipping operation.')
+        print("Encryption key not set - skipping operation.")
 
     if encryption_key and sensitive_content:
         fernet = Fernet(encryption_key)
-        return fernet.decrypt(sensitive_content.encode('utf-8')).decode('utf-8')
+        return fernet.decrypt(sensitive_content.encode("utf-8")).decode("utf-8")
 
     return None
 
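For context, decrypt_sensitive expects a Fernet key in RUNTIME_ENCRYPTION_KEY. A minimal round-trip sketch using the cryptography package; the secret value here is illustrative:

    import os
    from cryptography.fernet import Fernet

    key = Fernet.generate_key()  # urlsafe base64-encoded 32-byte key
    os.environ["RUNTIME_ENCRYPTION_KEY"] = key.decode("utf-8")

    token = Fernet(key).encrypt("my-secret".encode("utf-8")).decode("utf-8")
    # decrypt_sensitive(token) now returns "my-secret"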
@@ -76,14 +76,10 @@ def read_from_socket(
     :return: Spark dataframe
     """
     read_options = read_options or {}
-    if socket.data_format.startswith('hive'):
+    if socket.data_format.startswith("hive"):
         return spark_session.table(socket.data_path)
 
-    return spark_session \
-        .read \
-        .options(**read_options) \
-        .format(socket.data_format) \
-        .load(socket.data_path)
+    return spark_session.read.options(**read_options).format(socket.data_format).load(socket.data_path)
 
 
 def write_to_socket(
@@ -106,18 +102,12 @@ def write_to_socket(
     if partition_count:
         data = data.repartition(partition_count, *partition_by)
 
-    writer = data.write \
-        .mode('overwrite') \
-        .options(**write_options)
+    writer = data.write.mode("overwrite").options(**write_options)
 
     if partition_by:
         writer = writer.partitionBy(*partition_by)
 
-    if socket.data_format.startswith('hive'):
-        writer \
-            .format(socket.data_format.split('_')[-1]) \
-            .saveAsTable(socket.data_path)
+    if socket.data_format.startswith("hive"):
+        writer.format(socket.data_format.split("_")[-1]).saveAsTable(socket.data_path)
     else:
-        writer \
-            .format(socket.data_format) \
-            .save(socket.data_path)
+        writer.format(socket.data_format).save(socket.data_path)
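Behaviour is unchanged by the reformatting. As a usage sketch, assuming JobSocket is the (key, path, format) triplet described in the SparkJobArgs docstring below, and given an active SparkSession named spark; the keyword names come from this diff, everything else is illustrative:

    source = JobSocket("input", "file:///tmp/in", "json")
    target = JobSocket("output", "file:///tmp/out", "parquet")

    df = read_from_socket(socket=source, spark_session=spark, read_options={"multiLine": "true"})
    write_to_socket(socket=target, data=df, partition_count=4, partition_by=["date"])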
75 changes: 37 additions & 38 deletions spark_utils/common/spark_job_args.py
@@ -33,24 +33,13 @@
 
 class DecryptAction(argparse.Action):
     """
-    Action that performs decryption of a provided value using encryption key provided from the environment.
+    Action that performs decryption of a provided value using encryption key provided from the environment.
     """
 
-    def __init__(self,
-                 option_strings,
-                 dest,
-                 const=None,
-                 default=None,
-                 required=False,
-                 **kwargs):
+    def __init__(self, option_strings, dest, const=None, default=None, required=False, **kwargs):
         super().__init__(
-            option_strings=option_strings,
-            dest=dest,
-            nargs=1,
-            const=const,
-            default=default,
-            required=required,
-            **kwargs)
+            option_strings=option_strings, dest=dest, nargs=1, const=const, default=default, required=required, **kwargs
+        )
 
     def __call__(self, parser, namespace, values, option_string=None):
         arg_value = values[0]
@@ -63,28 +52,38 @@ def __call__(self, parser, namespace, values, option_string=None):
 
 class SparkJobArgs:
     """
-    Argsparse-based Spark job arguments provider
-    This adds three default arguments to each job:
-      - --source a|b|c d|e|f ...
-        Describes inputs used by a job
-        Here `a` is a mapping key that a developer should use to extract path/format information for the source
-        `b` is a source path, in URI format: file:///, abfss:// etc.
-        'c' is a data format: json, csv, delta etc.
-      - --output a|b|c d|e|f...
-        Describes output locations used by a job
-        Same meanings as source attributes
-      - --overwrite
-        Controls overwrite behaviour. Will wipe the whole directory if set and honored by job developer.
+    Argsparse-based Spark job arguments provider
+    This adds three default arguments to each job:
+      - --source a|b|c d|e|f ...
+        Describes inputs used by a job
+        Here `a` is a mapping key that a developer should use to extract path/format information for the source
+        `b` is a source path, in URI format: file:///, abfss:// etc.
+        'c' is a data format: json, csv, delta etc.
+      - --output a|b|c d|e|f...
+        Describes output locations used by a job
+        Same meanings as source attributes
+      - --overwrite
+        Controls overwrite behaviour. Will wipe the whole directory if set and honored by job developer.
     """
 
     def __init__(self):
         self._parser = argparse.ArgumentParser(description="Runtime arguments")
-        self._parser.add_argument("--source", type=str, nargs='+', default=[],
-                                  help='Sources to read data from, in a form of <source key>:<source path>')
-        self._parser.add_argument("--output", type=str, nargs='+', default=[],
-                                  help='Outputs to write data to, in a form of <output key>:<output path>')
-        self._parser.add_argument("--overwrite", dest='overwrite', action='store_true', help="Overwrite outputs")
+        self._parser.add_argument(
+            "--source",
+            type=str,
+            nargs="+",
+            default=[],
+            help="Sources to read data from, in a form of <source key>:<source path>",
+        )
+        self._parser.add_argument(
+            "--output",
+            type=str,
+            nargs="+",
+            default=[],
+            help="Outputs to write data to, in a form of <output key>:<output path>",
+        )
+        self._parser.add_argument("--overwrite", dest="overwrite", action="store_true", help="Overwrite outputs")
 
         self._parsed_args = None
         self._parsed_sources = None
@@ -93,11 +92,11 @@ def __init__(self):
 
     def _sources(self) -> Iterable[JobSocket]:
         for source in self._parsed_args.source:
-            yield JobSocket(*source.split('|'))
+            yield JobSocket(*source.split("|"))
 
     def _outputs(self) -> Iterable[JobSocket]:
         for output in self._parsed_args.output:
-            yield JobSocket(*output.split('|'))
+            yield JobSocket(*output.split("|"))
 
     def new_arg(self, *args, **kwargs):
         """
@@ -116,10 +115,10 @@ def new_encrypted_arg(self, *args, **kwargs):
         :param args: argsparse.add_argument(...)
         :return:
         """
-        if 'action' not in kwargs:
-            kwargs.setdefault('action', DecryptAction)
+        if "action" not in kwargs:
+            kwargs.setdefault("action", DecryptAction)
         else:
-            kwargs['action'] = DecryptAction
+            kwargs["action"] = DecryptAction
 
         self._parser.add_argument(*args, **kwargs)
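To tie the pieces together, a hypothetical launch following the a|b|c convention from the docstring, plus an encrypted argument registered through new_encrypted_arg; the script name and the --api-key argument are made up:

    # Shell invocation (sketch):
    #   spark-submit my_job.py \
    #       --source input|file:///tmp/in|json \
    #       --output result|file:///tmp/out|parquet \
    #       --overwrite

    job_args = SparkJobArgs()
    job_args.new_encrypted_arg("--api-key", help="decrypted on parse via RUNTIME_ENCRYPTION_KEY")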