Skip to content

Commit

Permalink
Merge pull request #33 from Kpler/feat/PTFM-9207/delegate-kafka-schem…
Browse files Browse the repository at this point in the history
…a-generation-to-project

refactor(kafka): delegate the JSON schema generation to the project
  • Loading branch information
yannrouillard authored Mar 13, 2024
2 parents c646b44 + eb670cc commit 7659672
Showing 1 changed file with 81 additions and 99 deletions.
180 changes: 81 additions & 99 deletions kafka/check-local-schemas.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@ set -o errexit # Leave immediately if a command returns an error
set -o nounset # Leave immediately if an uninitialized value is used
set -o pipefail # Leave immediately if a command fails in a pipe

shopt -s nullglob

[[ "${BASH_VERSION}" =~ ^(5|4\.[0-9]).* ]] && shopt -s inherit_errexit

SCRIPT_DIR="$(cd -P "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

#####################################################################
# Helper functions
#####################################################################

error() {
fatal() {
local msg="$1" exit_code="${2:-1}"
echo "ERROR: ${msg}">&2
echo "FATAL: ${msg}">&2
exit "${exit_code}"
}

# Report a problem on stderr. This helper only prints: it does not exit.
# Arguments: $1 - message to print after the "ERROR: " prefix
# Outputs:   "ERROR: <message>" on stderr
error() {
  local text="$1"
  printf 'ERROR: %s\n' "${text}" >&2
}

check_binary_exists() {
local binary="$1"
command -v "${binary}" &>/dev/null || error "${binary} is required but it's not installed"
Expand All @@ -36,99 +42,65 @@ get_repository_url() {
git remote get-url origin
}

get_md5sum() {
local file="$1"
md5sum "${file}" | awk '{ print $1}'
md5sum_files() {
[[ -z "$*" ]] || md5sum "$@"
}

find_schema_class_file() {
# The schema class heuristic is a bit hacky for now: we try to find a file
# where a class has been annotated with the schema annotation.
# Otherwise we fall back on finding a file whose name ends with Schema
# or is InputModel.
# We might want to improve this in the future.
schema_class_file="$(grep -lr "^@schema" src | head -n 1 || return 0)"

if [[ -z "${schema_class_file}" ]]; then
schema_class_file="$(find src -name "*Schema.scala" -o -name "InputModel.scala" | head -n 1 || return 0)"
fi

echo "${schema_class_file}"

# Look up the checksum recorded for one file in a checksum listing.
# Arguments: $1 - file path to look for
#            $2 - listing whose lines hold the checksum as first field and
#                 the file path as second field
# Outputs:   the checksum of every line whose second field equals $1
#            (nothing when the file is not listed)
get_md5sum() {
  local wanted_path="$1"
  local checksum_listing="$2"

  printf '%s\n' "${checksum_listing}" \
    | awk -v file="${wanted_path}" '{ if ($2 == file) print $1 }'
}

find_schema_class() {
local schema_class_file="$1"
schema_class_name="$(basename "${schema_class_file}" .scala)"
schema_package="$(awk ' $1 == "package" { print $2 }' "${schema_class_file}")"

echo "${schema_package}.${schema_class_name}"
# Print the language of the project located in the current directory.
# Globals:   PROJECT_LANGUAGE (read) - when set and non-empty, it is printed
#            as is and takes precedence over any detection
# Outputs:   the PROJECT_LANGUAGE value, else "scala" when a build.sbt file
#            exists in the current directory, else "unknown"
detect_current_project_language() {
  local detected="unknown"

  if [[ -f "build.sbt" ]]; then
    detected="scala"
  fi

  # An explicit override always wins over the file-based detection
  if [[ -n "${PROJECT_LANGUAGE:-}" ]]; then
    detected="${PROJECT_LANGUAGE}"
  fi

  echo "${detected}"
}

is_library_used() {
local library="$1" candidate_class_file="$2"
# Make sure a file ends with a linefeed, appending one when needed.
# Arguments: $1 - path of the file to fix (modified in place)
fix_end_of_file() {
  local target="$1"

  # Command substitution strips a trailing linefeed, so the captured last
  # byte is non-empty only when the file ends with another character.
  # An empty file yields nothing and is left untouched.
  if [[ -n "$(tail -c1 "${target}")" ]]; then
    echo >> "${target}"
  fi
}

# If the library is not directly found in the candidate class file,
# we fall back on checking the build.sbt file itself.
# This doesn't fully protect against indirect library loading,
# but it's a good enough heuristic for now.
for candidate in "${candidate_class_file}" build.sbt; do
if grep -q -E "[^#]*${library}" "${candidate}"; then
return 0
fi
fix_kafka_schemas_end_of_file() {
for schema_file in $(find_schema_files); do
fix_end_of_file "${schema_file}"
done
return 1
}

find_avro_library() {
local schema_class_file="$1"

if is_library_used "com.sksamuel.avro4s" "${schema_class_file}"; then
echo "avro4s"
elif is_library_used "vulcan" "${schema_class_file}"; then
echo "vulcan"
else
error "Could not find any avro library import in ${schema_class_file}"
fi

# List the Avro schema files (*.avsc) found under the "schemas" folder of
# the current directory.
# Outputs:   one file path per line, sorted
# Returns:   0 with no output when the "schemas" folder does not exist.
#            Without this guard, find fails on the missing folder and, with
#            pipefail/errexit enabled, the script stops on a raw find error
#            instead of letting the caller handle an empty list.
find_schema_files() {
  [[ -d schemas ]] || return 0
  find schemas -type f -name '*.avsc' | sort
}

generate_schema_generator_code() {
local schema_class="$1" schema_library="$2"

schema_class_name="${schema_class##*.}"
schema_package="${schema_class%.*}"

# only schema class using vulcan are supported for now
# but we might add support for avro4s in the future
sed \
-e "s/__SCHEMA_CLASS_NAME__/${schema_class_name}/g" \
-e "s/__SCHEMA_PACKAGE__/${schema_package}/g" \
"${SCRIPT_DIR}/generators/${schema_library^}SchemaGenerator.tmpl.scala"
# List the Avro schema files (*.avsc) under the "schemas" folder that were
# not modified after a given date.
# Arguments: $1 - reference date, passed as is to find's -newermt test
# Outputs:   one file path per line
find_obsolete_schema_files() {
  local cutoff="$1"
  # Keep regular *.avsc files whose modification time is NOT newer than the cutoff
  local -a criteria=(-type f -name '*.avsc' '!' -newermt "${cutoff}")

  find schemas "${criteria[@]}"
}

run_schema_generator_code() {
local generator_code_file="$1" target_schema_file="$2"

generator_source_folder="$(dirname "${generator_code_file}")"

sbt_command=""
# When fork is enabled, it seems we can't avoid all sbt logs to be printed
# so we just disable it
sbt_command+="set fork := false;"
# We tell sbt to look for our generator code in the temporary folder in addition
# to the existing source code, so we can run our generator code alongside the existing code.
# We need that because the generator code imports the schema class.
sbt_command+="set Compile / unmanagedSourceDirectories += file(\"${generator_source_folder}\");"
# Dynamically add the required dependencies to the build.sbt file
sbt_command+="set libraryDependencies += \"com.lihaoyi\" %% \"upickle\" % \"3.1.3\";"
sbt_command+="set libraryDependencies += \"com.lihaoyi\" %% \"os-lib\" % \"0.9.1\";"

sbt_command+="runMain kp_pre_commit_hooks.generateSchemaFile ${target_schema_file}"
# Generate the Kafka schemas of a Scala project by running its sbt
# "generateKafkaSchemas" task.
# Outputs:   whatever the sbt generation task prints
# Returns:   exits through fatal when the task cannot be found in the sbt
#            task list; otherwise the status of the sbt generation command
generate_kafka_schemas_for_scala() {
  local available_tasks

  # The task list is captured before being searched rather than piped into
  # "grep -q": grep -q may stop reading at the first match, and with
  # pipefail enabled a resulting failure on the sbt side of the pipe would
  # be reported as a missing task.
  # fatal is used (not error, which only prints) so that the generation is
  # not attempted when the task is missing.
  if ! available_tasks="$(sbt "tasks -V")" \
    || ! grep -qE "^ *generateKafkaSchemas " <<< "${available_tasks}"; then
    fatal "The project does not have a sbt generateKafkaSchemas task"
  fi

  sbt -batch -error "set fork := false; generateKafkaSchemas"
}

sbt -batch -error "${sbt_command}"
# Add a last linefeed to make pre-commit end-of-line fixer happy
echo >> "${target_schema_file}"
# Run the schema generation matching the language of the project.
# Arguments: $1 - project language; only "scala" is supported
# Returns:   exits through fatal for any other language. error only prints,
#            so using it here would let the caller carry on without any
#            schema having been generated.
run_schema_generation_task() {
  local language="$1"

  case "${language}" in
    scala)
      check_binary_exists "sbt"
      generate_kafka_schemas_for_scala
      # Add the final linefeed the generated files may lack
      fix_kafka_schemas_end_of_file
      ;;
    *)
      fatal "Unsupported language: ${language}"
      ;;
  esac
}

#####################################################################
Expand All @@ -137,32 +109,42 @@ run_schema_generator_code() {

trap clean_temporary_folder EXIT

# We don't want to run on template repositories
[[ "$(get_repository_url)" != "[email protected]:Kpler/template-"* ]] || exit 0
language="$(detect_current_project_language)"

before_schema_generation="$(date --date='-1 second' +'%Y-%m-%d %H:%M:%S')"

check_binary_exists "sbt"
# shellcheck disable=SC2046
schema_md5sum_before="$(md5sum_files $(find_schema_files))"

target_schema_file="schemas/schema.avsc"
run_schema_generation_task "${language}"

generator_source_folder="$(mktemp -d)"
generator_code_file="${generator_source_folder}/SchemaGenerator.scala"
schema_files_generated=$(find_schema_files)
[[ -n "${schema_files_generated}" ]] || fatal "No schema files found were generated"

[[ ! -f "${target_schema_file}" ]] || checksum_before="$(get_md5sum "${target_schema_file}")"
# shellcheck disable=SC2086
schema_md5sum_after="$(md5sum_files ${schema_files_generated})"

schema_class_file="$(find_schema_class_file)"
[[ -n "${schema_class_file}" ]] || error "Could not find any schema class file"
error_found="false"

schema_class="$(find_schema_class "${schema_class_file}")"
schema_library="$(find_avro_library "${schema_class_file}")"
for schema_file in ${schema_files_generated}; do
if ! is_git_tracked "${schema_file}"; then
error "Schema file \"${schema_file}\" is not tracked by git. Please commit it."
error_found="true"
fi

generate_schema_generator_code "${schema_class}" "${schema_library}" > "${generator_code_file}"
run_schema_generator_code "${generator_code_file}" "${target_schema_file}"
checksum_after="$(get_md5sum "${schema_file}" "${schema_md5sum_after}")"
checksum_before="$(get_md5sum "${schema_file}" "${schema_md5sum_before}")"
if [[ "${checksum_after}" != "${checksum_before}" ]]; then
error "Schema file \"${schema_file}\" is not consistent with code. Please commit the updated version."
error_found="true"
fi
done

if ! is_git_tracked "${target_schema_file}"; then
error "Schema file \"${target_schema_file}\" is not tracked by git. Please commit it."
obsolete_schemas_files=$(find_obsolete_schema_files "${before_schema_generation}")
if [[ -n "${obsolete_schemas_files}" ]]; then
error "The following schema files seem obsolete: ${obsolete_schemas_files}. Please delete them."
error_found="true"
fi

checksum_after="$(get_md5sum "${target_schema_file}")"
if [[ "${checksum_after}" != "${checksum_before:-}" ]]; then
error "Schema file \"${target_schema_file}\" was missing or not consistent with code. Please commit the updated version."
fi
[[ "${error_found}" == "false" ]] || exit 1

0 comments on commit 7659672

Please sign in to comment.