Skip to content

Commit

Permalink
refactor(kafka): delegate the JSON schema generation to the project
Browse files Browse the repository at this point in the history
The automatic schema generation heuristic was nice for simple cases but
has several drawbacks:

  - it stops working as soon as devs get a bit more creative in their
    code (e.g. when they use parametrized...).
    This is difficult to anticipate, and we should never underestimate
    devs' creativity.

  - when it doesn't work as expected in their project, devs cannot
    really debug it on their own, since the code runs in the CI and they
    are unlikely to look at the mix of shell script and templated Scala
    code used in the GitHub action.

For all these reasons, the code generation for scala now only calls a
standardized sbt target that is expected to be implemented in projects.
It will be implemented out of the box in template projects and devs are
free to change it when they use a different implementation.
  • Loading branch information
yannrouillard committed Mar 6, 2024
1 parent c646b44 commit eb670cc
Showing 1 changed file with 81 additions and 99 deletions.
180 changes: 81 additions & 99 deletions kafka/check-local-schemas.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@ set -o errexit # Leave immediately if a command returns an error
set -o nounset # Leave immediately if an uninitialized value is used
set -o pipefail # Leave immediately if a command fails in a pipe

# Unmatched globs expand to nothing instead of the literal pattern
shopt -s nullglob

# inherit_errexit only exists since Bash 4.4: trying to enable it on 4.0-4.3
# fails and, as the last command of an && list, would abort the script under
# errexit. Check the version numerically instead of matching 4.[0-9].
if ((BASH_VERSINFO[0] > 4 || (BASH_VERSINFO[0] == 4 && BASH_VERSINFO[1] >= 4))); then
  shopt -s inherit_errexit
fi

# Directory containing this script, with symlinks resolved (cd -P)
SCRIPT_DIR="$(cd -P "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

#####################################################################
# Helper functions
#####################################################################

# Print a fatal error message on stderr and terminate the script.
# Arguments: $1 - message to display
#            $2 - exit code (optional, defaults to 1)
fatal() {
  local msg="$1" exit_code="${2:-1}"
  echo "FATAL: ${msg}" >&2
  exit "${exit_code}"
}

# Report an error on stderr without terminating the script.
# Arguments: $1 - message to display
error() {
  local message="$1"
  printf 'ERROR: %s\n' "${message}" >&2
}

