diff --git a/extras/glue_helper/README.md b/extras/glue_helper/README.md
new file mode 100644
index 0000000..05f2bfa
--- /dev/null
+++ b/extras/glue_helper/README.md
@@ -0,0 +1,27 @@
+# Glue Helpers
+
+If you have AWS Glue code that you want to adjust so it can run in [Snowpark](https://docs.snowflake.com/en/developer-guide/snowpark/python/index), these helpers provide classes with a very similar API.
+
+They can be used in Snowpark and help accelerate the migration of Glue scripts.
+
+# Building the Helpers
+
+To build the helpers, you need the [snow CLI](https://docs.snowflake.com/en/developer-guide/snowflake-cli-v2/index). Go to the command line and run:
+
+`snow snowpark build`
+
+This will build a file called `sfjobs.zip`.
+
+You can [upload this file to a Snowflake stage using Snowsight](https://docs.snowflake.com/en/user-guide/data-load-local-file-system-stage-ui) or from the command line with the [snow CLI](https://docs.snowflake.com/en/developer-guide/snowflake-cli-v2/index), for example `snow stage copy sfjobs.zip @mystage`.
+
+A pre-built version can also be downloaded from this repository's releases.
+
+# Using in notebooks
+
+To use the helpers in your notebooks (after uploading the zip to a stage), go to Packages and type the stage location.
+
+![package_from_stage](package_from_stage.png)
diff --git a/extras/glue_helper/package_from_stage.png b/extras/glue_helper/package_from_stage.png
new file mode 100644
index 0000000..67021f9
Binary files /dev/null and b/extras/glue_helper/package_from_stage.png differ
diff --git a/extras/glue_helper/sfjobs/sfjobs/__init__.py b/extras/glue_helper/sfjobs/sfjobs/__init__.py
new file mode 100644
index 0000000..9384d5e
--- /dev/null
+++ b/extras/glue_helper/sfjobs/sfjobs/__init__.py
@@ -0,0 +1,107 @@
+from snowflake.snowpark import Session, DataFrame as DynamicFrame
+from snowflake.snowpark._internal.analyzer.analyzer_utils import quote_name_without_upper_casing
+from snowflake.snowpark.functions import try_cast, split, iff, typeof, col, object_construct, cast
+from snowflake.snowpark._internal.error_message import SnowparkClientExceptionMessages
+from snowflake.snowpark._internal.type_utils import snow_type_to_dtype_str
+import logging
+from snowflake.snowpark._internal.utils import quote_name
+from sfjobs.transforms import ApplyMapping, ResolveChoice, find_field
+from snowflake.snowpark.types import StructType, StructField, StringType, ArrayType, IntegerType, FloatType, BooleanType, DateType, TimestampType, VariantType, BinaryType
+
+from snowflake.snowpark._internal.utils import SNOWFLAKE_PATH_PREFIXES
+import re
+
+## this is to extend the supported prefixes
+if "s3://" not in SNOWFLAKE_PATH_PREFIXES:
+    SNOWFLAKE_PATH_PREFIXES.append("s3://")
+
+if not hasattr(DynamicFrame, '__sfjobs_extended__'):
+    setattr(DynamicFrame, '__sfjobs_extended__', True)
+    from snowflake.snowpark import DataFrame
+
+    # Function to convert tick-quoted identifiers into double-quoted uppercase ones
+    def convert_string(s):
+        # Use regex to find the quoted string and convert it to uppercase
+        return re.sub(r"`(.*?)`", lambda match: f'"{match.group(1).upper()}"', s)
+
+    __sql = Session.sql
+    def adjusted_sql(self, sql_text, *params):
+        sql_text = convert_string(sql_text)
+        return __sql(self, sql_text, *params)
+    setattr(Session, 'sql', adjusted_sql)
+
+    __filter = DynamicFrame.filter
+    def adjusted_filter(self, expr):
+        # only rewrite string expressions; Column expressions pass through unchanged
+        if isinstance(expr, str):
+            expr = convert_string(expr)
+        return __filter(self, expr)
+    setattr(DynamicFrame, 'filter', adjusted_filter)
+    setattr(DynamicFrame, 'where', adjusted_filter)
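+    # Illustrative note (not from the Glue API): with the patched Session.sql and
+    # DataFrame.filter/where above, Glue/Spark-style backtick identifiers are rewritten
+    # to Snowflake's upper-cased, double-quoted form before execution, e.g.
+    #   session.sql("select `order_id` from orders")  ->  select "ORDER_ID" from orders
+    #   df.filter("`status` = 'NEW'")                 ->  "STATUS" = 'NEW'
+    # (table and column names here are made-up examples)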
+
+    ## Adding case insensitive flag
+    def get_ci_property(self):
+        return self._allow_case_insensitive_column_names
+    def set_ci_property(self, value):
+        self._allow_case_insensitive_column_names = value
+    setattr(DynamicFrame, "get_ci_property", get_ci_property)
+    setattr(DynamicFrame, "set_ci_property", set_ci_property)
+    DynamicFrame.case_insensitive_resolution = property(get_ci_property, set_ci_property)
+
+    ## Override the default column resolution to also enable case insensitive search
+    def _case_insensitive_resolve(self, col_name: str):
+        normalized_col_name = quote_name(col_name)
+        if hasattr(self, "_allow_case_insensitive_column_names") and self._allow_case_insensitive_column_names:
+            normalized_col_name = normalized_col_name.upper()
+            cols = list(filter(lambda attr: attr.name.upper() == normalized_col_name, self._output))
+        else:
+            cols = list(filter(lambda attr: attr.name == normalized_col_name, self._output))
+        if len(cols) == 1:
+            return cols[0].with_name(normalized_col_name)
+        else:
+            raise SnowparkClientExceptionMessages.DF_CANNOT_RESOLVE_COLUMN_NAME(
+                col_name
+            )
+    setattr(DynamicFrame, "_resolve", _case_insensitive_resolve)
+
+    ## dummy method
+    def fromDF(cls, dataframe, ctx, name):
+        if name:
+            logging.info(f"fromDF {name}")
+        return dataframe
+    DynamicFrame.fromDF = classmethod(fromDF)
+
+    ## extends the DataFrame class adding an apply_mapping method
+    def apply_mapping(self, mappings, case_insensitive=True):
+        return ApplyMapping()(self, mappings, case_insensitive=case_insensitive)
+    setattr(DynamicFrame, "apply_mapping", apply_mapping)
+
+    def resolveChoice(self: DataFrame, specs: list, ignore_case=True) -> DataFrame:
+        return ResolveChoice()(self, specs, ignore_case=ignore_case)
+    setattr(DynamicFrame, "resolveChoice", resolveChoice)
+
+    ## patching toDF: without arguments it should just return the dataframe
+    __df = DataFrame.to_df
+    def updated_to_DF(self, *names):
+        if len(names) == 0:
+            return self
+        else:
+            return __df(self, *names)
+    setattr(DynamicFrame, "to_df", updated_to_DF)
+    setattr(DynamicFrame, "toDF", updated_to_DF)
+
+    def rename_field(self, old_name, new_name, transformation_ctx="", info="", ignore_case=True, **kwargs):
+        if len(kwargs):
+            logging.warning(f"ignored kwargs: {kwargs}")
+        if transformation_ctx:
+            logging.info(f"CTX: {transformation_ctx}")
+            self.session.append_query_tag(transformation_ctx, separator="|")
+        if info:
+            logging.info(info)
+        logging.info(f"Renaming field {old_name} to {new_name}")
+        if ignore_case:
+            field = find_field(old_name, self, ignore_case=ignore_case)
+            return self.withColumnRenamed(field.name, new_name)
+        else:
+            return self.withColumnRenamed(old_name, new_name)
+    setattr(DynamicFrame, "rename_field", rename_field)
diff --git a/extras/glue_helper/sfjobs/sfjobs/context.py b/extras/glue_helper/sfjobs/sfjobs/context.py
new file mode 100644
index 0000000..fbcb3ef
--- /dev/null
+++ b/extras/glue_helper/sfjobs/sfjobs/context.py
@@ -0,0 +1,93 @@
+from snowflake.snowpark import Session, DataFrame
+import logging
+from io import StringIO
+from snowflake.connector.util_text import split_statements
+
+from snowflake.snowpark._internal.utils import quote_name
+from .utils import needs_quoting, RawSqlExpression
+
+class SFContext():
+    def __init__(self, session: Session = None):
+        self.session = session or Session.builder.getOrCreate()
+        self.logger = logging.getLogger("context")
+        self.create_dynamic_frame = SFFrameReader(self)
+        self.write_dynamic_frame = SFFrameWriter(self)
+
+    def create_frame(self, database, table_name, table_schema="public", transformation_ctx=""):
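+        # Sketch of the intended call (names are illustrative, not from the original code):
+        #   ctx = SFContext()
+        #   df = ctx.create_frame(database="SALES", table_name="ORDERS")
+        # quotes database and table_name when they are not already quoted identifiers and
+        # returns session.table([database, table_schema, table_name]).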
+        if transformation_ctx:
+            self.logger.info(f"CTX: {transformation_ctx}")
+            self.session.append_query_tag(transformation_ctx, "|")
+        database = quote_name(database) if needs_quoting(database) else database
+        table_name = quote_name(table_name) if needs_quoting(table_name) else table_name
+        self.logger.info(f"Reading frame from {database}.{table_schema}.{table_name}")
+        return self.session.table([database, table_schema, table_name])
+
+    def run_actions(self, actions_text, kind, fail_on_error=False):
+        if actions_text:
+            with StringIO(actions_text) as f:
+                # split_statements yields (statement, is_put_or_get) tuples
+                for statement, _ in split_statements(f, remove_comments=True):
+                    try:
+                        self.session.sql(statement).collect()
+                    except Exception as e:
+                        self.logger.error(f"Failed to execute {kind}: {statement}")
+                        if fail_on_error:
+                            raise e
+
+    def write_frame(self, frame: DataFrame, catalog_connection: str, connection_options: dict, redshift_tmp_dir: str = "", transformation_ctx: str = "", write_mode: str = "append"):
+        if transformation_ctx:
+            self.session.append_query_tag(transformation_ctx, "|")
+        if redshift_tmp_dir:
+            self.logger.warning(f"Ignoring argument {redshift_tmp_dir}. Please remove")
+        self.logger.info(f"Writing frame to {catalog_connection}")
+        preactions = connection_options.get("preactions", "")
+        self.run_actions(preactions, "preactions")
+        dbtable = connection_options.get("dbtable")
+        dbtable = quote_name(dbtable) if needs_quoting(dbtable) else dbtable
+        database = connection_options.get("database")
+        database = quote_name(database) if needs_quoting(database) else database
+        table_schema = connection_options.get("schema", "public")
+        frame.write.mode(write_mode).save_as_table([database, table_schema, dbtable])
+        postactions = connection_options.get("postactions", "")
+        self.run_actions(postactions, "postactions")
+
+class SFFrameReader(object):
+    def __init__(self, context: SFContext):
+        self._context = context
+
+    def from_catalog(self, database=None, table_name=None, table_schema="public", redshift_tmp_dir="", transformation_ctx="", push_down_predicate="", additional_options={}, catalog_id=None, **kwargs):
+        """Creates a DynamicFrame with the specified catalog namespace and table name."""
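+        # Rough Glue-to-sfjobs mapping (argument values are made up for illustration):
+        #   glueContext.create_dynamic_frame.from_catalog(database="SALES", table_name="ORDERS")
+        # returns a plain Snowpark DataFrame over SALES.PUBLIC.ORDERS here;
+        # redshift_tmp_dir, push_down_predicate, additional_options and catalog_id
+        # are accepted for API compatibility but currently ignored.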
+ """ + if database is None: + raise Exception("Parameter database is missing.") + if table_name is None: + raise Exception("Parameter table_name is missing.") + db = database + return self._context.create_frame(database=database,table_name=table_name,table_schema=table_schema,transformation_ctx=transformation_ctx) + +class SFFrameWriter(object): + def __init__(self, context:SFContext): + self._context = context + def from_options(self, frame:DataFrame, connection_type, connection_options={}, + format="parquet", format_options={}, transformation_ctx=""): + if connection_type == "s3": + if connection_options.get("storage_integration") is None: + raise Exception("Parameter storage_integration is missing.") + storage_integration = connection_options.get("storage_integration") + frame.write.copy_into_location(connection_options["path"], file_format_type=format, storage_integration=RawSqlExpression(storage_integration), + header=True, overwrite=True) + elif connection_type == "snowflake": + frame.write.save_as_table(connection_options["path"]) + else: + raise Exception("Unsupported connection type: %s" % connection_type) + def from_catalog(self, frame, database = None, table_name = None, table_schema="public", redshift_tmp_dir = "", transformation_ctx = "", additional_options = {}, catalog_id = None, **kwargs): + if database is None: + raise Exception("Parameter database is missing.") + if table_name is None: + raise Exception("Parameter table_name is missing.") + db = database + connection_options = { + "database": db, + "dbtable": table_name, + "schema": table_schema + } + return self._context.write_frame(frame,"--", connection_options,transformation_ctx=transformation_ctx) diff --git a/extras/glue_helper/sfjobs/sfjobs/dynamicframe/__init__.py b/extras/glue_helper/sfjobs/sfjobs/dynamicframe/__init__.py new file mode 100644 index 0000000..cce0aab --- /dev/null +++ b/extras/glue_helper/sfjobs/sfjobs/dynamicframe/__init__.py @@ -0,0 +1 @@ +from snowflake.snowpark import DataFrame as DynamicFrame diff --git a/extras/glue_helper/sfjobs/sfjobs/job.py b/extras/glue_helper/sfjobs/sfjobs/job.py new file mode 100644 index 0000000..bb47b26 --- /dev/null +++ b/extras/glue_helper/sfjobs/sfjobs/job.py @@ -0,0 +1,10 @@ +import logging +class Job: + def __init__(self, context): + self._context = context + def init(self, job_name, args={}): + self._job_name = job_name + self._args = args + + def commit(self): + logging.info('Committing job') \ No newline at end of file diff --git a/extras/glue_helper/sfjobs/sfjobs/transforms/__init__.py b/extras/glue_helper/sfjobs/sfjobs/transforms/__init__.py new file mode 100644 index 0000000..5df7a6a --- /dev/null +++ b/extras/glue_helper/sfjobs/sfjobs/transforms/__init__.py @@ -0,0 +1,6 @@ +from .field_transforms import SelectFields, RenameField +from .apply_mapping import ApplyMapping +from .resolve_choice import ResolveChoice +from .drop_nulls import DropNullFields +from .transform import find_field +from snowflake.snowpark import DataFrame diff --git a/extras/glue_helper/sfjobs/sfjobs/transforms/apply_mapping.py b/extras/glue_helper/sfjobs/sfjobs/transforms/apply_mapping.py new file mode 100644 index 0000000..855b43b --- /dev/null +++ b/extras/glue_helper/sfjobs/sfjobs/transforms/apply_mapping.py @@ -0,0 +1,72 @@ + +from snowflake.snowpark import DataFrame +from snowflake.snowpark.functions import sql_expr, lit, col, object_construct +from .transform import SFTransform + +import logging +from snowflake.snowpark._internal.utils import quote_name + + +class 
+class ApplyMapping(SFTransform):
+    def map_type(self, type_name: str):
+        if type_name == "long":
+            return "int"
+        return type_name
+    def record_nested_mapping(self, source_field: str, source_type: str, target_field: list, target_type: str, ctx: dict):
+        if len(target_field) == 1:
+            target_field = target_field[0]
+            ctx[target_field] = (source_field, source_type, target_field, target_type)
+        else:
+            current_field = target_field.pop(0)
+            if current_field not in ctx:
+                ctx[current_field] = {}
+            self.record_nested_mapping(source_field, source_type, target_field, target_type, ctx[current_field])
+    def to_object_construct(self, mapping, case_insensitive=True):
+        if isinstance(mapping, dict):
+            new_data = []
+            for key in mapping:
+                data = mapping[key]
+                if isinstance(data, dict):
+                    new_data.append(lit(key))
+                    new_data.append(self.to_object_construct(data, case_insensitive))
+                elif isinstance(data, tuple):
+                    source_field, source_type, target_field, target_type = data
+                    if case_insensitive:
+                        target_field = target_field.upper()
+                    new_data.append(lit(target_field))
+                    if case_insensitive:
+                        source_field = quote_name(source_field.upper())
+                    target_type = self.map_type(target_type)
+                    new_data.append(sql_expr(f'{source_field}::{target_type}'))
+            return object_construct(*new_data)
+    def __call__(self, frame: DataFrame, mappings, transformation_ctx: str = "", case_insensitive=True):
+        if transformation_ctx:
+            logging.info(f"CTX: {transformation_ctx}")
+        column_mappings = []
+        column_names = []
+
+        nested_mappings = {}
+        final_columns = []
+        for source_field, source_type, target_field, target_type in mappings:
+            if case_insensitive:
+                target_field = target_field.upper()
+            if '.' in target_field:
+                # nesting
+                target_parts = target_field.split('.')
+                self.record_nested_mapping(source_field, source_type, target_field.split('.'), target_type, nested_mappings)
+                if target_parts[0] not in final_columns:
+                    final_columns.append(target_parts[0])
+            else:
+                column_names.append(target_field)
+                if case_insensitive:
+                    source_field = quote_name(source_field.upper())
+                target_type = self.map_type(target_type)
+                column_mappings.append(sql_expr(f'{source_field}::{target_type}'))
+                final_columns.append(target_field)
+        for new_struct_key in nested_mappings:
+            column_names.append(new_struct_key)
+            column_mappings.append(self.to_object_construct(nested_mappings[new_struct_key], case_insensitive))
+
+        return frame.with_columns(column_names, column_mappings).select(final_columns)
\ No newline at end of file
diff --git a/extras/glue_helper/sfjobs/sfjobs/transforms/drop_nulls.py b/extras/glue_helper/sfjobs/sfjobs/transforms/drop_nulls.py
new file mode 100644
index 0000000..2490265
--- /dev/null
+++ b/extras/glue_helper/sfjobs/sfjobs/transforms/drop_nulls.py
@@ -0,0 +1,10 @@
+from .transform import SFTransform
+import logging
+from snowflake.snowpark import DataFrame
+
+class DropNullFields(SFTransform):
+
+    def __call__(self, frame: DataFrame, transformation_ctx: str = "", info: str = ""):
+        if transformation_ctx:
+            logging.info(f"CTX: {transformation_ctx}")
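+        # Note (behavioral assumption): Glue's DropNullFields removes all-null *columns*,
+        # while Snowpark's DataFrame.dropna() used below drops *rows* containing NULLs;
+        # adjust here if the original column-dropping semantics are required.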
+        return frame.dropna()
diff --git a/extras/glue_helper/sfjobs/sfjobs/transforms/field_transforms.py b/extras/glue_helper/sfjobs/sfjobs/transforms/field_transforms.py
new file mode 100644
index 0000000..2fa489f
--- /dev/null
+++ b/extras/glue_helper/sfjobs/sfjobs/transforms/field_transforms.py
@@ -0,0 +1,37 @@
+from snowflake.snowpark import Session, DataFrame
+import logging
+from .transform import SFTransform, find_field
+from functools import reduce
+from snowflake.snowpark.functions import col
+
+class SelectFields(SFTransform):
+    """
+    Get fields within a DataFrame
+
+    :param frame: DataFrame
+    :param paths: List of Strings or Columns
+    :param info: String, any string to be associated with errors in this transformation.
+    :return: DataFrame
+    """
+    def __call__(self, frame, paths, transformation_ctx="", info=""):
+        if transformation_ctx:
+            logging.info(f"CTX: {transformation_ctx}")
+            frame.session.append_query_tag(transformation_ctx, separator="|")
+        if info:
+            logging.info(info)
+        logging.info(f"Selecting fields {paths}")
+        return frame.select(*paths)
+
+class RenameField(SFTransform):
+    """
+    Rename fields within a DataFrame
+    :return: DataFrame
+    """
+    def __call__(self, frame, old_name, new_name, transformation_ctx="", info="", ignore_case=True, **kwargs):
+        return frame.rename_field(old_name, new_name, transformation_ctx, info, ignore_case, **kwargs)
+
+class Join(SFTransform):
+    def __call__(self, frame1, frame2, keys1, keys2, transformation_ctx=""):
+        assert len(keys1) == len(keys2), "The keys lists must be of the same length"
+        comparison_expression = reduce(lambda expr, ids: expr & (col(ids[0]) == col(ids[1])), zip(keys1[1:], keys2[1:]), col(keys1[0]) == col(keys2[0]))
+        return frame1.join(frame2, on=comparison_expression)
\ No newline at end of file
diff --git a/extras/glue_helper/sfjobs/sfjobs/transforms/resolve_choice.py b/extras/glue_helper/sfjobs/sfjobs/transforms/resolve_choice.py
new file mode 100644
index 0000000..a306305
--- /dev/null
+++ b/extras/glue_helper/sfjobs/sfjobs/transforms/resolve_choice.py
@@ -0,0 +1,65 @@
+import logging
+from .transform import SFTransform, find_field
+from snowflake.snowpark import DataFrame as DynamicFrame
+from snowflake.snowpark._internal.analyzer.analyzer_utils import quote_name_without_upper_casing
+from snowflake.snowpark.functions import lit, try_cast, split, iff, typeof, col, object_construct, cast, coalesce, builtin
+from snowflake.snowpark._internal.error_message import SnowparkClientExceptionMessages
+from snowflake.snowpark._internal.type_utils import snow_type_to_dtype_str
+from snowflake.snowpark._internal.utils import quote_name
+from snowflake.snowpark._internal.analyzer.analyzer_utils import unquote_if_quoted
+
+
+from sfjobs.transforms.apply_mapping import ApplyMapping
+from snowflake.snowpark.types import StructType, StructField, StringType, ArrayType, IntegerType, FloatType, BooleanType, DateType, TimestampType, VariantType, BinaryType
+try_parse_json = builtin("try_parse_json")
+
+class ResolveChoice(SFTransform):
+    def __call__(self, frame, specs=None, choice="", database=None, table_name=None, transformation_ctx="", info="", ignore_case=True, **kwargs):
+        df = frame
+        if transformation_ctx:
+            logging.info(f"CTX: {transformation_ctx}")
+        if len(kwargs):
+            logging.warning(f"ignored KWARGS: {kwargs}")
+        for col_name, operation in specs:
+            if operation.startswith('cast:'):
+                # Cast the column to the specified type
+                dtype = operation.split(':')[1]
+                current_field = find_field(col_name, df, ignore_case=ignore_case)
+                col_name = current_field.name
+                current_type = snow_type_to_dtype_str(current_field.datatype)
+                if current_type != dtype:
+                    df = df.withColumn(col_name, try_cast(cast(col(col_name), "string"), dtype))
+            elif operation.startswith('project:'):
+                # Project the column with the specified type
+                dtype = operation.split(':')[1].upper()
+                valid_dtypes = ['NULL_VALUE', 'NULL', 'BOOLEAN', 'STRING', 'INTEGER', 'DOUBLE', 'VARCHAR', 'ARRAY', 'OBJECT']
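+                # Assumption behind the 'LONG' remapping below: Snowflake's TYPEOF()
+                # reports whole-number VARIANT values as 'INTEGER', so a Glue
+                # "project:long" spec is compared against INTEGER here.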
+                if dtype == 'LONG':
+                    dtype = 'INTEGER'
+                current_field = find_field(col_name, df, ignore_case=ignore_case)
+                col_name = current_field.name
+                col_expr = coalesce(try_parse_json(df[col_name]), df[col_name].cast("variant"))
+                if dtype not in valid_dtypes:
+                    raise ValueError(f"Invalid data type: {dtype}. Valid values are {valid_dtypes}")
+                df = df.withColumn(col_name, iff(typeof(col_expr) == lit(dtype), cast(cast(df[col_name], "string"), dtype), lit(None)))
+            elif operation == 'make_cols':
+                current_field = find_field(col_name, df, ignore_case=ignore_case)
+                col_name = current_field.name
+                col_expr = coalesce(try_parse_json(df[col_name]), df[col_name].cast("variant"))
+                current_types = [x[0] for x in df.select(typeof(col_expr)).distinct().collect()]
+                for t in current_types:
+                    df = df.withColumn(f"{unquote_if_quoted(col_name)}_{t}", iff(typeof(col_expr) == lit(t), df[col_name].cast(current_field.datatype), lit(None)))
+                df = df.drop(col_name)
+            elif operation == 'make_struct':
+                struct_elements = []
+                current_field = find_field(col_name, df, ignore_case=ignore_case)
+                col_expr = coalesce(try_parse_json(df[col_name]), df[col_name].cast("variant"))
+                current_types = [x[0] for x in df.select(typeof(col_expr)).distinct().collect()]
+                for t in current_types:
+                    struct_elements.append(lit(t))
+                    struct_elements.append(iff(typeof(col_expr) == lit(t), df[col_name], lit(None)))
+                df = df.withColumn(col_name, object_construct(*struct_elements))
+            else:
+                raise ValueError(f"Unsupported operation: {operation}")
+        return df
+
diff --git a/extras/glue_helper/sfjobs/sfjobs/transforms/transform.py b/extras/glue_helper/sfjobs/sfjobs/transforms/transform.py
new file mode 100644
index 0000000..11a3d99
--- /dev/null
+++ b/extras/glue_helper/sfjobs/sfjobs/transforms/transform.py
@@ -0,0 +1,33 @@
+class SFTransform(object):
+    """Base class for all Transforms.
+
+    All transformations should inherit from SFTransform and define a
+    __call__ method. They can optionally override the name classmethod or use
+    the default of the class name.
+    """
+
+    @classmethod
+    def apply(cls, *args, **kwargs):
+        transform = cls()
+        return transform(*args, **kwargs)
+
+    @classmethod
+    def name(cls):
+        return cls.__name__
+
+from snowflake.snowpark._internal.analyzer.analyzer_utils import quote_name_without_upper_casing
+from snowflake.snowpark.types import StringType
+def find_field(col_name, df, ignore_case=False):
+    current_field = None
+    for f in df.schema.fields:
+        if ignore_case and f.name.upper() == quote_name_without_upper_casing(col_name).upper():
+            current_field = f
+            break
+        elif f.name == col_name:
+            current_field = f
+            break
+    if not current_field:
+        raise Exception(f"Field {col_name} not found")
+    if isinstance(current_field.datatype, StringType):
+        current_field.datatype.length = None
+    return current_field
\ No newline at end of file
diff --git a/extras/glue_helper/sfjobs/sfjobs/utils.py b/extras/glue_helper/sfjobs/sfjobs/utils.py
new file mode 100644
index 0000000..ab75b5d
--- /dev/null
+++ b/extras/glue_helper/sfjobs/sfjobs/utils.py
@@ -0,0 +1,62 @@
+import argparse
+import json
+import traceback
+import sys
+import re
+
+
+_global_args = {}
+
+
+class RawSqlExpression:
+    def __init__(self, expression):
+        self.expression = expression
+
+    def __str__(self):
+        return self.expression
+
+def needs_quoting(string: str) -> bool:
+    """Determines if the string needs quoting based on specific rules.
+
+    Returns True if the string needs quoting, otherwise False.
+    """
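+    # Examples (illustrative):
+    #   needs_quoting('"MY_TABLE"') -> False  (already a quoted, upper-case identifier)
+    #   needs_quoting('my_table')   -> True   (caller should pass it through quote_name)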
+ """ + remove_quote = re.compile(r'^"(([_A-Z]+[_A-Z0-9$]*)|(\$\d+))"$') + result = remove_quote.search(string) + return not result + +class ArgumentParser(argparse.ArgumentParser): + def error(self, message): + raise Exception("Argument Error: " + str(message)) + +def argument_exists(parser, arg_name): + for action in parser._actions: + if arg_name in action.option_strings: + return True + return False + +def getResolvedOptions(args, options): + parser = ArgumentParser() + + parser.add_argument('--job-bookmark-option', choices = ['job-bookmark-enable', 'job-bookmark-pause', 'job-bookmark-disable'], required = False) + parser.add_argument('--continuation-option', choices = ['continuation-enabled', 'continuation-readonly', 'continuation-ignore'], required = False) + + + parser.add_argument('--JOB_NAME', required=False) + parser.add_argument('--JOB_ID', required=False) + parser.add_argument('--JOB_RUN_ID', required=False) + + + + parser.add_argument('--TempDir', required=False) + options = [opt for opt in options if opt not in {'TempDir','JOB_NAME'}] + for option in options: + parser.add_argument('--' + option, required=True) + + parsed, extra = parser.parse_known_args(args[1:]) + + parsed_dict = vars(parsed) + + _global_args.update(parsed_dict) + + return parsed_dict diff --git a/extras/glue_helper/snowflake.yml b/extras/glue_helper/snowflake.yml new file mode 100644 index 0000000..64694fe --- /dev/null +++ b/extras/glue_helper/snowflake.yml @@ -0,0 +1,5 @@ +definition_version: 1 +snowpark: + project_name: "sfjobs" + stage_name: "artifacts" + src: "sfjobs/" \ No newline at end of file