Error when an error occurs in some soda check #23

Open
carlosnizolli opened this issue Jun 30, 2023 · 9 comments

@carlosnizolli

carlosnizolli commented Jun 30, 2023

When all checks pass, no error occurs. The log below is from a run in which two checks returned warnings.
Some of the libraries used:
prefect==2.4.2
prefect_shell==0.1.3
prefect-soda-core[postgres]
prefect-soda-core[athena]
soda-core
soda-core-postgres
soda-core-athena
MarkupSafe==2.0.1
PyAthena

[17:01:56]       c20 nulos [PASSED]
[17:01:56]       C20 deve ser maior ou igual a 0 [PASSED]
[17:01:56]       C40 nulos [PASSED]
[17:01:56]       C40 deve ser maior ou igual a 0 [PASSED]
[17:01:56]     db_maritimo [Duplicated] in cabotagem
[17:01:56]       CEs duplicados [PASSED]
[17:01:56]     db_maritimo [Today] in cabotagem
[17:01:56]       Sem registros criados na data de hoje [PASSED]
[17:01:56]       Data Operação null [PASSED]
05:01:56 PM
soda_scan_execute-1578bf38-0
[17:01:56] 2/30 checks WARNED: 
05:01:56 PM
soda_scan_execute-1578bf38-0
[17:01:56]     db_maritimo [Five] in cabotagem
[17:01:56]       tipo_fcl nulos [WARNED]
[17:01:56]         check_value: 1.03
[17:01:56]         row_count: 7749
[17:01:56]         missing_count: 80
[17:01:56]     db_maritimo [Duplicated] in cabotagem
[17:01:56]       Embarque duplicados (id_porto_carga,id_porto_origem,nrblconhecimento,lista_fcl iguais) [WARNED]
[17:01:56]         check_value: 2368
05:01:56 PM
soda_scan_execute-1578bf38-0
[17:01:56] Only 2 warnings. 0 failures. 0 errors. 28 pass.
[17:01:56] Sending results to Soda Cloud
05:01:56 PM
soda_scan_execute-1578bf38-0
[17:02:03] Soda Cloud Trace: 7899457579506349419
05:02:03 PM
soda_scan_execute-1578bf38-0
Saving scan results to 2023-06-30T20:00:37.181228+00:00--soda_scan_execute-1578bf38-0.json
05:02:03 PM
soda_scan_execute-1578bf38-0
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1214, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect_soda_core/tasks.py", line 142, in soda_scan_execute
    raise e
  File "/usr/local/lib/python3.8/site-packages/prefect_soda_core/tasks.py", line 134, in soda_scan_execute
    soda_logs = await shell_run_command.fn(
  File "/usr/local/lib/python3.8/site-packages/prefect_shell/commands.py", line 103, in shell_run_command
    raise RuntimeError(msg)
RuntimeError: Command failed with exit code 1:
Saving scan results to 2023-06-30T20:00:37.181228+00:00--soda_scan_execute-1578bf38-0.json
05:02:08 PM
soda_scan_execute-1578bf38-0
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 596, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 117, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/usr/local/lib/python3.8/site-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 97, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "flows/Governanca/soda/soda_cabotagem_flow.py", line 26, in soda_cabotagem_captura_flow
    run_soda_scan('psql',
  File "/tmp/tmpsv7sxzh_prefect/flows/lib_classes/libsoda.py", line 30, in run_soda_scan
    return soda_scan_execute(
  File "/usr/local/lib/python3.8/site-packages/prefect/tasks.py", line 295, in __call__
    return enter_task_run_engine(
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 736, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 137, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.8/site-packages/anyio/from_thread.py", line 47, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 906, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/local/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 874, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.8/site-packages/prefect/futures.py", line 237, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/usr/local/lib/python3.8/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/usr/local/lib/python3.8/site-packages/prefect/engine.py", line 1214, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/prefect_soda_core/tasks.py", line 142, in soda_scan_execute
    raise e
  File "/usr/local/lib/python3.8/site-packages/prefect_soda_core/tasks.py", line 134, in soda_scan_execute
    soda_logs = await shell_run_command.fn(
  File "/usr/local/lib/python3.8/site-packages/prefect_shell/commands.py", line 103, in shell_run_command
    raise RuntimeError(msg)
RuntimeError: Command failed with exit code 1:

@AlessandroLollo
Collaborator

Hey @carlosnizolli 👋
It seems the real cause of the issue is not reported in the log.
Can you try running Soda outside Prefect?
Do you see the same error?
Any additional debugging log would be useful
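
One way to run the scan outside Prefect is to call the Soda CLI directly and look at its exit code. The sketch below is only an illustration: the helper names `build_soda_scan_command` and `run_outside_prefect` are hypothetical, and it assumes the `soda` executable from soda-core is on the PATH and accepts `-d`, `-c`, `-v KEY=VALUE` and `-V` (verbose) as in the Soda Core CLI.

```python
import subprocess
from typing import Dict, List, Optional, Tuple


def build_soda_scan_command(
    data_source: str,
    configuration_path: str,
    checks_path: str,
    variables: Optional[Dict[str, str]] = None,
    verbose: bool = True,
) -> List[str]:
    """Build the argv for a direct `soda scan` call (hypothetical helper)."""
    cmd = ["soda", "scan", "-d", data_source, "-c", configuration_path]
    # Each variable is passed as its own `-v KEY=VALUE` pair.
    for key, value in (variables or {}).items():
        cmd += ["-v", f"{key}={value}"]
    if verbose:
        cmd.append("-V")
    # The checks file is a positional argument and goes last.
    cmd.append(checks_path)
    return cmd


def run_outside_prefect(cmd: List[str]) -> Tuple[int, str]:
    """Run the command and return (exit code, combined output) without raising."""
    completed = subprocess.run(cmd, capture_output=True, text=True)
    return completed.returncode, completed.stdout + completed.stderr
```

Printing the exit code returned by `run_outside_prefect` next to the scan output should show whether the scan itself exits with a non-zero code when checks only warn.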

@carlosnizolli
Author

Hi @AlessandroLollo,
Outside Prefect, no error occurs.
I'll try to get some more logs and send them later.

@carlosnizolli
Author

From the tests I've done, the flow breaks only when Soda returns a warning, at the point where the soda_logs variable is assigned.

@AlessandroLollo
Collaborator

Hey @carlosnizolli 👋
Thanks for the update!
Would you mind providing a reproducible example of such behavior?
Otherwise it's a bit hard to investigate.

Thanks!

@carlosnizolli
Author

I'm running on a machine with these libs

scikit-learn
astroid==2.4.2
awswrangler
backcall==0.2.0
beautifulsoup4==4.9.2
boto3
botocore
bs4==0.0.1
cachetools==4.2.0
certifi==2020.12.5
cffi==1.14.3
chardet==3.0.4
click==8.1.2
cloudpickle==2.0
contextvars==2.4
croniter==1.0.12
cryptography==36.0.1
cycler==0.10.0
dask==2.25.0
decorator==4.4.2
distributed==2.25.0
elastic-transport==8.4.0
elasticsearch==7.13.4
future==0.18.2
google-api-core==1.24.1
google-auth==1.24.0
grpcio==1.34.0
HeapDict==1.0.1
idna==2.10
immutables==0.14
ipfn
ipykernel==5.3.4
ipython-genutils==0.2.0
ipython==7.16.1
isodate==0.6.0
isort==5.4.2
jedi==0.17.2
jellyfish
Jinja2==2.11.2
jmespath==0.10.0
joblib==0.16.0
kiwisolver==1.2.0
lazy-object-proxy==1.4.3
lxml==4.5.2
MarkupSafe==2.0.1
marshmallow-oneofschema==2.0.1
marshmallow==3.7.1
matplotlib
mccabe==0.6.1
Metaphone==0.6
msgpack==1.0.0
msrest==0.6.19
mypy-extensions==0.4.3
natsort==7.0.1
nltk
numpy
numpyencoder==0.3.0
oauthlib==3.1.0
oscrypto==1.2.1
pandas
parso==0.7.1
pendulum==2.1.2
pexpect==4.8.0
pickleshare==0.7.5
Pillow==7.2.0
prefect==2.4.2
prefect_aws
prompt-toolkit==3.0.6
proto-plus==1.13.0
protobuf
psutil==5.7.2
psycopg2-binary==2.9.3
ptyprocess==0.6.0
pyarrow
pyasn1-modules==0.2.8
pyasn1==0.4.8
PyAthena==2.2.0
pycparser==2.20
pycrypto==2.6.1
pycryptodomex==3.9.8
pydrive==1.2.1
Pygments==2.6.1
PyJWT==1.7.1
pylint==2.6.0
pyOpenSSL==19.1.0
pyparsing==2.4.7
python-dateutil==2.8.2
python-decouple==3.3
python-slugify==5.0.1
pytz==2021.1
pytzdata==2020.1
PyYAML==5.4.1
pyzmq==19.0.2
rarfile==4.0
regex==2020.7.14
requests-aws4auth==1.2.2
requests-oauthlib==1.3.0
requests
rsa==4.6
s3fs
s3transfer
scipy==1.5.2
seaborn==0.10.1
setuptools==49.6.0
simple-crypt==4.1.7
six==1.15.0
##sklearn
## The 'sklearn' PyPI package is deprecated use scikit-learn
##scikit-learn
#####
sortedcontainers==2.2.2
soupsieve==2.0.1
sparse_dot_topn
tabulate==0.8.7
tblib==1.7.0
tenacity==6.2.0
text-unidecode==1.3
threadpoolctl==2.1.0
toml==0.10.1
toolz==0.10.0
tornado==6.0.4
tqdm==4.48.2
traitlets==4.3.3
transformers
typed-ast==1.4.1
urllib3
wcwidth==0.2.5
websocket-client==0.57.0
wheel==0.35.1
wrapt==1.12.1
zict==2.0.0
prefect_shell==0.1.3
## prefect_shell current versions do not work (0.1.5 and 0.1.4)
soda-core
soda-core-postgres
soda-core-athena
prefect-soda-core[postgres]
prefect-soda-core[athena]

My code is like this

from decouple import config
from prefect_soda_core.soda_configuration import SodaConfiguration
from prefect_soda_core.sodacl_check import SodaCLCheck
from prefect_soda_core.tasks import soda_scan_execute

ENV = config("ENV")
labels = [ENV]

PATH = config('SODA_CORE_DIR')


def run_soda_scan(database, config_file, checks, data_source, variables_dict):
    if database == 'athena':
        variables_dict['ACCESS_KEY_ID'] = config('ATHENA_AWS_ACCESS_KEY_ID')
        variables_dict['SECRET_KEY'] = config('ATHENA_AWS_SECRET_ACCESS_KEY')
    else:
        variables_dict['POSTGRES_USERNAME'] = config('DB_QA_SODA_USER')
        variables_dict['POSTGRES_PASSWORD'] = config('DB_QA_SODA_PASSWORD')

    variables_dict['API_PUBLIC'] = config('SODA_CLOUD_API_KEY_ID')
    variables_dict['API_PRIVATE'] = config('SODA_CLOUD_API_SECRET')

    soda_configuration_block = SodaConfiguration(
        configuration_yaml_path=PATH + config_file
    )
    soda_check_block = SodaCLCheck(
        sodacl_yaml_path=PATH + checks
    )

    return soda_scan_execute(
        data_source_name=data_source,
        configuration=soda_configuration_block,
        checks=soda_check_block,
        variables=variables_dict,
        return_scan_result_file_content=True,
        verbose=False
    )

My flow is like this

from prefect import flow
from decouple import config
from flows.lib_classes.libprefect import BasePrefectService
from flows.lib_classes.libsoda import run_soda_scan

ENV = config("ENV")
labels = [ENV]


@flow(timeout_seconds=600)
def soda_cabotagem_athena_flow():

    run_soda_scan('athena',
                  'cabotagem/config_athena.yml',
                  'cabotagem/cabotagem_athena.yml',
                  'cabotage_apresentation',
                  {})


@flow(timeout_seconds=1800)
def soda_cabotagem_captura_flow():
    soda_variables = {"DATA_SOURCE": "cabotagem",
                      "HOST": config('DB_SISTEMA_REPLICA_HOST')}

    run_soda_scan('psql',
                  'cabotagem/config_psql.yml',
                  'cabotagem/db_maritimo.yml',
                  'cabotagem',
                  soda_variables)


@flow(timeout_seconds=3600)
def soda_cabotagem_intermediaria_flow():
    soda_variables = {"DATA_SOURCE": "cabotagem_intermediaria",
                      "HOST": config('DB_SISTEMA_REPLICA_HOST')}

    run_soda_scan('psql',
                  'cabotagem/configuration.yml',
                  'cabotagem/db_maritimo_intermediaria.yml',
                  'cabotagem_intermediaria',
                  soda_variables)


flowName = "soda_cabotagem_flow"

PrefectFlow = BasePrefectService(flowName, labels)


@flow(name=flowName)
def soda_cabotagem_flow():
    soda_cabotagem_athena_flow()
    soda_cabotagem_captura_flow()
    # soda_cabotagem_intermediaria_flow()


databases = [
    'ATHENA QA', 'DB SISTEMA REPLICA'
]

PrefectFlow.register(soda_cabotagem_flow, "SODA", databases)

@AlessandroLollo
Collaborator

Can you try setting verbose=True on soda_scan_execute?
I wonder if we can get some more information to debug the issue.

@carlosnizolli
Author

@AlessandroLollo The code handles only exit code 2, but warnings return exit code 1.
(image attached in the original comment)
Another problem: the soda_logs variable is available for the return only if return_scan_result_file_content is True. If it is False, an error occurs because the variable is created only inside the try block.
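
The two problems described above can be sketched roughly as follows. This is not the prefect-soda-core implementation. `run_scan`, `run_command` and `TOLERATED_PREFIXES` are hypothetical names, and the message format is assumed to match the `RuntimeError: Command failed with exit code 1:` line in the traceback. The sketch binds `soda_logs` before the `try` so it exists on every path, and treats exit codes 1 and 2 as a completed scan.

```python
from typing import Callable

# Assumption: the shell task raises RuntimeError with a message in this format,
# and exit codes 1 and 2 mean the scan finished with warnings or failed checks.
TOLERATED_PREFIXES = (
    "Command failed with exit code 1:",
    "Command failed with exit code 2:",
)


def run_scan(run_command: Callable[[], str]) -> str:
    """Run a scan command and return its logs (hypothetical wrapper)."""
    soda_logs = ""  # bound before the try, so it is defined on every path
    try:
        soda_logs = run_command()
    except RuntimeError as exc:
        message = str(exc)
        if not message.startswith(TOLERATED_PREFIXES):
            # Any other failure is a real execution error: re-raise it.
            raise
        # The scan completed; keep the output carried in the error message.
        soda_logs = message
    return soda_logs
```

With this shape, a scan that only produces warnings returns its logs instead of failing the task, while any other exit code still raises.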

@ccueto36

ccueto36 commented Jan 5, 2024

I get exit code 1 upon validation errors, not 2. Can "Command failed with exit code 1:" also be added to the if statement above? It makes my flow fail even when the tests succeed.

@AlessandroLollo
Collaborator

Hey @ccueto36 👋
Would you be open to submitting a PR for your use case?
