Feat/add blacklisted apikeys #184

Open · wants to merge 4 commits into master
15 changes: 7 additions & 8 deletions src/main/scala/ignition/core/jobs/CoreJobRunner.scala
@@ -4,12 +4,16 @@ import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.joda.time.{DateTime, DateTimeZone}
import org.slf4j.{Logger, LoggerFactory}
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}

import scala.concurrent.Future

object CoreJobRunner {

val logger: Logger = LoggerFactory.getLogger(getClass)
val tagFormat = "yyyy_MM_dd'T'HH_mm_ss'UTC'"
val today = DateTime.now().withZone(DateTimeZone.UTC)
val todayFormatted = DateTimeFormat.forPattern(tagFormat).print(today)

case class RunnerContext(sparkContext: SparkContext,
sparkSession: SparkSession,
@@ -29,8 +33,8 @@
}

case class RunnerConfig(setupName: String = "nosetup",
date: DateTime = DateTime.now.withZone(DateTimeZone.UTC),
tag: String = "notag",
date: DateTime = today,
tag: String = todayFormatted,
user: String = "nouser",
master: String = "local[*]",
executorMemory: String = "2G",
@@ -76,11 +80,6 @@


val builder = SparkSession.builder
builder.config("spark.executor.memory", config.executorMemory)

builder.config("spark.eventLog.dir", "file:///media/tmp/spark-events")

builder.master(config.master)
builder.appName(appName)

builder.config("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getName())
@@ -114,7 +113,7 @@
} catch {
case t: Throwable =>
t.printStackTrace()
System.exit(1) // force exit of all threads
throw t
}

import scala.concurrent.ExecutionContext.Implicits.global
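Note on the new defaults above: the tag default changes from the placeholder "notag" to a UTC timestamp rendered with the yyyy_MM_dd'T'HH_mm_ss'UTC' pattern. A minimal standalone sketch of that formatting (the wrapper object is only for illustration; Joda-Time is assumed, matching the imports in this file):

    import org.joda.time.{DateTime, DateTimeZone}
    import org.joda.time.format.DateTimeFormat

    object TagFormatExample extends App {
      // Same pattern as the new default tag; prints e.g. 2024_01_15T10_30_00UTC
      val tagFormat = "yyyy_MM_dd'T'HH_mm_ss'UTC'"
      val today = DateTime.now().withZone(DateTimeZone.UTC)
      println(DateTimeFormat.forPattern(tagFormat).print(today))
    }

The resulting tag contains no ':' or '-', which matches the tag that tools/cluster.py derives from the UTC job date (e.g. 2014_05_04T13_13_10UTC).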
47 changes: 36 additions & 11 deletions src/main/scala/ignition/core/jobs/utils/SparkContextUtils.scala
@@ -281,7 +281,7 @@ object SparkContextUtils {
} catch {
case NonFatal(ex) =>
println(s"Failed to read resource from '$path': ${ex.getMessage} -- ${ex.getFullStackTraceString}")
throw new Exception(s"Failed to read resource from '$path': ${ex.getMessage} -- ${ex.getFullStackTraceString}")
ArrayBuffer()
} finally {
close(inputStream, path)
}
@@ -333,7 +333,8 @@
AutoCloseableIterator.wrap(finalLines, () => close(inputStream, s"${file.path}, slice $slice"))
} catch {
case NonFatal(e) =>
throw new Exception(s"Error on read compressed big file, slice=$slice, file=$file", e)
println(s"Error on read compressed big file, slice=$slice, file=$file \n $e")
AutoCloseableIterator.empty
}
}
}
@@ -348,13 +349,25 @@
"mapreduce.input.fileinputformat.split.maxsize" -> maxSplitSize.toString))
.foldLeft(new Configuration()) { case (acc, (k, v)) => acc.set(k, v); acc }

def read(file: HadoopFile, conf: Configuration) = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](conf = conf, fClass = classOf[TextInputFormat],
kClass = classOf[LongWritable], vClass = classOf[Text], path = file.path).map(pair => pair._2.toString)
def read(file: HadoopFile, conf: Configuration) = {

val confUncompressed = confWith(maxBytesPerPartition)

val union = new UnionRDD(sc, bigFiles.map { file =>
try {
sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
conf = conf,
fClass = classOf[TextInputFormat],
kClass = classOf[LongWritable],
vClass = classOf[Text],
path = file.path)
.map(pair => pair._2.toString)
} catch {
case NonFatal(e) =>
println(s"Error on read file=$file\n $e")
sc.emptyRDD[String]
}
}

val confUncompressed = confWith(maxBytesPerPartition)
val union: UnionRDD[String] = new UnionRDD(sc, bigFiles.map { file =>
if (sizeBasedFileHandling.isCompressed(file))
readCompressedBigFile(file, maxBytesPerPartition, minPartitions, sizeBasedFileHandling)
else
@@ -560,13 +573,18 @@ object SparkContextUtils {
def isSuccessFile(file: HadoopFile): Boolean =
file.path.endsWith("_SUCCESS") || file.path.endsWith("_FINISHED")

// Blacklisted apikeys. These apikeys are for testing only and shouldn't be used in production
val blacklist = Set("casasbahia-v2", "pontofrio-v2", "extra-v2", "casasbahia-test")

def excludeBlacklistedApikeys(file: HadoopFile): Boolean = !blacklist.exists(e => file.path.contains(e))

def excludePatternValidation(file: HadoopFile): Boolean =
exclusionPattern.map(pattern => !file.path.matches(pattern)).getOrElse(true)
exclusionPattern.forall(pattern => !file.path.matches(pattern))

def endsWithValidation(file: HadoopFile): Boolean =
endsWith.map { pattern =>
endsWith.forall { pattern =>
file.path.endsWith(pattern) || isSuccessFile(file)
}.getOrElse(true)
}

def dateValidation(files: WithOptDate[Array[HadoopFile]]): Boolean = {
val tryDate = files.date
@@ -594,7 +612,12 @@
None
else {
val filtered = files.copy(value = files.value
.filter(excludePatternValidation).filter(endsWithValidation).filter(predicate))
.filter(excludePatternValidation)
.filter(endsWithValidation)
.filter(predicate)
.filter(excludeBlacklistedApikeys)
)

if (filtered.value.isEmpty || !dateValidation(filtered))
None
else
@@ -663,6 +686,8 @@
val foundFiles = listAndFilterFiles(path, requireSuccess, inclusiveStartDate, startDate, inclusiveEndDate,
endDate, lastN, ignoreMalformedDates, endsWith, predicate = predicate)

logger.info(s"Found files ${foundFiles}")

if (foundFiles.size < minimumFiles)
throw new Exception(s"Tried with start/end time equals to $startDate/$endDate for path $path but but the resulting number of files $foundFiles is less than the required")

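To make the intent of the new blacklist filter above concrete, here is a small self-contained sketch; the simplified HadoopFile and the sample paths are assumptions for the example, not part of the codebase:

    object BlacklistFilterExample extends App {
      // Simplified stand-in for the real HadoopFile type (assumption for this sketch)
      case class HadoopFile(path: String)

      // Test-only apikeys that shouldn't be used in production (same set as the PR)
      val blacklist = Set("casasbahia-v2", "pontofrio-v2", "extra-v2", "casasbahia-test")

      // Keep a file only if its path contains none of the blacklisted apikeys
      def excludeBlacklistedApikeys(file: HadoopFile): Boolean =
        !blacklist.exists(key => file.path.contains(key))

      val files = Seq(
        HadoopFile("s3://bucket/casasbahia-test/2024/01/part-00000"), // dropped
        HadoopFile("s3://bucket/other-apikey/2024/01/part-00000"))    // kept
      println(files.filter(excludeBlacklistedApikeys).map(_.path))
    }

Because the check is a substring match on the whole path, any file whose path mentions a blacklisted apikey anywhere is excluded alongside the existing pattern, suffix, and predicate filters, before the date validation runs.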
7 changes: 5 additions & 2 deletions tools/cluster.py
@@ -423,6 +423,7 @@ def get_assembly_path():
@arg('--detached', help='Run job in background, requires tmux')
@arg('--destroy-cluster', help='Will destroy cluster after finishing the job')
@arg('--extra', action='append', type=str, help='Additional arguments for the job in the format k=v')
@arg('--disable-propagate-aws-credentials', help='Setting this to true will not propagate your AWS credentials from your environment to the master')
@named('run')
def job_run(cluster_name, job_name, job_mem,
key_file=default_key_file, disable_tmux=False,
@@ -439,6 +440,7 @@ def job_run(cluster_name, job_name, job_mem,
region=default_region,
driver_heap_size=default_driver_heap_size,
remove_files=True,
disable_propagate_aws_credentials=False,
extra=[]):

utc_job_date_example = '2014-05-04T13:13:10Z'
@@ -456,14 +458,15 @@
remote_hook = '{remote_path}/remote_hook.sh'.format(remote_path=remote_path)
notify_param = 'yes' if notify_on_errors else 'no'
yarn_param = 'yes' if yarn else 'no'
aws_vars = get_aws_keys_str() if not disable_propagate_aws_credentials else ''
job_date = utc_job_date or datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
job_tag = job_tag or job_date.replace(':', '_').replace('-', '_').replace('Z', 'UTC')
runner_extra_args = ' '.join('--runner-extra "%s"' % arg for arg in extra)
tmux_wait_command = ';(echo Press enter to keep the session open && /bin/bash -c "read -t 5" && sleep 7d)' if not detached else ''
tmux_arg = ". /etc/profile; . ~/.profile;tmux new-session {detached} -s spark.{job_name}.{job_tag} '{aws_vars} {remote_hook} {job_name} {job_date} {job_tag} {job_user} {remote_control_dir} {spark_mem} {yarn_param} {notify_param} {driver_heap_size} {runner_extra_args} {tmux_wait_command}' >& /tmp/commandoutput".format(
aws_vars=get_aws_keys_str(), job_name=job_name, job_date=job_date, job_tag=job_tag, job_user=job_user, remote_control_dir=remote_control_dir, remote_hook=remote_hook, spark_mem=job_mem, detached='-d' if detached else '', yarn_param=yarn_param, notify_param=notify_param, driver_heap_size=driver_heap_size, runner_extra_args=runner_extra_args, tmux_wait_command=tmux_wait_command)
aws_vars=aws_vars, job_name=job_name, job_date=job_date, job_tag=job_tag, job_user=job_user, remote_control_dir=remote_control_dir, remote_hook=remote_hook, spark_mem=job_mem, detached='-d' if detached else '', yarn_param=yarn_param, notify_param=notify_param, driver_heap_size=driver_heap_size, runner_extra_args=runner_extra_args, tmux_wait_command=tmux_wait_command)
non_tmux_arg = ". /etc/profile; . ~/.profile;{aws_vars} {remote_hook} {job_name} {job_date} {job_tag} {job_user} {remote_control_dir} {spark_mem} {yarn_param} {notify_param} {driver_heap_size} {runner_extra_args} >& /tmp/commandoutput".format(
aws_vars=get_aws_keys_str(), job_name=job_name, job_date=job_date, job_tag=job_tag, job_user=job_user, remote_control_dir=remote_control_dir, remote_hook=remote_hook, spark_mem=job_mem, yarn_param=yarn_param, notify_param=notify_param, driver_heap_size=driver_heap_size, runner_extra_args=runner_extra_args)
aws_vars=aws_vars, job_name=job_name, job_date=job_date, job_tag=job_tag, job_user=job_user, remote_control_dir=remote_control_dir, remote_hook=remote_hook, spark_mem=job_mem, yarn_param=yarn_param, notify_param=notify_param, driver_heap_size=driver_heap_size, runner_extra_args=runner_extra_args)


if not disable_assembly_build: