Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deploy apps with dynamic dependencies. #281

Merged
merged 38 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
0206f4b
Create ivy2 volume when installing packages.
razvan May 30, 2023
161de94
wip
razvan Sep 1, 2023
23d0a1f
Re-add pod overrides for the submit pod.
razvan Sep 2, 2023
51c1b77
Unify driver and executor configurations.
razvan Sep 4, 2023
44afe45
Merge branch 'main' into refactor/config-overrides
razvan Sep 4, 2023
1894fae
Successful smoke tests.
razvan Sep 7, 2023
babbaa3
Rename config structs.
razvan Sep 7, 2023
70f8726
smoke tests pass on kind.
razvan Sep 7, 2023
4d569c8
Refactor, cleanup and split configuration between driver and executor…
razvan Sep 8, 2023
7a18c40
Merge branch 'main' into refactor/config-overrides
razvan Sep 8, 2023
019b18c
Update docs, examples and changelog.
razvan Sep 8, 2023
d57a6de
merge feat/config-overrides
razvan Sep 11, 2023
af28ea3
fix typo
razvan Sep 11, 2023
e24085d
fix typo
razvan Sep 11, 2023
3f6f900
Move replicas under spec.executor.config in tests and examples.
razvan Sep 11, 2023
d08d37c
merge feat/config-overrides
razvan Sep 11, 2023
a9f67f8
use spark.jars.packages instead of --packages
razvan Sep 11, 2023
6667d02
Added test with pyspark and iceberg dependency.
razvan Sep 11, 2023
947e649
Merge branch 'main' into refactor/config-overrides
razvan Sep 11, 2023
04a4b62
Merge branch 'refactor/config-overrides' into feat/extensions-with-co…
razvan Sep 11, 2023
fddc8dd
update docs
razvan Sep 12, 2023
b2439df
Clean up tests.
razvan Sep 14, 2023
d4e64ba
Merge branch 'main' into refactor/config-overrides
razvan Sep 14, 2023
36a41c0
Clean up tests.
razvan Sep 14, 2023
facfa0a
Apply suggestions
razvan Sep 14, 2023
6b3f78b
Remove old node selector struct
razvan Sep 14, 2023
1831437
Added module doc for roles.rs
razvan Sep 14, 2023
3bf1745
Merge branch 'refactor/config-overrides' into feat/extensions-with-co…
razvan Sep 14, 2023
ed452e2
Use RoleGroup for executors to make replicas on the same level as exe…
razvan Sep 20, 2023
96edda9
Update tests with "replicas" directly under "executor".
razvan Sep 21, 2023
90c6e8a
Update docs/examples with "replicas" directly under "executor".
razvan Sep 21, 2023
ab9cecc
Merge branch 'main' into refactor/config-overrides
razvan Sep 21, 2023
9cf4441
Update rust/crd/src/roles.rs
razvan Sep 25, 2023
1ad997f
Implement review feedback.
razvan Sep 25, 2023
2a89755
Merge branch 'refactor/config-overrides' into feat/extensions-with-co…
razvan Sep 25, 2023
748f88c
Merge branch 'main' into feat/extensions-with-config-overrides
razvan Sep 25, 2023
0e803b7
Update CHANGELOG.md
razvan Sep 25, 2023
634af7d
Fix test for 3.3 and update docs.
razvan Sep 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@ All notable changes to this project will be documented in this file.
- `operator-rs` `0.44.0` -> `0.48.0` ([#267], [#275]).
- Removed usages of SPARK_DAEMON_JAVA_OPTS since it's not a reliable way to pass extra JVM options ([#272]).
- [BREAKING] use product image selection instead of version ([#275]).
- BREAKING refactored application roles to use `CommonConfiguration` structures from the operator framework ([#277]).
- [BREAKING] refactored application roles to use `CommonConfiguration` structures from the operator framework ([#277]).

### Fixed

- Dynamic loading of Maven packages ([#281]).

[#267]: https://github.com/stackabletech/spark-k8s-operator/pull/267
[#268]: https://github.com/stackabletech/spark-k8s-operator/pull/268
[#269]: https://github.com/stackabletech/spark-k8s-operator/pull/269
[#272]: https://github.com/stackabletech/spark-k8s-operator/pull/272
[#275]: https://github.com/stackabletech/spark-k8s-operator/pull/275
[#277]: https://github.com/stackabletech/spark-k8s-operator/pull/277
[#281]: https://github.com/stackabletech/spark-k8s-operator/pull/281

## [23.7.0] - 2023-07-14

Expand Down
21 changes: 12 additions & 9 deletions docs/modules/spark-k8s/pages/usage-guide/job-dependencies.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,25 +90,28 @@ include::example$example-pvc.yaml[]

=== Spark native package coordinates and Python requirements

IMPORTANT: With the platform release 23.4.1 (and all previous releases), dynamic provisioning of dependencies using the Spark `packages` field doesn't work. This is a known problem with Spark and is tracked https://github.com/stackabletech/spark-k8s-operator/issues/141[here].

The last and most flexible way to provision dependencies is to use the built-in `spark-submit` support for Maven package coordinates.

These can be specified by adding the following section to the `SparkApplication` manifest file:
The snippet below showcases how to add Apache Iceberg support to a Spark (version 3.4.x) application.

[source,yaml]
----
spec:
sparkConf:
spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.spark_catalog: org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type: hive
spark.sql.catalog.local: org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.local.type: hadoop
spark.sql.catalog.local.warehouse: /tmp/warehouse
deps:
repositories:
- https://repository.example.com/prod
packages:
- com.example:some-package:1.0.0
excludePackages:
- com.example:other-package
- org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1
----

These directly translate to the spark-submit parameters `--packages`, `--exclude-packages`, and `--repositories`.
IMPORTANT: Currently it's not possible to provision dependencies that are loaded by the JVM's (system class loader)[https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/ClassLoader.html#getSystemClassLoader()]. Such dependencies include JDBC drivers. If you need access to JDBC sources from your Spark application, consider building your own custom Spark image.

IMPORTANT: Spark version 3.3.x has a https://issues.apache.org/jira/browse/SPARK-35084[known bug] that prevents this mechanism to work.

When submitting PySpark jobs, users can specify `pip` requirements that are installed before the driver and executor pods are created.

Expand Down
3 changes: 3 additions & 0 deletions rust/crd/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ use stackable_operator::memory::{BinaryMultiple, MemoryQuantity};

pub const APP_NAME: &str = "spark-k8s";

pub const VOLUME_MOUNT_NAME_IVY2: &str = "ivy2";
pub const VOLUME_MOUNT_PATH_IVY2: &str = "/ivy2";

pub const VOLUME_MOUNT_NAME_DRIVER_POD_TEMPLATES: &str = "driver-pod-template";
pub const VOLUME_MOUNT_PATH_DRIVER_POD_TEMPLATES: &str = "/stackable/spark/driver-pod-templates";

Expand Down
31 changes: 30 additions & 1 deletion rust/crd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ impl SparkApplication {
.map(|req| req.join(" "))
}

pub fn packages(&self) -> Vec<String> {
self.spec
.deps
.as_ref()
.and_then(|deps| deps.packages.clone())
.unwrap_or_default()
}

pub fn volumes(
&self,
s3conn: &Option<S3ConnectionSpec>,
Expand Down Expand Up @@ -242,6 +250,13 @@ impl SparkApplication {
.build(),
);

if !self.packages().is_empty() {
result.push(
VolumeBuilder::new(VOLUME_MOUNT_NAME_IVY2)
.empty_dir(EmptyDirVolumeSource::default())
.build(),
);
}
if let Some(cert_secrets) = tlscerts::tls_secret_names(s3conn, s3logdir) {
result.push(
VolumeBuilder::new(STACKABLE_TRUST_STORE_NAME)
Expand Down Expand Up @@ -332,6 +347,13 @@ impl SparkApplication {
..VolumeMount::default()
});

if !self.packages().is_empty() {
mounts.push(VolumeMount {
name: VOLUME_MOUNT_NAME_IVY2.into(),
mount_path: VOLUME_MOUNT_PATH_IVY2.into(),
..VolumeMount::default()
});
}
if let Some(cert_secrets) = tlscerts::tls_secret_names(s3conn, s3logdir) {
mounts.push(VolumeMount {
name: STACKABLE_TRUST_STORE_NAME.into(),
Expand Down Expand Up @@ -458,7 +480,10 @@ impl SparkApplication {
deps.repositories
.map(|r| format!("--repositories {}", r.join(","))),
);
submit_cmd.extend(deps.packages.map(|p| format!("--packages {}", p.join(","))));
submit_cmd.extend(
deps.packages
.map(|p| format!("--conf spark.jars.packages={}", p.join(","))),
);
}

// some command elements need to be initially stored in a map (to allow overwrites) and
Expand Down Expand Up @@ -498,6 +523,10 @@ impl SparkApplication {
submit_conf.extend(log_dir.application_spark_config());
}

if !self.packages().is_empty() {
submit_cmd.push(format!("--conf spark.jars.ivy={VOLUME_MOUNT_PATH_IVY2}"))
}

// conf arguments: these should follow - and thus override - values set from resource limits above
if let Some(spark_conf) = self.spec.spark_conf.clone() {
submit_conf.extend(spark_conf);
Expand Down
10 changes: 10 additions & 0 deletions tests/templates/kuttl/iceberg/01-assert.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
{% if lookup('env', 'VECTOR_AGGREGATOR') %}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: vector-aggregator-discovery
{% endif %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{% if lookup('env', 'VECTOR_AGGREGATOR') %}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: vector-aggregator-discovery
data:
ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }}
{% endif %}
17 changes: 17 additions & 0 deletions tests/templates/kuttl/iceberg/10-assert.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 300
---
# The Job starting the whole process
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
name: pyspark-iceberg
status:
{% if test_scenario['values']['spark'].startswith("3.3") %}
# Spark 3.3 is expected to fail because of this https://issues.apache.org/jira/browse/SPARK-35084
phase: Failed
{% else %}
phase: Succeeded
{% endif %}
84 changes: 84 additions & 0 deletions tests/templates/kuttl/iceberg/10-deploy-spark-app.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
---
sbernauer marked this conversation as resolved.
Show resolved Hide resolved
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
name: pyspark-iceberg
spec:
version: "1.0"
{% if lookup('env', 'VECTOR_AGGREGATOR') %}
vectorAggregatorConfigMapName: vector-aggregator-discovery
{% endif %}
sparkImage:
{% if test_scenario['values']['spark'].find(",") > 0 %}
custom: "{{ test_scenario['values']['spark'].split(',')[1] }}"
productVersion: "{{ test_scenario['values']['spark'].split(',')[0] }}"
{% else %}
productVersion: "{{ test_scenario['values']['spark'] }}"
{% endif %}
pullPolicy: IfNotPresent
mode: cluster
mainApplicationFile: "local:///stackable/spark/jobs/write-to-iceberg.py"
sparkConf:
spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.spark_catalog: org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type: hive
spark.sql.catalog.local: org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.local.type: hadoop
spark.sql.catalog.local.warehouse: /tmp/warehouse
job:
logging:
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
driver:
config:
logging:
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
volumeMounts:
- name: script
mountPath: /stackable/spark/jobs
executor:
replicas: 1
config:
logging:
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
volumeMounts:
- name: script
mountPath: /stackable/spark/jobs
deps:
packages:
- org.apache.iceberg:iceberg-spark-runtime-{{ test_scenario['values']['spark'].rstrip('.0') }}_2.12:1.3.1
volumes:
- name: script
configMap:
name: write-to-iceberg
---
apiVersion: v1
kind: ConfigMap
metadata:
name: write-to-iceberg
data:
write-to-iceberg.py: |
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName("write-to-iceberg").getOrCreate()

schema = StructType([
StructField("id", LongType(), True),
StructField("data", StringType(), True)
])


# create table
df = spark.createDataFrame([], schema)
df.writeTo("local.db.table").create()

# append to table
data = [
(1,"one"),
(2,"two"),
(3,"three"),
(4,"four")
]

df = spark.createDataFrame(data, schema)
df.writeTo("local.db.table").append()
3 changes: 3 additions & 0 deletions tests/test-definition.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ tests:
- spark
- ny-tlc-report
- openshift
- name: iceberg
dimensions:
- spark
suites:
- name: nightly
patch:
Expand Down