From 9659f4b39ab66fcf5329a4eccff15e97245b04f0 Mon Sep 17 00:00:00 2001
From: Anton Mazhurin
Date: Fri, 21 Jan 2022 09:12:26 -0500
Subject: [PATCH] Rawlog fix. (#107)

* Dynamic threshold. We use a more aggressive threshold if an incident is being detected. (#100)

* Fix the labeler: save all columns of request_sets (#101)

* JAVA sdk downgraded to fix s3 issue. Incident detector null fix. Optional low rate attack detection. (#102)

* Rawlog fix. Attack detection fix (F.lit). (#106)

Co-authored-by: Maria Karanasou
---
 src/baskerville/models/base_spark.py |  2 +-
 .../models/pipeline_tasks/tasks.py   | 61 ++++++++-----------
 src/baskerville/spark/__init__.py    |  2 +
 src/baskerville/spark/helpers.py     |  6 +-
 4 files changed, 31 insertions(+), 40 deletions(-)

diff --git a/src/baskerville/models/base_spark.py b/src/baskerville/models/base_spark.py
index a3c40ef4..1031d583 100644
--- a/src/baskerville/models/base_spark.py
+++ b/src/baskerville/models/base_spark.py
@@ -398,7 +398,7 @@ def get_window(self):
             #     self.spark_conf.storage_level
             # )
             if not window_df.rdd.isEmpty():
-                print(f'# Request sets = {window_df.count()}')
+                self.logger.info(f'# Request sets = {window_df.count()}')
                 yield window_df
             else:
                 self.logger.info(f'Empty window df for {str(filter_._jc)}')
diff --git a/src/baskerville/models/pipeline_tasks/tasks.py b/src/baskerville/models/pipeline_tasks/tasks.py
index acfbd591..58fa25ef 100644
--- a/src/baskerville/models/pipeline_tasks/tasks.py
+++ b/src/baskerville/models/pipeline_tasks/tasks.py
@@ -404,7 +404,7 @@ def process_data(self):
             self.logger.info('No data in to process.')
         else:
             for window_df in get_window(
-                self.df, self.time_bucket, self.config.spark.storage_level
+                df_original, self.time_bucket, self.config.spark.storage_level, self.logger
             ):
                 self.df = window_df.repartition(
                     *self.group_by_cols
@@ -1245,7 +1245,6 @@ def upsert_feedback_context(self):
     def prepare_to_save(self):
         try:
             success = self.upsert_feedback_context()
-            self.df.show()
             success = True
             if success:
                 # explode submitted feedback first
@@ -1257,7 +1256,6 @@
                 F.col('id_fc').alias('sumbitted_context_id'),
                 F.explode('feedback').alias('feedback')
             ).cache()
-            self.df.show()
             self.df = self.df.select(
                 F.col('uuid_organization').alias('top_uuid_organization'),
                 F.col('id_context').alias('client_id_context'),
@@ -1771,22 +1769,34 @@ def label_with_id_and_set(metric, self, return_value):
     def classify_anomalies(self):
         self.logger.info('Anomaly thresholding...')
-        hosts = self.incident_detector.get_hosts_with_incidents() if self.incident_detector else []
+        if self.incident_detector:
+            self.logger.info('Getting hosts with incidents...')
+            hosts = self.incident_detector.get_hosts_with_incidents()
+        else:
+            hosts = []
 
-        self.df = self.df.withColumn('threshold',
+        self.logger.info(f'Number of hosts under attack {len(hosts)}.')
+
+        self.df = self.df.withColumn('attack_prediction',
             F.when(F.col('target').isin(hosts),
-                   self.config.engine.anomaly_threshold_during_incident).otherwise(
-                self.config.engine.anomaly_threshold))
+                   F.lit(1)).otherwise(F.lit(0)))
 
+        self.logger.info(f'Dynamic thresholds calculation...')
+        self.df = self.df.withColumn('threshold',
+            F.when(F.col('target').isin(hosts),
+                   F.lit(self.config.engine.anomaly_threshold_during_incident)).otherwise(
+                F.lit(self.config.engine.anomaly_threshold)))
+
+        self.logger.info(f'Dynamic thresholding...')
         self.df = self.df.withColumn(
             'prediction',
-            F.when(F.col('score') > F.col('threshold'), F.lit(1.0)).otherwise(F.lit(0.)))
+            F.when(F.col('score') > F.col('threshold'), F.lit(1)).otherwise(F.lit(0)))
         self.df = self.df.drop('threshold')
 
     def detect_low_rate_attack(self):
         if not self.config.engine.low_rate_attack_enabled:
-            self.df = self.df.withColumn('low_rate_attack', 0.0)
+            self.logger.info('Skipping low rate attack detection.')
+            self.df = self.df.withColumn('low_rate_attack', F.lit(0))
             return
 
         self.logger.info('Low rate attack detecting...')
@@ -1795,50 +1805,29 @@
             'features', F.from_json('features', self.low_rate_attack_schema)
         )
-        self.df.select('features').show(1, False)
         self.df = self.df.withColumn(
             'features.request_total',
             F.col('features.request_total').cast(
                 T.DoubleType()
             ).alias('features.request_total')
-        ).persist(self.config.spark.storage_level)
+        )
         self.df = self.df.withColumn(
             'low_rate_attack',
-            F.when(self.lra_condition, 1.0).otherwise(0.0)
-        )
-
-    def detect_attack(self):
-        self.logger.info('Attack detecting...')
-        self.detect_low_rate_attack()
-        # return df_attack
-        return self.df
-
-    def updated_df_with_attacks(self, df_attack):
-        self.df = self.df.join(
-            df_attack,
-            on=[df_attack.uuid_request_set == self.df.uuid_request_set],
-            how='left'
+            F.when(self.lra_condition, F.lit(1)).otherwise(F.lit(0))
         )
 
     def run(self):
         if get_dtype_for_col(self.df, 'features') == 'string':
+            self.logger.info('Unwrapping features from json...')
+            # this can be true when running the raw log pipeline
            self.df = self.df.withColumn(
                 "features", F.from_json("features", self.features_schema)
             )
-        self.df = self.df.repartition('target').persist(
-            self.config.spark.storage_level
-        )
-        self.classify_anomalies()
-        df_attack = self.detect_attack()
-
-        # 'attack_prediction' column is not set anymore in this task
-        self.df = self.df.withColumn('attack_prediction', F.lit(0))
-        if not df_has_rows(df_attack):
-            self.updated_df_with_attacks(df_attack)
-            self.logger.info('No attacks detected...')
+        self.classify_anomalies()
+        self.detect_low_rate_attack()
 
         self.df = super().run()
         return self.df
diff --git a/src/baskerville/spark/__init__.py b/src/baskerville/spark/__init__.py
index a6a08f8d..425050f6 100644
--- a/src/baskerville/spark/__init__.py
+++ b/src/baskerville/spark/__init__.py
@@ -191,6 +191,8 @@
         conf.set('spark.kubernetes.driver.pod.name', os.environ['MY_POD_NAME'])
         conf.set('spark.driver.host', os.environ['MY_POD_IP'])
         conf.set('spark.driver.port', 20020)
+    else:
+        conf.set('spark.sql.codegen.wholeStage', 'false')
 
     spark = SparkSession.builder \
         .config(conf=conf) \
diff --git a/src/baskerville/spark/helpers.py b/src/baskerville/spark/helpers.py
index d0ad45e2..5db18da9 100644
--- a/src/baskerville/spark/helpers.py
+++ b/src/baskerville/spark/helpers.py
@@ -262,7 +262,7 @@ def columns_to_dict(df, col_name, columns_to_gather):
     )
 
 
-def get_window(df, time_bucket: TimeBucket, storage_level: str):
+def get_window(df, time_bucket: TimeBucket, storage_level: str, logger):
     df = df.withColumn(
         'timestamp', F.col('@timestamp').cast('timestamp')
     )
@@ -282,10 +282,10 @@ def get_window(df, time_bucket: TimeBucket, storage_level: str, logger):
         window_df = df.where(filter_)  # .persist(storage_level)
         if not window_df.rdd.isEmpty():
-            print(f'# Request sets = {window_df.count()}')
+            logger.info(f'# Request sets = {window_df.count()}')
             yield window_df
         else:
-            print(f'Empty window df for {str(filter_._jc)}')
+            logger.info(f'Empty window df for {str(filter_._jc)}')
         current_window_start = current_window_start + time_bucket.td
         current_end = current_window_start + time_bucket.td
         if current_window_start >= stop:
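
For reference, a small standalone PySpark sketch (not part of the patch) of the dynamic-threshold behaviour the new classify_anomalies() code applies: hosts that the incident detector reports as under attack get an attack_prediction flag and a more aggressive anomaly threshold before the prediction is computed. The host names, scores, and the two threshold values below are made up; in the patch the thresholds come from self.config.engine.anomaly_threshold and self.config.engine.anomaly_threshold_during_incident.

    from pyspark.sql import SparkSession, functions as F

    # assumed example values; the real ones come from the engine config
    ANOMALY_THRESHOLD = 0.75
    ANOMALY_THRESHOLD_DURING_INCIDENT = 0.55

    spark = SparkSession.builder.master('local[1]').appName('threshold-sketch').getOrCreate()

    # made-up request sets: (target host, anomaly score)
    df = spark.createDataFrame(
        [('site-a.example', 0.60), ('site-b.example', 0.60), ('site-b.example', 0.90)],
        ['target', 'score']
    )
    # hosts the incident detector would report as under attack (made up)
    hosts = ['site-b.example']

    df = (
        df
        # flag request sets whose host is currently under attack
        .withColumn('attack_prediction',
                    F.when(F.col('target').isin(hosts), F.lit(1)).otherwise(F.lit(0)))
        # pick the more aggressive threshold for hosts under attack
        .withColumn('threshold',
                    F.when(F.col('target').isin(hosts),
                           F.lit(ANOMALY_THRESHOLD_DURING_INCIDENT))
                    .otherwise(F.lit(ANOMALY_THRESHOLD)))
        # anomaly prediction: score above the (dynamic) threshold
        .withColumn('prediction',
                    F.when(F.col('score') > F.col('threshold'), F.lit(1)).otherwise(F.lit(0)))
        .drop('threshold')
    )

    df.show()
    # site-a.example: 0.60 <= 0.75 -> prediction 0
    # site-b.example: 0.60 >  0.55 -> prediction 1 (incident threshold applies)
    spark.stop()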