check_binary_exists() {
local binary="$1"
command -v "${binary}" &>/dev/null || error "${binary} is required but it's not installed"
Expand All @@ -36,99 +42,65 @@ get_repository_url() {
git remote get-url origin
}

get_md5sum() {
local file="$1"
md5sum "${file}" | awk '{ print $1}'
# Print the md5sum output of the given files.
# Arguments: $@ - files to checksum
# Outputs:   one md5sum line per file, nothing when no file is given
md5sum_files() {
  # Without this guard, md5sum would be called with no argument and would
  # read its standard input instead.
  if [[ -n "$*" ]]; then
    md5sum "$@"
  fi
}

find_schema_class_file() {
# The schema class heuristic is a bit hacky for now, we try to find a file
# where a class has been annotated with the schema annotation
# Otherwise we fallback on finding the filename containing the schema code
# to end with Schema or is named InputModel
# We might want to improve this in the future
schema_class_file="$(grep -lr "^@schema" src | head -n 1 || return 0)"

if [[ -z "${schema_class_file}" ]]; then
schema_class_file="$(find src -name "*Schema.scala" -o -name "InputModel.scala" | head -n 1 || return 0)"
fi

echo "${schema_class_file}"

# Look up the checksum of one file in md5sum-style output.
# Arguments: $1 - file name, as it appears in the second column
#            $2 - checksum listing ("<checksum>  <file>" lines)
# Outputs:   the matching checksum, nothing when the file is not listed
get_md5sum() {
  local wanted_file="$1"
  local checksum_listing="$2"
  printf '%s\n' "${checksum_listing}" \
    | awk -v file="${wanted_file}" '{ if ($2 == file) print $1 }'
}

find_schema_class() {
local schema_class_file="$1"
schema_class_name="$(basename "${schema_class_file}" .scala)"
schema_package="$(awk ' $1 == "package" { print $2 }' "${schema_class_file}")"

echo "${schema_package}.${schema_class_name}"
# Print the language of the project found in the current directory.
# Globals:   PROJECT_LANGUAGE (read) - takes precedence when set and non-empty
# Outputs:   PROJECT_LANGUAGE's value, "scala" when a build.sbt file exists,
#            "unknown" otherwise
detect_current_project_language() {
  local detected="unknown"
  if [[ -n "${PROJECT_LANGUAGE:-}" ]]; then
    detected="${PROJECT_LANGUAGE}"
  elif [[ -f "build.sbt" ]]; then
    detected="scala"
  fi
  echo "${detected}"
}

is_library_used() {
local library="$1" candidate_class_file="$2"
# Append a newline to a file whose last byte is not already one.
# Arguments: $1 - file to fix in place
fix_end_of_file() {
  local file="$1"
  # Command substitution strips a trailing newline, so the captured last
  # byte is non-empty only when the file does not end with a newline.
  if [[ -n "$(tail -c1 "${file}")" ]]; then
    echo >> "${file}"
  fi
}

# if the library is not directly found in the candidate class file
# we fallback on checking the build.sbt file itself
# This doesn't fully protect against from indirect library loading
# but it's a good enough heuristic for now
for candidate in "${candidate_class_file}" build.sbt; do
if grep -q -E "[^#]*${library}" "${candidate}"; then
return 0
fi
# Make sure every schema file listed by find_schema_files ends with a newline.
# Expects find_schema_files to print one path per line.
fix_kafka_schemas_end_of_file() {
  local schema_file
  # Read the paths line by line rather than word-splitting a command
  # substitution, so paths containing spaces or glob characters stay intact.
  while IFS= read -r schema_file; do
    fix_end_of_file "${schema_file}"
  done < <(find_schema_files)
}

find_avro_library() {
local schema_class_file="$1"

if is_library_used "com.sksamuel.avro4s" "${schema_class_file}"; then
echo "avro4s"
elif is_library_used "vulcan" "${schema_class_file}"; then
echo "vulcan"
else
error "Could not find any avro library import in ${schema_class_file}"
fi

# List the *.avsc files found under the schemas folder of the current
# directory, one path per line, in sorted order.
find_schema_files() {
  find schemas \
    -name '*.avsc' -type f \
    | sort
}

generate_schema_generator_code() {
local schema_class="$1" schema_library="$2"

schema_class_name="${schema_class##*.}"
schema_package="${schema_class%.*}"

# only schema class using vulcan are supported for now
# but we might add support for avro4s in the future
sed \
-e "s/__SCHEMA_CLASS_NAME__/${schema_class_name}/g" \
-e "s/__SCHEMA_PACKAGE__/${schema_package}/g" \
"${SCRIPT_DIR}/generators/${schema_library^}SchemaGenerator.tmpl.scala"
# List the *.avsc files under the schemas folder that have not been modified
# after a given date.
# Arguments: $1 - reference date, in a format accepted by find -newermt
find_obsolete_schema_files() {
  local reference_date="$1"
  find schemas -type f -name '*.avsc' ! -newermt "${reference_date}"
}

run_schema_generator_code() {
local generator_code_file="$1" target_schema_file="$2"

generator_source_folder="$(dirname "${generator_code_file}")"

sbt_command=""
# When fork is enabled, it seems we can't avoid all sbt logs to be printed
# so we just disable it
sbt_command+="set fork := false;"
# We tell sbt to look for our generator code in the temporary folder in addition
# to the existing source code, so we can run our generator code alongside the existing code
# We need that as the generator code import the schema class
sbt_command+="set Compile / unmanagedSourceDirectories += file(\"${generator_source_folder}\");"
# Dynamically add the required dependencies to the build.sbt file
sbt_command+="set libraryDependencies += \"com.lihaoyi\" %% \"upickle\" % \"3.1.3\";"
sbt_command+="set libraryDependencies += \"com.lihaoyi\" %% \"os-lib\" % \"0.9.1\";"

sbt_command+="runMain kp_pre_commit_hooks.generateSchemaFile ${target_schema_file}"
# Generate the Kafka schemas of a Scala project through its sbt build.
# The project must expose a generateKafkaSchemas sbt task; the script is
# aborted with an explicit message when it does not.
generate_kafka_schemas_for_scala() {
  local sbt_tasks
  # Capture the task list before searching it: with pipefail enabled,
  # "sbt ... | grep -q" can fail when grep exits on its first match while sbt
  # is still writing, which would wrongly report the task as missing.
  sbt_tasks="$(sbt "tasks -V")" || fatal "Could not list the sbt tasks of the project"
  if ! grep -qE "^ *generateKafkaSchemas " <<< "${sbt_tasks}"; then
    # error() does not exit, so use fatal() to avoid running a missing task
    fatal "The project does not have a sbt generateKafkaSchemas task"
  fi
  sbt -batch -error "set fork := false; generateKafkaSchemas"
}

sbt -batch -error "${sbt_command}"
# Add a last linefeed to make pre-commit end-of-line fixer happy
echo >> "${target_schema_file}"
# Run the schema generation matching the project language.
# Arguments: $1 - project language (only "scala" is supported)
# Aborts the script through fatal() for any other language.
run_schema_generation_task() {
  local language="$1"
  case "${language}" in
    scala)
      check_binary_exists "sbt"
      generate_kafka_schemas_for_scala
      fix_kafka_schemas_end_of_file
      ;;
    *)
      # error() only prints: going on without generating anything would make
      # the later checks fail with unrelated messages, so stop here.
      fatal "Unsupported language: ${language}"
      ;;
  esac
}

#####################################################################
Expand All @@ -137,32 +109,42 @@ run_schema_generator_code() {

trap clean_temporary_folder EXIT

# We don't want to run on template repositories
[[ "$(get_repository_url)" != "[email protected]:Kpler/template-"* ]] || exit 0
language="$(detect_current_project_language)"

before_schema_generation="$(date --date='-1 second' +'%Y-%m-%d %H:%M:%S')"

check_binary_exists "sbt"
# shellcheck disable=SC2046
schema_md5sum_before="$(md5sum_files $(find_schema_files))"

target_schema_file="schemas/schema.avsc"
run_schema_generation_task "${language}"

generator_source_folder="$(mktemp -d)"
generator_code_file="${generator_source_folder}/SchemaGenerator.scala"
schema_files_generated=$(find_schema_files)
[[ -n "${schema_files_generated}" ]] || fatal "No schema files found were generated"

[[ ! -f "${target_schema_file}" ]] || checksum_before="$(get_md5sum "${target_schema_file}")"
# shellcheck disable=SC2086
schema_md5sum_after="$(md5sum_files ${schema_files_generated})"

schema_class_file="$(find_schema_class_file)"
[[ -n "${schema_class_file}" ]] || error "Could not find any schema class file"
error_found="false"

schema_class="$(find_schema_class "${schema_class_file}")"
schema_library="$(find_avro_library "${schema_class_file}")"
for schema_file in ${schema_files_generated}; do
if ! is_git_tracked "${schema_file}"; then
error "Schema file \"${schema_file}\" is not tracked by git. Please commit it."
error_found="true"
fi

generate_schema_generator_code "${schema_class}" "${schema_library}" > "${generator_code_file}"
run_schema_generator_code "${generator_code_file}" "${target_schema_file}"
checksum_after="$(get_md5sum "${schema_file}" "${schema_md5sum_after}")"
checksum_before="$(get_md5sum "${schema_file}" "${schema_md5sum_before}")"
if [[ "${checksum_after}" != "${checksum_before}" ]]; then
error "Schema file \"${schema_file}\" is not consistent with code. Please commit the updated version."
error_found="true"
fi
done

if ! is_git_tracked "${target_schema_file}"; then
error "Schema file \"${target_schema_file}\" is not tracked by git. Please commit it."
obsolete_schemas_files=$(find_obsolete_schema_files "${before_schema_generation}")
if [[ -n "${obsolete_schemas_files}" ]]; then
error "The following schema files seem obsolete: ${obsolete_schemas_files}. Please delete them."
error_found="true"
fi

checksum_after="$(get_md5sum "${target_schema_file}")"
if [[ "${checksum_after}" != "${checksum_before:-}" ]]; then
error "Schema file \"${target_schema_file}\" was missing or not consistent with code. Please commit the updated version."
fi
[[ "${error_found}" == "false" ]] || exit 1

0 comments on commit eb670cc

Please sign in to comment.