From 45c1e7a9af3a1d6ef6b11a45462ec2f5441fdae8 Mon Sep 17 00:00:00 2001 From: Siegfried Weber Date: Mon, 21 Oct 2024 12:54:08 +0200 Subject: [PATCH] feat: custom log directory (#479) * Use a generic ResolvedLogDir instead of the concrete S3LogDir * Add a test for customLogDirectory * Document the property customLogDirectory * Use "log directory" instead of "S3 log directory" when the resolved log directory structure is used * Update CRD docs * Update docs/modules/spark-k8s/pages/index.adoc Co-authored-by: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> * Update CHANGELOG.md Co-authored-by: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> --------- Co-authored-by: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> --- CHANGELOG.md | 2 + deploy/helm/spark-k8s-operator/crds/crds.yaml | 16 ++- docs/modules/spark-k8s/pages/index.adoc | 4 +- .../pages/usage-guide/history-server.adoc | 88 +++++++++++- rust/crd/src/history.rs | 19 +-- rust/crd/src/lib.rs | 42 +++--- rust/crd/src/{s3logdir.rs => logdir.rs} | 135 ++++++++++++++---- rust/crd/src/roles.rs | 6 +- rust/crd/src/tlscerts.rs | 8 +- .../src/history/history_controller.rs | 50 ++++--- .../src/spark_k8s_controller.rs | 60 ++++---- .../kuttl/custom-log-directory/00-assert.yaml | 9 ++ .../custom-log-directory/00-patch-ns.yaml.j2 | 9 ++ .../00-serviceaccount.yaml.j2 | 29 ++++ .../custom-log-directory/01-assert.yaml.j2 | 10 ++ ...tor-aggregator-discovery-configmap.yaml.j2 | 9 ++ .../kuttl/custom-log-directory/02-assert.yaml | 14 ++ .../02-install-zookeeper.yaml.j2 | 28 ++++ .../kuttl/custom-log-directory/03-assert.yaml | 30 ++++ .../03-install-hdfs.yaml.j2 | 35 +++++ .../kuttl/custom-log-directory/10-assert.yaml | 11 ++ .../10-deploy-history-server.yaml.j2 | 39 +++++ .../kuttl/custom-log-directory/11-assert.yaml | 11 ++ .../11-deploy-spark-app.yaml.j2 | 40 ++++++ .../kuttl/custom-log-directory/20-assert.yaml | 13 ++ .../custom-log-directory/20-test-logs.yaml | 20 +++ tests/test-definition.yaml | 6 + 27 files changed, 619 insertions(+), 124 deletions(-) rename rust/crd/src/{s3logdir.rs => logdir.rs} (67%) create mode 100644 tests/templates/kuttl/custom-log-directory/00-assert.yaml create mode 100644 tests/templates/kuttl/custom-log-directory/00-patch-ns.yaml.j2 create mode 100644 tests/templates/kuttl/custom-log-directory/00-serviceaccount.yaml.j2 create mode 100644 tests/templates/kuttl/custom-log-directory/01-assert.yaml.j2 create mode 100644 tests/templates/kuttl/custom-log-directory/01-install-vector-aggregator-discovery-configmap.yaml.j2 create mode 100644 tests/templates/kuttl/custom-log-directory/02-assert.yaml create mode 100644 tests/templates/kuttl/custom-log-directory/02-install-zookeeper.yaml.j2 create mode 100644 tests/templates/kuttl/custom-log-directory/03-assert.yaml create mode 100644 tests/templates/kuttl/custom-log-directory/03-install-hdfs.yaml.j2 create mode 100644 tests/templates/kuttl/custom-log-directory/10-assert.yaml create mode 100644 tests/templates/kuttl/custom-log-directory/10-deploy-history-server.yaml.j2 create mode 100644 tests/templates/kuttl/custom-log-directory/11-assert.yaml create mode 100644 tests/templates/kuttl/custom-log-directory/11-deploy-spark-app.yaml.j2 create mode 100644 tests/templates/kuttl/custom-log-directory/20-assert.yaml create mode 100644 tests/templates/kuttl/custom-log-directory/20-test-logs.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index 68568c5c..31a91754 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ All notable changes to this project will 
be documented in this file. ### Added - Make spark-env.sh configurable via `configOverrides` ([#473]). +- The Spark history server can now serve logs from HDFS-compatible systems ([#479]). ### Changed @@ -33,6 +34,7 @@ All notable changes to this project will be documented in this file. [#460]: https://github.com/stackabletech/spark-k8s-operator/pull/460 [#472]: https://github.com/stackabletech/spark-k8s-operator/pull/472 [#473]: https://github.com/stackabletech/spark-k8s-operator/pull/473 +[#479]: https://github.com/stackabletech/spark-k8s-operator/pull/479 ## [24.7.0] - 2024-07-24 diff --git a/deploy/helm/spark-k8s-operator/crds/crds.yaml b/deploy/helm/spark-k8s-operator/crds/crds.yaml index 4e5a25d7..949f0431 100644 --- a/deploy/helm/spark-k8s-operator/crds/crds.yaml +++ b/deploy/helm/spark-k8s-operator/crds/crds.yaml @@ -615,13 +615,19 @@ spec: x-kubernetes-preserve-unknown-fields: true type: object logFileDirectory: - description: The log file directory definition used by the Spark history server. Currently only S3 buckets are supported. + description: The log file directory definition used by the Spark history server. nullable: true oneOf: - required: - s3 + - required: + - customLogDirectory properties: + customLogDirectory: + description: A custom log directory + type: string s3: + description: An S3 bucket storing the log events properties: bucket: oneOf: @@ -1065,12 +1071,18 @@ spec: type: string type: object logFileDirectory: - description: The log file directory definition used by the Spark history server. Currently only S3 buckets are supported. + description: The log file directory definition used by the Spark history server. oneOf: - required: - s3 + - required: + - customLogDirectory properties: + customLogDirectory: + description: A custom log directory + type: string s3: + description: An S3 bucket storing the log events properties: bucket: oneOf: diff --git a/docs/modules/spark-k8s/pages/index.adoc b/docs/modules/spark-k8s/pages/index.adoc index eb41ac98..01f398b3 100644 --- a/docs/modules/spark-k8s/pages/index.adoc +++ b/docs/modules/spark-k8s/pages/index.adoc @@ -37,8 +37,8 @@ The SparkApplication resource is the main point of interaction with the operator An exhaustive list of options is given in the {crd}[SparkApplication CRD reference {external-link-icon}^]. The xref:usage-guide/history-server.adoc[SparkHistoryServer] has a single `node` role. -It is used to deploy a https://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact[Spark history server] that displays application logs from S3 buckets. -Of course, your applications need to write their logs to the same buckets. +It is used to deploy a https://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact[Spark history server] that displays application logs. +Of course, your applications need to write their logs to the same location. === Kubernetes resources diff --git a/docs/modules/spark-k8s/pages/usage-guide/history-server.adoc b/docs/modules/spark-k8s/pages/usage-guide/history-server.adoc index 8a926ae5..52a51a4f 100644 --- a/docs/modules/spark-k8s/pages/usage-guide/history-server.adoc +++ b/docs/modules/spark-k8s/pages/usage-guide/history-server.adoc @@ -17,9 +17,7 @@ For more details on how the Stackable Data Platform manages S3 resources see the include::example$example-history-server.yaml[] ---- -<1> The location of the event logs. Must be an S3 bucket. Future implementations might add support for other shared filesystems such as HDFS.
+<1> The location of the event logs, see <<log-dir-variants>> for other options. <2> Directory within the S3 bucket where the log files are located. This directory is required and must exist before setting up the history server. <3> The S3 bucket definition, here provided in-line. @@ -56,7 +54,91 @@ include::example$example-history-app.yaml[] <5> Bucket to store logs. This must match the bucket used by the history server. <6> Credentials used to write event logs. These can, of course, differ from the credentials used to process data. +[#log-dir-variants] +== Supported file systems for storing log events +=== S3 + +As already shown in the example above, the event logs can be stored in an S3 bucket: + +[source,yaml] +---- +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkHistoryServer +spec: + logFileDirectory: + s3: + prefix: eventlogs/ + bucket: + ... +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +spec: + logFileDirectory: + s3: + prefix: eventlogs/ + bucket: + ... +---- + +=== Custom log directory + +If no dedicated structure is provided for the desired file system, the log directory can nevertheless be set with the property `customLogDirectory`. +Additional configuration overrides may be necessary in this case. + +For instance, to store the Spark event logs in HDFS, the following configuration could be used: + +[source,yaml] +---- +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkHistoryServer +spec: + logFileDirectory: + customLogDirectory: hdfs://simple-hdfs/eventlogs/ # <1> + nodes: + envOverrides: + HADOOP_CONF_DIR: /stackable/hdfs-config # <2> + podOverrides: + spec: + containers: + - name: spark-history + volumeMounts: + - name: hdfs-config + mountPath: /stackable/hdfs-config + volumes: + - name: hdfs-config + configMap: + name: hdfs # <3> +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +spec: + logFileDirectory: + customLogDirectory: hdfs://simple-hdfs/eventlogs/ # <4> + sparkConf: + spark.driver.extraClassPath: /stackable/hdfs-config # <5> + driver: + config: + volumeMounts: + - name: hdfs-config + mountPath: /stackable/hdfs-config + volumes: + - name: hdfs-config + configMap: + name: hdfs +---- + +<1> A custom log directory that is used for the Spark option `spark.history.fs.logDirectory`. + The required dependencies must be on the class path. + This is the case for HDFS. +<2> The Spark History Server looks for the Hadoop configuration in the directory defined by the environment variable `HADOOP_CONF_DIR`. +<3> The ConfigMap containing the Hadoop configuration files `core-site.xml` and `hdfs-site.xml`. +<4> A custom log directory that is used for the Spark option `spark.eventLog.dir`. + Additionally, the Spark option `spark.eventLog.enabled` is set to `true`. +<5> The Spark driver looks for the Hadoop configuration on the class path. == History Web UI diff --git a/rust/crd/src/history.rs b/rust/crd/src/history.rs index e2e8611c..371cc502 100644 --- a/rust/crd/src/history.rs +++ b/rust/crd/src/history.rs @@ -1,5 +1,4 @@ -use crate::s3logdir::S3LogDir; -use crate::tlscerts; +use crate::logdir::ResolvedLogDir; use crate::{affinity::history_affinity, constants::*}; use product_config::{types::PropertyNameKind, ProductConfigManager}; @@ -78,7 +77,6 @@ pub struct SparkHistoryServerSpec { pub vector_aggregator_config_map_name: Option<String>, /// The log file directory definition used by the Spark history server. - /// Currently only S3 buckets are supported.
pub log_file_directory: LogFileDirectorySpec, /// A map of key/value strings that will be passed directly to Spark when deploying the history server. @@ -235,7 +233,7 @@ impl SparkHistoryServer { pub fn merged_env( &self, - s3logdir: &S3LogDir, + logdir: &ResolvedLogDir, role_group_env_overrides: HashMap<String, String>, ) -> Vec<EnvVar> { // Maps env var name to env var object. This allows env_overrides to work @@ -271,7 +269,7 @@ impl SparkHistoryServer { ]; // if TLS is enabled build truststore - if tlscerts::tls_secret_name(&s3logdir.bucket.connection).is_some() { + if logdir.tls_enabled() { history_opts.extend(vec![ format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"), format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"), @@ -327,8 +325,11 @@ impl SparkHistoryServer { #[derive(Clone, Debug, Deserialize, JsonSchema, Serialize, Display)] #[serde(rename_all = "camelCase")] pub enum LogFileDirectorySpec { + /// An S3 bucket storing the log events #[strum(serialize = "s3")] S3(S3LogFileDirectorySpec), + /// A custom log directory + CustomLogDirectory(String), } #[derive(Clone, Debug, Deserialize, JsonSchema, Serialize)] @@ -456,6 +457,8 @@ impl Configuration for HistoryConfigFragment { #[cfg(test)] mod test { + use crate::logdir::S3LogDir; + use super::*; use indoc::indoc; use stackable_operator::commons::{ @@ -495,7 +498,7 @@ mod test { let history: SparkHistoryServer = serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap(); - let s3_log_dir: S3LogDir = S3LogDir { + let log_dir = ResolvedLogDir::S3(S3LogDir { bucket: ResolvedS3Bucket { bucket_name: "my-bucket".to_string(), connection: ResolvedS3Connection { @@ -507,10 +510,10 @@ mod test { }, }, prefix: "prefix".to_string(), - }; + }); let merged_env = history.merged_env( - &s3_log_dir, + &log_dir, history .spec .nodes diff --git a/rust/crd/src/lib.rs b/rust/crd/src/lib.rs index 1ac02fcb..76c94e40 100644 --- a/rust/crd/src/lib.rs +++ b/rust/crd/src/lib.rs @@ -3,15 +3,15 @@ pub mod affinity; pub mod constants; pub mod history; +pub mod logdir; pub mod roles; -pub mod s3logdir; pub mod tlscerts; pub use crate::roles::*; use constants::*; use history::LogFileDirectorySpec; +use logdir::ResolvedLogDir; use product_config::{types::PropertyNameKind, ProductConfigManager}; -use s3logdir::S3LogDir; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, Snafu}; use stackable_operator::{ @@ -101,8 +101,8 @@ pub enum Error { #[snafu(display("failed to configure S3 connection/bucket"))] ConfigureS3 { source: S3Error }, - #[snafu(display("failed to configure S3 log directory"))] - ConfigureS3LogDir { source: s3logdir::Error }, + #[snafu(display("failed to configure log directory"))] + ConfigureLogDir { source: logdir::Error }, } #[derive(Clone, Debug, Deserialize, PartialEq, Serialize, JsonSchema)] @@ -208,7 +208,7 @@ pub struct SparkApplicationSpec { pub env: Vec<EnvVar>, /// The log file directory definition used by the Spark history server. #[serde(default, skip_serializing_if = "Option::is_none")] pub log_file_directory: Option<LogFileDirectorySpec>, } @@ -280,7 +279,7 @@ impl SparkApplication { pub fn volumes( &self, s3conn: &Option<ResolvedS3Connection>, - s3logdir: &Option<S3LogDir>, + logdir: &Option<ResolvedLogDir>, log_config_map: Option<&str>, ) -> Result<Vec<Volume>, Error> { let mut result: Vec<Volume> = self.spec.volumes.clone(); @@ -313,11 +312,8 @@ impl SparkApplication { ); } - if let Some(log_dir) = s3logdir.as_ref() { - if let Some(volume) = log_dir - .credentials_volume() - .context(ConfigureS3LogDirSnafu)?
- { + if let Some(log_dir) = logdir.as_ref() { + if let Some(volume) = log_dir.credentials_volume().context(ConfigureLogDirSnafu)? { result.push(volume); } } @@ -348,7 +344,7 @@ impl SparkApplication { .build(), ); } - if let Some(cert_secrets) = tlscerts::tls_secret_names(s3conn, s3logdir) { + if let Some(cert_secrets) = tlscerts::tls_secret_names(s3conn, logdir) { result.push( VolumeBuilder::new(STACKABLE_TRUST_STORE_NAME) .with_empty_dir(None::<String>, Some(Quantity("5Mi".to_string()))) @@ -385,7 +381,7 @@ impl SparkApplication { pub fn spark_job_volume_mounts( &self, s3conn: &Option<ResolvedS3Connection>, - s3logdir: &Option<S3LogDir>, + logdir: &Option<ResolvedLogDir>, ) -> Vec<VolumeMount> { let mut tmpl_mounts = vec![ VolumeMount { @@ -400,7 +396,7 @@ impl SparkApplication { }, ]; - tmpl_mounts = self.add_common_volume_mounts(tmpl_mounts, s3conn, s3logdir, false); + tmpl_mounts = self.add_common_volume_mounts(tmpl_mounts, s3conn, logdir, false); if let Some(CommonConfiguration { config: @@ -424,7 +420,7 @@ impl SparkApplication { &self, mut mounts: Vec<VolumeMount>, s3conn: &Option<ResolvedS3Connection>, - s3logdir: &Option<S3LogDir>, + logdir: &Option<ResolvedLogDir>, logging_enabled: bool, ) -> Vec<VolumeMount> { if self.spec.image.is_some() { @@ -457,7 +453,7 @@ impl SparkApplication { }); } - if let Some(vm) = s3logdir.as_ref().and_then(|o| o.credentials_volume_mount()) { + if let Some(vm) = logdir.as_ref().and_then(|o| o.credentials_volume_mount()) { mounts.push(vm); } @@ -482,7 +478,7 @@ impl SparkApplication { ..VolumeMount::default() }); } - if let Some(cert_secrets) = tlscerts::tls_secret_names(s3conn, s3logdir) { + if let Some(cert_secrets) = tlscerts::tls_secret_names(s3conn, logdir) { mounts.push(VolumeMount { name: STACKABLE_TRUST_STORE_NAME.into(), mount_path: STACKABLE_TRUST_STORE.into(), @@ -521,7 +517,7 @@ impl SparkApplication { &self, serviceaccount_name: &str, s3conn: &Option<ResolvedS3Connection>, - s3_log_dir: &Option<S3LogDir>, + log_dir: &Option<ResolvedLogDir>, spark_image: &str, ) -> Result<Vec<String>, Error> { // mandatory properties @@ -584,7 +580,7 @@ impl SparkApplication { let mut extra_java_opts = vec![format!( "-Djava.security.properties={VOLUME_MOUNT_PATH_LOG_CONFIG}/{JVM_SECURITY_PROPERTIES_FILE}" )]; - if tlscerts::tls_secret_names(s3conn, s3_log_dir).is_some() { + if tlscerts::tls_secret_names(s3conn, log_dir).is_some() { extra_java_opts.extend(vec![ format!("-Djavax.net.ssl.trustStore={STACKABLE_TRUST_STORE}/truststore.p12"), format!("-Djavax.net.ssl.trustStorePassword={STACKABLE_TLS_STORE_PASSWORD}"), @@ -645,11 +641,11 @@ impl SparkApplication { submit_conf.insert("spark.executor.instances".to_string(), replicas.to_string()); } - if let Some(log_dir) = s3_log_dir { + if let Some(log_dir) = log_dir { submit_conf.extend( log_dir .application_spark_config() - .context(ConfigureS3LogDirSnafu)?, + .context(ConfigureLogDirSnafu)?, ); } @@ -683,7 +679,7 @@ impl SparkApplication { pub fn env( &self, s3conn: &Option<ResolvedS3Connection>, - s3logdir: &Option<S3LogDir>, + logdir: &Option<ResolvedLogDir>, ) -> Vec<EnvVar> { let mut e: Vec<EnvVar> = self.spec.env.clone(); if self.requirements().is_some() { @@ -695,7 +691,7 @@ impl SparkApplication { value_from: None, }); } - if tlscerts::tls_secret_names(s3conn, s3logdir).is_some() { + if tlscerts::tls_secret_names(s3conn, logdir).is_some() { e.push(EnvVar { name: "STACKABLE_TLS_STORE_PASSWORD".to_string(), value: Some(STACKABLE_TLS_STORE_PASSWORD.to_string()), diff --git a/rust/crd/src/s3logdir.rs b/rust/crd/src/logdir.rs similarity index 67% rename from rust/crd/src/s3logdir.rs rename to rust/crd/src/logdir.rs index 2cc3f8a6..6aea7deb 100644 --- a/rust/crd/src/s3logdir.rs +++ b/rust/crd/src/logdir.rs @@ -49,6 +49,103 @@ pub enum Error { ConfigureS3 { source:
S3Error }, } +pub enum ResolvedLogDir { + S3(S3LogDir), + Custom(String), +} + +impl ResolvedLogDir { + pub async fn resolve( + log_file_dir: &LogFileDirectorySpec, + namespace: Option<String>, + client: &stackable_operator::client::Client, + ) -> Result<ResolvedLogDir, Error> { + match log_file_dir { + S3(s3_log_dir) => S3LogDir::resolve(s3_log_dir, namespace, client) + .await + .map(ResolvedLogDir::S3), + LogFileDirectorySpec::CustomLogDirectory(custom_log_dir) => { + Ok(ResolvedLogDir::Custom(custom_log_dir.to_owned())) + } + } + } + + pub fn tls_enabled(&self) -> bool { + self.tls_secret_name().is_some() + } + + pub fn tls_secret_name(&self) -> Option<&str> { + match self { + ResolvedLogDir::S3(s3_log_dir) => { + tlscerts::tls_secret_name(&s3_log_dir.bucket.connection) + } + ResolvedLogDir::Custom(_) => None, + } + } + + pub fn history_server_spark_config(&self) -> Result<BTreeMap<String, String>, Error> { + match self { + ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.history_server_spark_config(), + ResolvedLogDir::Custom(custom_log_dir) => Ok(BTreeMap::from([( + "spark.history.fs.logDirectory".to_string(), + custom_log_dir.to_string(), + )])), + } + } + + pub fn application_spark_config(&self) -> Result<BTreeMap<String, String>, Error> { + match self { + ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.application_spark_config(), + ResolvedLogDir::Custom(custom_log_dir) => Ok(BTreeMap::from([ + ("spark.eventLog.enabled".to_string(), "true".to_string()), + ("spark.eventLog.dir".to_string(), custom_log_dir.to_string()), + ])), + } + } + + pub fn volumes(&self) -> Result<Vec<Volume>, Error> { + match self { + ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.volumes(), + ResolvedLogDir::Custom(_) => Ok(vec![]), + } + } + + pub fn volume_mounts(&self) -> Vec<VolumeMount> { + match self { + ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.volume_mounts(), + ResolvedLogDir::Custom(_) => vec![], + } + } + + pub fn credentials_volume(&self) -> Result<Option<Volume>, Error> { + match self { + ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.credentials_volume(), + ResolvedLogDir::Custom(_) => Ok(None), + } + } + + pub fn credentials_volume_mount(&self) -> Option<VolumeMount> { + match self { + ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.credentials_volume_mount(), + ResolvedLogDir::Custom(_) => None, + } + } + + pub fn credentials(&self) -> Option<SecretClassVolume> { + match self { + ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.credentials(), + ResolvedLogDir::Custom(_) => None, + } + } + + pub fn credentials_mount_path(&self) -> Option<String> { + match self { + ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.credentials_mount_path(), + ResolvedLogDir::Custom(_) => None, + } + } +} + pub struct S3LogDir { pub bucket: ResolvedS3Bucket, pub prefix: String, @@ -56,39 +153,25 @@ impl S3LogDir { pub async fn resolve( - log_file_dir: Option<&LogFileDirectorySpec>, + log_file_dir: &S3LogFileDirectorySpec, namespace: Option<String>, client: &stackable_operator::client::Client, - ) -> Result<Option<S3LogDir>, Error> { - #[allow(irrefutable_let_patterns)] - let (bucket, prefix) = if let Some(S3(S3LogFileDirectorySpec { bucket: bucket_def, prefix, })) = log_file_dir - { - ( - bucket_def - .clone() - .resolve(client, namespace.unwrap().as_str()) - .await - .context(ConfigureS3Snafu)?, - prefix.clone(), - ) - } else { - // !!!!! - // Ugliness alert! - // No point in trying to resolve the connection anymore since there is no - // log_file_dir in the first place. - // This can casually happen for Spark applications that don't use a history server - // !!!!!
- return Ok(None); - }; + ) -> Result { + let bucket = log_file_dir + .bucket + .clone() + .resolve(client, namespace.unwrap().as_str()) + .await + .context(ConfigureS3Snafu)?; if bucket.connection.tls.uses_tls() && !bucket.connection.tls.uses_tls() { return S3TlsNoVerificationNotSupportedSnafu.fail(); } - Ok(Some(S3LogDir { bucket, prefix })) + Ok(S3LogDir { + bucket, + prefix: log_file_dir.prefix.to_owned(), + }) } /// Constructs the properties needed for loading event logs from S3. diff --git a/rust/crd/src/roles.rs b/rust/crd/src/roles.rs index bf3111c4..ef4fd052 100644 --- a/rust/crd/src/roles.rs +++ b/rust/crd/src/roles.rs @@ -13,11 +13,11 @@ //! each role is named "default". These roles are transparent to the user. //! //! The history server has its own role completely unrelated to this module. +use crate::ResolvedLogDir; use std::{collections::BTreeMap, slice}; use serde::{Deserialize, Serialize}; -use crate::s3logdir::S3LogDir; use crate::SparkApplication; use stackable_operator::{ commons::{ @@ -148,10 +148,10 @@ impl RoleConfig { &self, spark_application: &SparkApplication, s3conn: &Option, - s3logdir: &Option, + logdir: &Option, ) -> Vec { let volume_mounts = self.volume_mounts.clone().into(); - spark_application.add_common_volume_mounts(volume_mounts, s3conn, s3logdir, true) + spark_application.add_common_volume_mounts(volume_mounts, s3conn, logdir, true) } } diff --git a/rust/crd/src/tlscerts.rs b/rust/crd/src/tlscerts.rs index 2ba144a2..77ac2e8e 100644 --- a/rust/crd/src/tlscerts.rs +++ b/rust/crd/src/tlscerts.rs @@ -8,7 +8,7 @@ use crate::{ STACKABLE_MOUNT_PATH_TLS, STACKABLE_TLS_STORE_PASSWORD, STACKABLE_TRUST_STORE, SYSTEM_TRUST_STORE, SYSTEM_TRUST_STORE_PASSWORD, }, - s3logdir::S3LogDir, + logdir::ResolvedLogDir, }; pub fn tls_secret_name(s3conn: &ResolvedS3Connection) -> Option<&str> { @@ -34,7 +34,7 @@ pub fn tls_secret_name(s3conn: &ResolvedS3Connection) -> Option<&str> { pub fn tls_secret_names<'a>( s3conn: &'a Option, - s3logdir: &'a Option, + logdir: &'a Option, ) -> Option> { let mut names = Vec::new(); @@ -42,8 +42,8 @@ pub fn tls_secret_names<'a>( names.push(secret_name); } - if let Some(logdir) = s3logdir { - if let Some(secret_name) = tls_secret_name(&logdir.bucket.connection) { + if let Some(logdir) = logdir { + if let Some(secret_name) = logdir.tls_secret_name() { names.push(secret_name); } } diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/history_controller.rs index a5c402af..d024590f 100644 --- a/rust/operator-binary/src/history/history_controller.rs +++ b/rust/operator-binary/src/history/history_controller.rs @@ -37,6 +37,7 @@ use stackable_operator::{ time::Duration, }; use stackable_spark_k8s_crd::constants::{METRICS_PORT, SPARK_ENV_SH_FILE_NAME}; +use stackable_spark_k8s_crd::logdir::ResolvedLogDir; use stackable_spark_k8s_crd::{ constants::{ ACCESS_KEY_ID, APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_ROLE_NAME, @@ -48,7 +49,6 @@ use stackable_spark_k8s_crd::{ }, history, history::{HistoryConfig, SparkHistoryServer, SparkHistoryServerContainer}, - s3logdir::S3LogDir, tlscerts, to_spark_env_sh_string, }; use std::collections::HashMap; @@ -124,9 +124,9 @@ pub enum Error { #[snafu(display("number of cleaner replicas exceeds 1"))] TooManyCleanerReplicas, - #[snafu(display("failed to resolve the s3 log dir configuration"))] - S3LogDir { - source: stackable_spark_k8s_crd::s3logdir::Error, + #[snafu(display("failed to resolve the log dir configuration"))] + LogDir { + source: 
stackable_spark_k8s_crd::logdir::Error, }, #[snafu(display("failed to create cluster resources"))] @@ -184,9 +184,9 @@ pub enum Error { stackable_operator::kvp::KeyValuePairError, }, - #[snafu(display("failed to get create the S3 log dir"))] - CreateS3LogDirVolumes { - source: stackable_spark_k8s_crd::s3logdir::Error, + #[snafu(display("failed to create the log dir volumes specification"))] + CreateLogDirVolumesSpec { - source: stackable_spark_k8s_crd::s3logdir::Error, + source: stackable_spark_k8s_crd::logdir::Error, }, #[snafu(display("failed to add needed volume"))] @@ -224,13 +224,13 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Action> - let s3_log_dir = S3LogDir::resolve( - Some(&shs.spec.log_file_directory), + let log_dir = ResolvedLogDir::resolve( + &shs.spec.log_file_directory, shs.metadata.namespace.clone(), client, ) .await - .context(S3LogDirSnafu)?; + .context(LogDirSnafu)?; @@ fn build_config_map( shs: &SparkHistoryServer, config: &HashMap<PropertyNameKind, BTreeMap<String, String>>, rolegroupref: &RoleGroupRef<SparkHistoryServer>, - s3_log_dir: &S3LogDir, + log_dir: &ResolvedLogDir, vector_aggregator_address: Option<&str>, ) -> Result<ConfigMap, Error> { let cm_name = rolegroupref.object_name(); - let spark_defaults = spark_defaults(shs, s3_log_dir, rolegroupref)?; + let spark_defaults = spark_defaults(shs, log_dir, rolegroupref)?; let jvm_sec_props: BTreeMap<String, Option<String>> = config .get(&PropertyNameKind::File( @@ -421,7 +421,7 @@ fn build_stateful_set( shs: &SparkHistoryServer, resolved_product_image: &ResolvedProductImage, rolegroupref: &RoleGroupRef<SparkHistoryServer>, - s3_log_dir: &S3LogDir, + log_dir: &ResolvedLogDir, config: &HistoryConfig, serviceaccount: &ServiceAccount, ) -> Result<StatefulSet, Error> { @@ -475,7 +475,7 @@ fn build_stateful_set( .build(), ) .context(AddVolumeSnafu)? - .add_volumes(s3_log_dir.volumes().context(CreateS3LogDirVolumesSnafu)?) + .add_volumes(log_dir.volumes().context(CreateLogDirVolumesSpecSnafu)?) .context(AddVolumeSnafu)? .security_context(PodSecurityContext { run_as_user: Some(SPARK_UID), @@ -488,7 +488,7 @@ fn build_stateful_set( .rolegroup(rolegroupref) .with_context(|_| CannotRetrieveRoleGroupSnafu)?; - let merged_env = shs.merged_env(s3_log_dir, role_group.config.env_overrides); + let merged_env = shs.merged_env(log_dir, role_group.config.env_overrides); let container_name = "spark-history"; let container = ContainerBuilder::new(container_name) .image_from_product_image(resolved_product_image) .resources(config.resources.clone().into()) .command(vec!["/bin/bash".to_string()]) - .args(command_args(s3_log_dir)) + .args(command_args(log_dir)) .add_container_port("http", 18080) .add_container_port("metrics", METRICS_PORT.into()) .add_env_vars(merged_env) - .add_volume_mounts(s3_log_dir.volume_mounts()) + .add_volume_mounts(log_dir.volume_mounts()) .context(AddVolumeMountSnafu)? .add_volume_mount(VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_PATH_CONFIG) .context(AddVolumeMountSnafu)?
@@ -681,12 +681,10 @@ fn build_history_role_serviceaccount( #[allow(clippy::result_large_err)] fn spark_defaults( shs: &SparkHistoryServer, - s3_log_dir: &S3LogDir, + log_dir: &ResolvedLogDir, rolegroupref: &RoleGroupRef<SparkHistoryServer>, ) -> Result<String, Error> { - let mut log_dir_settings = s3_log_dir - .history_server_spark_config() - .context(S3LogDirSnafu)?; + let mut log_dir_settings = log_dir.history_server_spark_config().context(LogDirSnafu)?; // add cleaner spark settings if requested log_dir_settings.extend(cleaner_config(shs, rolegroupref)?); @@ -702,17 +700,17 @@ fn spark_defaults( .join("\n")) } -fn command_args(s3logdir: &S3LogDir) -> Vec<String> { +fn command_args(logdir: &ResolvedLogDir) -> Vec<String> { let mut command = vec![]; - if let Some(secret_dir) = s3logdir.credentials_mount_path() { + if let Some(secret_dir) = logdir.credentials_mount_path() { command.extend(vec![ format!("export AWS_ACCESS_KEY_ID=\"$(cat {secret_dir}/{ACCESS_KEY_ID})\""), format!("export AWS_SECRET_ACCESS_KEY=\"$(cat {secret_dir}/{SECRET_ACCESS_KEY})\""), ]); } - if let Some(secret_name) = tlscerts::tls_secret_name(&s3logdir.bucket.connection) { + if let Some(secret_name) = logdir.tls_secret_name() { command.extend(vec![format!("mkdir -p {STACKABLE_TRUST_STORE}")]); command.extend(tlscerts::convert_system_trust_store_to_pkcs12()); command.extend(tlscerts::import_truststore(secret_name)); diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index 7a3a5b4f..1866ca47 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -17,7 +17,7 @@ use stackable_operator::{ time::Duration, }; use stackable_spark_k8s_crd::{ - constants::*, s3logdir::S3LogDir, tlscerts, to_spark_env_sh_string, RoleConfig, + constants::*, logdir::ResolvedLogDir, tlscerts, to_spark_env_sh_string, RoleConfig, SparkApplication, SparkApplicationRole, SparkApplicationStatus, SparkContainer, SubmitConfig, }; @@ -122,9 +122,9 @@ pub enum Error { source: stackable_operator::builder::pod::container::Error, }, - #[snafu(display("failed to resolve the s3 log dir configuration"))] - S3LogDir { - source: stackable_spark_k8s_crd::s3logdir::Error, + #[snafu(display("failed to resolve the log dir configuration"))] + LogDir { + source: stackable_spark_k8s_crd::logdir::Error, }, #[snafu(display("failed to resolve the Vector aggregator address"))] @@ -245,13 +245,19 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>) } } - let s3logdir = S3LogDir::resolve( - spark_application.spec.log_file_directory.as_ref(), - spark_application.metadata.namespace.clone(), - client, - ) - .await - .context(S3LogDirSnafu)?; + let logdir = if let Some(log_file_dir) = &spark_application.spec.log_file_directory { + Some( + ResolvedLogDir::resolve( + log_file_dir, + spark_application.metadata.namespace.clone(), + client, + ) + .await + .context(LogDirSnafu)?, + ) + } else { + None + }; let resolved_product_image = spark_application .spec @@ -287,7 +293,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>) .await .context(ResolveVectorAggregatorAddressSnafu)?; - let env_vars = spark_application.env(&opt_s3conn, &s3logdir); + let env_vars = spark_application.env(&opt_s3conn, &logdir); let driver_config = spark_application .driver_config() @@ -305,7 +311,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>) driver_product_config, &env_vars, &opt_s3conn, - &s3logdir, + &logdir, vector_aggregator_address.as_deref(), &resolved_product_image, )?; @@ -334,7 +340,7 @@ pub async
fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>) executor_product_config, &env_vars, &opt_s3conn, - &s3logdir, + &logdir, vector_aggregator_address.as_deref(), &resolved_product_image, )?; @@ -351,7 +357,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>) .build_command( serviceaccount.metadata.name.as_ref().unwrap(), &opt_s3conn, - &s3logdir, + &logdir, &resolved_product_image.image, ) .context(BuildCommandSnafu)?; @@ -386,7 +392,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>) &env_vars, &job_commands, &opt_s3conn, - &s3logdir, + &logdir, &submit_config, )?; client @@ -417,7 +423,7 @@ fn init_containers( spark_application: &SparkApplication, logging: &Logging<SparkContainer>, s3conn: &Option<ResolvedS3Connection>, - s3logdir: &Option<S3LogDir>, + logdir: &Option<ResolvedLogDir>, spark_image: &ResolvedProductImage, ) -> Result<Vec<Container>, Error> { let mut jcb = ContainerBuilder::new(&SparkContainer::Job.to_string()) @@ -512,7 +518,7 @@ fn init_containers( .context(IllegalContainerNameSnafu)?; let mut args = Vec::new(); - let tls_container = match tlscerts::tls_secret_names(s3conn, s3logdir) { + let tls_container = match tlscerts::tls_secret_names(s3conn, logdir) { Some(cert_secrets) => { args.extend(tlscerts::convert_system_trust_store_to_pkcs12()); for cert_secret in cert_secrets { @@ -557,14 +563,14 @@ fn pod_template( volumes: &[Volume], env: &[EnvVar], s3conn: &Option<ResolvedS3Connection>, - s3logdir: &Option<S3LogDir>, + logdir: &Option<ResolvedLogDir>, spark_image: &ResolvedProductImage, ) -> Result<PodTemplateSpec, Error> { let container_name = SparkContainer::Spark.to_string(); let mut cb = ContainerBuilder::new(&container_name).context(IllegalContainerNameSnafu)?; let merged_env = spark_application.merged_env(role.clone(), env); - cb.add_volume_mounts(config.volume_mounts(spark_application, s3conn, s3logdir)) + cb.add_volume_mounts(config.volume_mounts(spark_application, s3conn, logdir)) .context(AddVolumeMountSnafu)? .add_env_vars(merged_env) .resources(config.resources.clone().into()) @@ -608,7 +614,7 @@ fn pod_template( spark_application, &config.logging, s3conn, - s3logdir, + logdir, spark_image, ) .unwrap(); @@ -650,7 +656,7 @@ fn pod_template_config_map( product_config: Option<&HashMap<PropertyNameKind, BTreeMap<String, String>>>, env: &[EnvVar], s3conn: &Option<ResolvedS3Connection>, - s3logdir: &Option<S3LogDir>, + logdir: &Option<ResolvedLogDir>, vector_aggregator_address: Option<&str>, spark_image: &ResolvedProductImage, ) -> Result<ConfigMap, Error> { let log_config_map = if let Some(...)... }; let mut volumes = spark_application - .volumes(s3conn, s3logdir, Some(&log_config_map)) + .volumes(s3conn, logdir, Some(&log_config_map)) .context(CreateVolumesSnafu)?; volumes.push( VolumeBuilder::new(VOLUME_MOUNT_NAME_CONFIG) @@ -684,7 +690,7 @@ fn pod_template_config_map( volumes.as_ref(), env, s3conn, - s3logdir, + logdir, spark_image, )?; @@ -820,7 +826,7 @@ fn spark_job( env: &[EnvVar], job_commands: &[String], s3conn: &Option<ResolvedS3Connection>, - s3logdir: &Option<S3LogDir>, + logdir: &Option<ResolvedLogDir>, job_config: &SubmitConfig, ) -> Result<Job, Error> { let mut cb = ContainerBuilder::new(&SparkContainer::SparkSubmit.to_string()) .command(vec!["/bin/bash".to_string(), "-c".to_string()]) .args(vec![args.join(" && ")]) .resources(job_config.resources.clone().into()) - .add_volume_mounts(spark_application.spark_job_volume_mounts(s3conn, s3logdir)) + .add_volume_mounts(spark_application.spark_job_volume_mounts(s3conn, logdir)) .context(AddVolumeMountSnafu)?
.add_env_vars(merged_env) .add_env_var( @@ -862,7 +868,7 @@ fn spark_job( ]; volumes.extend( spark_application - .volumes(s3conn, s3logdir, None) + .volumes(s3conn, logdir, None) .context(CreateVolumesSnafu)?, ); diff --git a/tests/templates/kuttl/custom-log-directory/00-assert.yaml b/tests/templates/kuttl/custom-log-directory/00-assert.yaml new file mode 100644 index 00000000..5baf8caa --- /dev/null +++ b/tests/templates/kuttl/custom-log-directory/00-assert.yaml @@ -0,0 +1,9 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: integration-tests-sa diff --git a/tests/templates/kuttl/custom-log-directory/00-patch-ns.yaml.j2 b/tests/templates/kuttl/custom-log-directory/00-patch-ns.yaml.j2 new file mode 100644 index 00000000..67185acf --- /dev/null +++ b/tests/templates/kuttl/custom-log-directory/00-patch-ns.yaml.j2 @@ -0,0 +1,9 @@ +{% if test_scenario['values']['openshift'] == 'true' %} +# see https://github.com/stackabletech/issues/issues/566 +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}' + timeout: 120 +{% endif %} diff --git a/tests/templates/kuttl/custom-log-directory/00-serviceaccount.yaml.j2 b/tests/templates/kuttl/custom-log-directory/00-serviceaccount.yaml.j2 new file mode 100644 index 00000000..9cbf0351 --- /dev/null +++ b/tests/templates/kuttl/custom-log-directory/00-serviceaccount.yaml.j2 @@ -0,0 +1,29 @@ +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: use-integration-tests-scc +rules: +{% if test_scenario['values']['openshift'] == "true" %} + - apiGroups: ["security.openshift.io"] + resources: ["securitycontextconstraints"] + resourceNames: ["privileged"] + verbs: ["use"] +{% endif %} +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: integration-tests-sa +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: use-integration-tests-scc +subjects: + - kind: ServiceAccount + name: integration-tests-sa +roleRef: + kind: Role + name: use-integration-tests-scc + apiGroup: rbac.authorization.k8s.io diff --git a/tests/templates/kuttl/custom-log-directory/01-assert.yaml.j2 b/tests/templates/kuttl/custom-log-directory/01-assert.yaml.j2 new file mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/custom-log-directory/01-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/custom-log-directory/01-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/custom-log-directory/01-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/custom-log-directory/01-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/custom-log-directory/02-assert.yaml b/tests/templates/kuttl/custom-log-directory/02-assert.yaml new file mode 100644 index 00000000..a1e216b4 --- /dev/null +++ 
b/tests/templates/kuttl/custom-log-directory/02-assert.yaml @@ -0,0 +1,14 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: install-zk +timeout: 600 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-zk-server-default +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/custom-log-directory/02-install-zookeeper.yaml.j2 b/tests/templates/kuttl/custom-log-directory/02-install-zookeeper.yaml.j2 new file mode 100644 index 00000000..caecd8b8 --- /dev/null +++ b/tests/templates/kuttl/custom-log-directory/02-install-zookeeper.yaml.j2 @@ -0,0 +1,28 @@ +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperCluster +metadata: + name: test-zk +spec: + image: + productVersion: "{{ test_scenario['values']['zookeeper-latest'] }}" + pullPolicy: IfNotPresent + clusterConfig: +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} + servers: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 1 +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperZnode +metadata: + name: test-znode +spec: + clusterRef: + name: test-zk diff --git a/tests/templates/kuttl/custom-log-directory/03-assert.yaml b/tests/templates/kuttl/custom-log-directory/03-assert.yaml new file mode 100644 index 00000000..8800b24d --- /dev/null +++ b/tests/templates/kuttl/custom-log-directory/03-assert.yaml @@ -0,0 +1,30 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: install-hdfs +timeout: 600 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-hdfs-namenode-default +status: + readyReplicas: 2 + replicas: 2 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-hdfs-journalnode-default +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-hdfs-datanode-default +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/custom-log-directory/03-install-hdfs.yaml.j2 b/tests/templates/kuttl/custom-log-directory/03-install-hdfs.yaml.j2 new file mode 100644 index 00000000..0bd3bb65 --- /dev/null +++ b/tests/templates/kuttl/custom-log-directory/03-install-hdfs.yaml.j2 @@ -0,0 +1,35 @@ +--- +apiVersion: hdfs.stackable.tech/v1alpha1 +kind: HdfsCluster +metadata: + name: test-hdfs +spec: + image: + productVersion: "{{ test_scenario['values']['hdfs-latest'] }}" + pullPolicy: IfNotPresent + clusterConfig: + zookeeperConfigMapName: test-znode +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} + nameNodes: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 2 + dataNodes: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 1 + journalNodes: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 1 diff --git a/tests/templates/kuttl/custom-log-directory/10-assert.yaml b/tests/templates/kuttl/custom-log-directory/10-assert.yaml new file mode 100644 index 00000000..68c0e487 --- /dev/null +++ b/tests/templates/kuttl/custom-log-directory/10-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + 
name: spark-history-node-default +status: + readyReplicas: 1 diff --git a/tests/templates/kuttl/custom-log-directory/10-deploy-history-server.yaml.j2 b/tests/templates/kuttl/custom-log-directory/10-deploy-history-server.yaml.j2 new file mode 100644 index 00000000..96b860a0 --- /dev/null +++ b/tests/templates/kuttl/custom-log-directory/10-deploy-history-server.yaml.j2 @@ -0,0 +1,39 @@ +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkHistoryServer +metadata: + name: spark-history +spec: + image: +{% if test_scenario['values']['spark'].find(",") > 0 %} + custom: "{{ test_scenario['values']['spark'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['spark'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['spark'] }}" +{% endif %} + pullPolicy: IfNotPresent +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} + logFileDirectory: + customLogDirectory: hdfs://// + nodes: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 1 + envOverrides: + HADOOP_CONF_DIR: /stackable/hdfs-config + podOverrides: + spec: + containers: + - name: spark-history + volumeMounts: + - name: hdfs-config + mountPath: /stackable/hdfs-config + volumes: + - name: hdfs-config + configMap: + name: test-hdfs diff --git a/tests/templates/kuttl/custom-log-directory/11-assert.yaml b/tests/templates/kuttl/custom-log-directory/11-assert.yaml new file mode 100644 index 00000000..016a39c0 --- /dev/null +++ b/tests/templates/kuttl/custom-log-directory/11-assert.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: spark-pi +status: + phase: Succeeded diff --git a/tests/templates/kuttl/custom-log-directory/11-deploy-spark-app.yaml.j2 b/tests/templates/kuttl/custom-log-directory/11-deploy-spark-app.yaml.j2 new file mode 100644 index 00000000..730ddd44 --- /dev/null +++ b/tests/templates/kuttl/custom-log-directory/11-deploy-spark-app.yaml.j2 @@ -0,0 +1,40 @@ +--- +apiVersion: spark.stackable.tech/v1alpha1 +kind: SparkApplication +metadata: + name: spark-pi +spec: +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} + sparkImage: +{% if test_scenario['values']['spark'].find(",") > 0 %} + custom: "{{ test_scenario['values']['spark'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['spark'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['spark'] }}" +{% endif %} + pullPolicy: IfNotPresent + mode: cluster + mainClass: org.apache.spark.examples.SparkPi + mainApplicationFile: local:///stackable/spark/examples/jars/spark-examples.jar + sparkConf: + spark.driver.extraClassPath: /stackable/hdfs-config + logFileDirectory: + customLogDirectory: hdfs://// + driver: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + volumeMounts: + - name: hdfs-config + mountPath: /stackable/hdfs-config + executor: + replicas: 1 + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + volumes: + - name: hdfs-config + configMap: + name: test-hdfs diff --git a/tests/templates/kuttl/custom-log-directory/20-assert.yaml b/tests/templates/kuttl/custom-log-directory/20-assert.yaml new file mode 100644 index 00000000..661f45eb --- /dev/null +++ 
b/tests/templates/kuttl/custom-log-directory/20-assert.yaml @@ -0,0 +1,13 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: history-api-check +timeout: 180 +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: history-api-check +status: + succeeded: 1 diff --git a/tests/templates/kuttl/custom-log-directory/20-test-logs.yaml b/tests/templates/kuttl/custom-log-directory/20-test-logs.yaml new file mode 100644 index 00000000..2e3fa692 --- /dev/null +++ b/tests/templates/kuttl/custom-log-directory/20-test-logs.yaml @@ -0,0 +1,20 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: history-api-check +spec: + template: + spec: + restartPolicy: OnFailure + activeDeadlineSeconds: 100 + containers: + - name: history-api-check + image: docker.stackable.tech/stackable/testing-tools:0.2.0-stackable0.0.0-dev + command: + [ + "bash", + "-x", + "-c", + "test 1 == $(curl http://spark-history-node-default:18080/api/v1/applications | jq length)", + ] diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index cf7cd70a..a25118e2 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -100,6 +100,12 @@ tests: - hdfs-latest - zookeeper-latest - openshift + - name: custom-log-directory + dimensions: + - spark + - hdfs-latest + - zookeeper-latest + - openshift suites: - name: nightly patch: