diff --git a/CHANGELOG.md b/CHANGELOG.md
index 157c2fc1..67bb2ec2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,7 +15,11 @@ All notable changes to this project will be documented in this file.
 - `operator-rs` `0.44.0` -> `0.48.0` ([#267], [#275]).
 - Removed usages of SPARK_DAEMON_JAVA_OPTS since it's not a reliable way to pass extra JVM options ([#272]).
 - [BREAKING] use product image selection instead of version ([#275]).
-- BREAKING refactored application roles to use `CommonConfiguration` structures from the operator framework ([#277]).
+- [BREAKING] refactored application roles to use `CommonConfiguration` structures from the operator framework ([#277]).
+
+### Fixed
+
+- Dynamic loading of Maven packages ([#281]).
 
 [#267]: https://github.com/stackabletech/spark-k8s-operator/pull/267
 [#268]: https://github.com/stackabletech/spark-k8s-operator/pull/268
@@ -23,6 +27,7 @@
 [#272]: https://github.com/stackabletech/spark-k8s-operator/pull/272
 [#275]: https://github.com/stackabletech/spark-k8s-operator/pull/275
 [#277]: https://github.com/stackabletech/spark-k8s-operator/pull/277
+[#281]: https://github.com/stackabletech/spark-k8s-operator/pull/281
 
 ## [23.7.0] - 2023-07-14
 
diff --git a/docs/modules/spark-k8s/pages/usage-guide/job-dependencies.adoc b/docs/modules/spark-k8s/pages/usage-guide/job-dependencies.adoc
index 377b2cf9..b983feeb 100644
--- a/docs/modules/spark-k8s/pages/usage-guide/job-dependencies.adoc
+++ b/docs/modules/spark-k8s/pages/usage-guide/job-dependencies.adoc
@@ -90,25 +90,28 @@
 include::example$example-pvc.yaml[]
 
 === Spark native package coordinates and Python requirements
 
-IMPORTANT: With the platform release 23.4.1 (and all previous releases), dynamic provisioning of dependencies using the Spark `packages` field doesn't work. This is a known problem with Spark and is tracked https://github.com/stackabletech/spark-k8s-operator/issues/141[here].
-
 The last and most flexible way to provision dependencies is to use the built-in `spark-submit` support for Maven package coordinates.
-These can be specified by adding the following section to the `SparkApplication` manifest file:
+The snippet below showcases how to add Apache Iceberg support to a Spark (version 3.4.x) application.
 
 [source,yaml]
 ----
 spec:
+  sparkConf:
+    spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+    spark.sql.catalog.spark_catalog: org.apache.iceberg.spark.SparkSessionCatalog
+    spark.sql.catalog.spark_catalog.type: hive
+    spark.sql.catalog.local: org.apache.iceberg.spark.SparkCatalog
+    spark.sql.catalog.local.type: hadoop
+    spark.sql.catalog.local.warehouse: /tmp/warehouse
   deps:
-    repositories:
-      - https://repository.example.com/prod
     packages:
-      - com.example:some-package:1.0.0
-    excludePackages:
-      - com.example:other-package
+      - org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1
 ----
 
-These directly translate to the spark-submit parameters `--packages`, `--exclude-packages`, and `--repositories`.
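+Under the hood, the operator translates the `packages` list into the `spark-submit` property `spark.jars.packages` and points `spark.jars.ivy` at a dedicated scratch volume.
+For the example above, the generated arguments look roughly as follows (illustrative sketch only; `deps.repositories`, if set, is additionally passed via `--repositories`):
+
+[source,bash]
+----
+--conf spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1
+--conf spark.jars.ivy=/ivy2
+----
+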
+IMPORTANT: Currently it's not possible to provision dependencies that are loaded by the JVM's https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/ClassLoader.html#getSystemClassLoader()[system class loader]. Such dependencies include JDBC drivers. If you need access to JDBC sources from your Spark application, consider building your own custom Spark image.
+
+IMPORTANT: Spark version 3.3.x has a https://issues.apache.org/jira/browse/SPARK-35084[known bug] that prevents this mechanism from working.
 
 When submitting PySpark jobs, users can specify `pip` requirements that are installed before the driver and executor pods are created.
 
diff --git a/rust/crd/src/constants.rs b/rust/crd/src/constants.rs
index 59658051..20ef676d 100644
--- a/rust/crd/src/constants.rs
+++ b/rust/crd/src/constants.rs
@@ -2,6 +2,9 @@ use stackable_operator::memory::{BinaryMultiple, MemoryQuantity};
 
 pub const APP_NAME: &str = "spark-k8s";
 
+pub const VOLUME_MOUNT_NAME_IVY2: &str = "ivy2";
+pub const VOLUME_MOUNT_PATH_IVY2: &str = "/ivy2";
+
 pub const VOLUME_MOUNT_NAME_DRIVER_POD_TEMPLATES: &str = "driver-pod-template";
 pub const VOLUME_MOUNT_PATH_DRIVER_POD_TEMPLATES: &str = "/stackable/spark/driver-pod-templates";
 
diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs
index 7485f7c9..b8dc53b1 100644
--- a/rust/crd/src/lib.rs
+++ b/rust/crd/src/lib.rs
@@ -182,6 +182,14 @@ impl SparkApplication {
             .map(|req| req.join(" "))
     }
 
+    pub fn packages(&self) -> Vec<String> {
+        self.spec
+            .deps
+            .as_ref()
+            .and_then(|deps| deps.packages.clone())
+            .unwrap_or_default()
+    }
+
     pub fn volumes(
         &self,
         s3conn: &Option<S3ConnectionSpec>,
@@ -242,6 +250,13 @@
                 .build(),
         );
 
+        if !self.packages().is_empty() {
+            result.push(
+                VolumeBuilder::new(VOLUME_MOUNT_NAME_IVY2)
+                    .empty_dir(EmptyDirVolumeSource::default())
+                    .build(),
+            );
+        }
         if let Some(cert_secrets) = tlscerts::tls_secret_names(s3conn, s3logdir) {
             result.push(
                 VolumeBuilder::new(STACKABLE_TRUST_STORE_NAME)
@@ -332,6 +347,13 @@
             ..VolumeMount::default()
         });
 
+        if !self.packages().is_empty() {
+            mounts.push(VolumeMount {
+                name: VOLUME_MOUNT_NAME_IVY2.into(),
+                mount_path: VOLUME_MOUNT_PATH_IVY2.into(),
+                ..VolumeMount::default()
+            });
+        }
         if let Some(cert_secrets) = tlscerts::tls_secret_names(s3conn, s3logdir) {
             mounts.push(VolumeMount {
                 name: STACKABLE_TRUST_STORE_NAME.into(),
@@ -458,7 +480,10 @@
                 deps.repositories
                     .map(|r| format!("--repositories {}", r.join(","))),
             );
-            submit_cmd.extend(deps.packages.map(|p| format!("--packages {}", p.join(","))));
+            submit_cmd.extend(
+                deps.packages
+                    .map(|p| format!("--conf spark.jars.packages={}", p.join(","))),
+            );
         }
 
         // some command elements need to be initially stored in a map (to allow overwrites) and
@@ -498,6 +523,10 @@
             submit_conf.extend(log_dir.application_spark_config());
         }
 
+        if !self.packages().is_empty() {
+            submit_cmd.push(format!("--conf spark.jars.ivy={VOLUME_MOUNT_PATH_IVY2}"))
+        }
+
         // conf arguments: these should follow - and thus override - values set from resource limits above
         if let Some(spark_conf) = self.spec.spark_conf.clone() {
             submit_conf.extend(spark_conf);
diff --git a/tests/templates/kuttl/iceberg/01-assert.yaml.j2 b/tests/templates/kuttl/iceberg/01-assert.yaml.j2
new file mode 100644
index 00000000..50b1d4c3
--- /dev/null
+++ b/tests/templates/kuttl/iceberg/01-assert.yaml.j2
@@ -0,0 +1,10 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: vector-aggregator-discovery
+{% endif %}
diff --git a/tests/templates/kuttl/iceberg/01-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/iceberg/01-install-vector-aggregator-discovery-configmap.yaml.j2
new file mode 100644
index 00000000..2d6a0df5
--- /dev/null
+++ b/tests/templates/kuttl/iceberg/01-install-vector-aggregator-discovery-configmap.yaml.j2
@@ -0,0 +1,9 @@
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: vector-aggregator-discovery
+data:
+  ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }}
+{% endif %}
diff --git a/tests/templates/kuttl/iceberg/10-assert.yaml.j2 b/tests/templates/kuttl/iceberg/10-assert.yaml.j2
new file mode 100644
index 00000000..6a97da89
--- /dev/null
+++ b/tests/templates/kuttl/iceberg/10-assert.yaml.j2
@@ -0,0 +1,17 @@
+---
+apiVersion: kuttl.dev/v1beta1
+kind: TestAssert
+timeout: 300
+---
+# The Job starting the whole process
+apiVersion: spark.stackable.tech/v1alpha1
+kind: SparkApplication
+metadata:
+  name: pyspark-iceberg
+status:
+{% if test_scenario['values']['spark'].startswith("3.3") %}
+  # Spark 3.3 is expected to fail because of this https://issues.apache.org/jira/browse/SPARK-35084
+  phase: Failed
+{% else %}
+  phase: Succeeded
+{% endif %}
diff --git a/tests/templates/kuttl/iceberg/10-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/iceberg/10-deploy-spark-app.yaml.j2
new file mode 100644
index 00000000..8dcf96df
--- /dev/null
+++ b/tests/templates/kuttl/iceberg/10-deploy-spark-app.yaml.j2
@@ -0,0 +1,84 @@
+---
+apiVersion: spark.stackable.tech/v1alpha1
+kind: SparkApplication
+metadata:
+  name: pyspark-iceberg
+spec:
+  version: "1.0"
+{% if lookup('env', 'VECTOR_AGGREGATOR') %}
+  vectorAggregatorConfigMapName: vector-aggregator-discovery
+{% endif %}
+  sparkImage:
+{% if test_scenario['values']['spark'].find(",") > 0 %}
+    custom: "{{ test_scenario['values']['spark'].split(',')[1] }}"
+    productVersion: "{{ test_scenario['values']['spark'].split(',')[0] }}"
+{% else %}
+    productVersion: "{{ test_scenario['values']['spark'] }}"
+{% endif %}
+    pullPolicy: IfNotPresent
+  mode: cluster
+  mainApplicationFile: "local:///stackable/spark/jobs/write-to-iceberg.py"
+  sparkConf:
+    spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+    spark.sql.catalog.spark_catalog: org.apache.iceberg.spark.SparkSessionCatalog
+    spark.sql.catalog.spark_catalog.type: hive
+    spark.sql.catalog.local: org.apache.iceberg.spark.SparkCatalog
+    spark.sql.catalog.local.type: hadoop
+    spark.sql.catalog.local.warehouse: /tmp/warehouse
+  job:
+    logging:
+      enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
+  driver:
+    config:
+      logging:
+        enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
+      volumeMounts:
+        - name: script
+          mountPath: /stackable/spark/jobs
+  executor:
+    replicas: 1
+    config:
+      logging:
+        enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
+      volumeMounts:
+        - name: script
+          mountPath: /stackable/spark/jobs
+  deps:
+    packages:
+      - org.apache.iceberg:iceberg-spark-runtime-{{ test_scenario['values']['spark'].rstrip('.0') }}_2.12:1.3.1
+  volumes:
+    - name: script
+      configMap:
+        name: write-to-iceberg
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: write-to-iceberg
+data:
+  write-to-iceberg.py: |
+    from pyspark.sql import SparkSession
+    from pyspark.sql.types import *
+
+    spark = SparkSession.builder.appName("write-to-iceberg").getOrCreate()
+
+    schema = StructType([
+        StructField("id", LongType(), True),
+        StructField("data", StringType(), True)
+    ])
+
+
+    # create table
+    df = spark.createDataFrame([], schema)
+    df.writeTo("local.db.table").create()
+
+    # append to table
+    data = [
+        (1,"one"),
+        (2,"two"),
+        (3,"three"),
+        (4,"four")
+    ]
+
+    df = spark.createDataFrame(data, schema)
+    df.writeTo("local.db.table").append()
diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml
index b8c0fedc..b399ceb0 100644
--- a/tests/test-definition.yaml
+++ b/tests/test-definition.yaml
@@ -67,6 +67,9 @@ tests:
       - spark
       - ny-tlc-report
       - openshift
+  - name: iceberg
+    dimensions:
+      - spark
 suites:
   - name: nightly
     patch:
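
Note on the version handling: the `spark` dimension value added to tests/test-definition.yaml is what 10-deploy-spark-app.yaml.j2 trims with `rstrip('.0')` to build the matching Iceberg runtime coordinate. A quick sketch of that evaluation in plain Python, outside of Jinja and assuming a dimension value such as "3.4.0" (illustration only):

----
# rstrip(".0") removes trailing '.' and '0' characters,
# leaving the major.minor version used in the artifact name.
version = "3.4.0".rstrip(".0")  # -> "3.4"
package = f"org.apache.iceberg:iceberg-spark-runtime-{version}_2.12:1.3.1"
# -> "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1"
----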