diff --git a/perfkitbenchmarker/dpb_service.py b/perfkitbenchmarker/dpb_service.py index d0d5ccfc5e..129c267fd0 100644 --- a/perfkitbenchmarker/dpb_service.py +++ b/perfkitbenchmarker/dpb_service.py @@ -49,45 +49,69 @@ import yaml flags.DEFINE_string( - 'static_dpb_service_instance', None, + 'static_dpb_service_instance', + None, 'If set, the name of the pre created dpb implementation,' - 'assumed to be ready.') + 'assumed to be ready.', +) flags.DEFINE_string('dpb_log_level', 'INFO', 'Manipulate service log level') -flags.DEFINE_string('dpb_job_jarfile', None, - 'Executable Jarfile containing workload implementation') -flags.DEFINE_string('dpb_job_classname', None, 'Classname of the job ' - 'implementation in the jar file') flags.DEFINE_string( - 'dpb_service_bucket', None, 'A bucket to use with the DPB ' + 'dpb_job_jarfile', + None, + 'Executable Jarfile containing workload implementation', +) +flags.DEFINE_string( + 'dpb_job_classname', + None, + 'Classname of the job implementation in the jar file', +) +flags.DEFINE_string( + 'dpb_service_bucket', + None, + 'A bucket to use with the DPB ' 'service. If none is provided one will be created by the ' 'benchmark and cleaned up afterwards unless you are using ' - 'a static instance.') -flags.DEFINE_string('dpb_service_zone', None, 'The zone for provisioning the ' - 'dpb_service instance.') + 'a static instance.', +) +flags.DEFINE_string( + 'dpb_service_zone', + None, + 'The zone for provisioning the dpb_service instance.', +) flags.DEFINE_list( - 'dpb_job_properties', [], 'A list of strings of the form ' - '"key=value" to be passed into DPB jobs.') + 'dpb_job_properties', + [], + 'A list of strings of the form "key=value" to be passed into DPB jobs.', +) flags.DEFINE_list( - 'dpb_cluster_properties', [], 'A list of strings of the form ' + 'dpb_cluster_properties', + [], + 'A list of strings of the form ' '"type:key=value" to be passed into DPB clusters. See ' - 'https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/cluster-properties.' + 'https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/cluster-properties.', ) flags.DEFINE_float( - 'dpb_job_poll_interval_secs', 5, + 'dpb_job_poll_interval_secs', + 5, 'Poll interval to check submitted job status in seconds. Only applies for ' 'DPB service implementations that do not support synchronous job ' 'submissions (i.e. not Dataproc).', - lower_bound=0, upper_bound=120) + lower_bound=0, + upper_bound=120, +) flags.DEFINE_string( - 'dpb_initialization_actions', None, + 'dpb_initialization_actions', + None, 'A comma separated list of Google Cloud Storage URIs of executables to run ' 'on each node in the DPB cluster. See ' - 'https://cloud.google.com/sdk/gcloud/reference/dataproc/clusters/create#--initialization-actions.' + 'https://cloud.google.com/sdk/gcloud/reference/dataproc/clusters/create#--initialization-actions.', ) flags.DEFINE_bool( - 'dpb_export_job_stats', False, + 'dpb_export_job_stats', + False, 'Exports job stats such as CPU usage and cost. Disabled by default and not ' - 'necessarily implemented on all services.') + 'necessarily implemented on all services.', +) flags.DEFINE_enum( 'dpb_job_type', None, @@ -103,20 +127,23 @@ 'The type of the job to be run on the backends.', ) _HARDWARE_HOURLY_COST = flags.DEFINE_float( - 'dpb_hardware_hourly_cost', None, + 'dpb_hardware_hourly_cost', + None, 'Hardware hourly USD cost of running the DPB cluster. Set it along with ' - '--dpb_service_premium_hourly_cost to publish cost estimate metrics.' 
+ '--dpb_service_premium_hourly_cost to publish cost estimate metrics.', ) _SERVICE_PREMIUM_HOURLY_COST = flags.DEFINE_float( - 'dpb_service_premium_hourly_cost', None, + 'dpb_service_premium_hourly_cost', + None, 'Hardware hourly USD cost of running the DPB cluster. Set it along with ' - '--dpb_hardware_hourly_cost to publish cost estimate metrics.' + '--dpb_hardware_hourly_cost to publish cost estimate metrics.', ) _DYNAMIC_ALLOCATION = flags.DEFINE_bool( - 'dpb_dynamic_allocation', True, + 'dpb_dynamic_allocation', + True, 'True by default. Set it to False to disable dynamic allocation and assign ' 'all cluster executors to an incoming job. Setting this off is only ' - 'supported by Dataproc and EMR (non-serverless versions).' + 'supported by Dataproc and EMR (non-serverless versions).', ) @@ -125,17 +152,20 @@ class JobNotCompletedError(Exception): """Used to signal a job is still running.""" + pass class JobSubmissionError(errors.Benchmarks.RunError): """Thrown by all implementations if SubmitJob fails.""" + pass @dataclasses.dataclass class JobResult: """Data class for the timing of a successful DPB job.""" + # Service reported execution time run_time: float # Service reported pending time (0 if service does not report). @@ -213,19 +243,21 @@ def base_dir(self): return self.persistent_fs_prefix + self.bucket # pytype: disable=attribute-error # bind-properties @abc.abstractmethod - def SubmitJob(self, - jarfile: Optional[str] = None, - classname: Optional[str] = None, - pyspark_file: Optional[str] = None, - query_file: Optional[str] = None, - job_poll_interval: Optional[float] = None, - job_stdout_file: Optional[str] = None, - job_arguments: Optional[List[str]] = None, - job_files: Optional[List[str]] = None, - job_jars: Optional[List[str]] = None, - job_py_files: Optional[List[str]] = None, - job_type: Optional[str] = None, - properties: Optional[Dict[str, str]] = None) -> JobResult: + def SubmitJob( + self, + jarfile: Optional[str] = None, + classname: Optional[str] = None, + pyspark_file: Optional[str] = None, + query_file: Optional[str] = None, + job_poll_interval: Optional[float] = None, + job_stdout_file: Optional[str] = None, + job_arguments: Optional[List[str]] = None, + job_files: Optional[List[str]] = None, + job_jars: Optional[List[str]] = None, + job_py_files: Optional[List[str]] = None, + job_type: Optional[str] = None, + properties: Optional[Dict[str, str]] = None, + ) -> JobResult: """Submit a data processing job to the backend. Args: @@ -266,7 +298,8 @@ def _WaitForJob(self, job_id, timeout, poll_interval): timeout=timeout, poll_interval=poll_interval, fuzz=0, - retryable_exceptions=(JobNotCompletedError,)) + retryable_exceptions=(JobNotCompletedError,), + ) def Poll(): result = self._GetCompletedJob(job_id) if result is None: @@ -288,8 +321,9 @@ def _GetCompletedJob(self, job_id: str) -> Optional[JobResult]: Raises: JobSubmissionError if job fails. 
""" - raise NotImplementedError('You need to implement _GetCompletedJob if you ' - 'use _WaitForJob') + raise NotImplementedError( + 'You need to implement _GetCompletedJob if you use _WaitForJob' + ) def GetSparkSubmitCommand( self, @@ -303,7 +337,8 @@ def GetSparkSubmitCommand( job_type: Optional[str] = None, job_py_files: Optional[List[str]] = None, properties: Optional[Dict[str, str]] = None, - spark_submit_cmd: str = spark.SPARK_SUBMIT) -> List[str]: + spark_submit_cmd: str = spark.SPARK_SUBMIT, + ) -> List[str]: """Builds the command to run spark-submit on cluster.""" # TODO(pclay): support dpb_constants.SPARKSQL_JOB_TYPE if job_type not in [ @@ -334,10 +369,12 @@ def GetSparkSubmitCommand( cmd += job_arguments return cmd - def DistributedCopy(self, - source: str, - destination: str, - properties: Optional[Dict[str, str]] = None) -> JobResult: + def DistributedCopy( + self, + source: str, + destination: str, + properties: Optional[Dict[str, str]] = None, + ) -> JobResult: """Method to copy data using a distributed job on the cluster. Args: @@ -355,32 +392,26 @@ def DistributedCopy(self, classname='org.apache.hadoop.tools.DistCp', job_arguments=[source, destination], job_type=dpb_constants.HADOOP_JOB_TYPE, - properties=properties) + properties=properties, + ) def _InitializeMetadata(self) -> None: pretty_version = self.GetDpbVersion() self.metadata = { - 'dpb_service': - self.dpb_service_type, - 'dpb_version': - pretty_version, - 'dpb_service_version': - '{}_{}'.format(self.dpb_service_type, pretty_version), - 'dpb_cluster_id': - self.cluster_id, - 'dpb_cluster_shape': - self.spec.worker_group.vm_spec.machine_type, - 'dpb_cluster_size': - self.spec.worker_count, - 'dpb_hdfs_type': - self.GetHdfsType(), - 'dpb_disk_size': - self.spec.worker_group.disk_spec.disk_size, - 'dpb_service_zone': - self.dpb_service_zone, - 'dpb_job_properties': - ','.join('{}={}'.format(k, v) - for k, v in self.GetJobProperties().items()), + 'dpb_service': self.dpb_service_type, + 'dpb_version': pretty_version, + 'dpb_service_version': '{}_{}'.format( + self.dpb_service_type, pretty_version + ), + 'dpb_cluster_id': self.cluster_id, + 'dpb_cluster_shape': self.spec.worker_group.vm_spec.machine_type, + 'dpb_cluster_size': self.spec.worker_count, + 'dpb_hdfs_type': self.GetHdfsType(), + 'dpb_disk_size': self.spec.worker_group.disk_spec.disk_size, + 'dpb_service_zone': self.dpb_service_zone, + 'dpb_job_properties': ','.join( + '{}={}'.format(k, v) for k, v in self.GetJobProperties().items() + ), 'dpb_cluster_properties': ','.join(self.GetClusterProperties()), 'dpb_dynamic_allocation': _DYNAMIC_ALLOCATION.value, } @@ -467,7 +498,8 @@ def GetExecutionJar(self, job_category: str, job_type: str) -> str: if jar: return jar raise NotImplementedError( - f'No jar found for category {job_category} and type {job_type}.') + f'No jar found for category {job_category} and type {job_type}.' + ) def GetClusterCreateTime(self) -> Optional[float]: """Returns the cluster creation time. @@ -560,29 +592,23 @@ def GetSamples(self) -> list[sample.Sample]: # Cluster creation time as reported by the DPB service # (non-Serverless DPB services only). 'dpb_cluster_create_time': (self.GetClusterCreateTime(), 'seconds'), - # Cluster duration as computed by the underlying benchmark. # (non-Serverless DPB services only). 'dpb_cluster_duration': (self.GetClusterDuration(), 'seconds'), - # Cluster hardware cost computed from cluster duration and # hourly costs passed in flags (non-Serverless DPB services only). 
'dpb_cluster_hardware_cost': (self.GetClusterHardwareCost(), '$'), - # Cluster DPB service premium cost computed from cluster duration and # hourly costs passed in flags (non-Serverless DPB services only). 'dpb_cluster_premium_cost': (self.GetClusterPremiumCost(), '$'), - # Cluster hardware cost computed from cluster duration and # hourly costs passed in flags (non-Serverless DPB services only). 'dpb_cluster_total_cost': (self.GetClusterCost(), '$'), - # Cluster hardware cost per hour as specified in PKB flags. 'dpb_cluster_hardware_hourly_cost': ( _HARDWARE_HOURLY_COST.value, '$/hour', ), - # DPB Service premium cost per hour as specified in PKB flags. 'dpb_cluster_premium_hourly_cost': ( _SERVICE_PREMIUM_HOURLY_COST.value, @@ -633,7 +659,8 @@ def __init__(self, dpb_service_spec): self.cloud = dpb_service_spec.worker_group.cloud if not self.dpb_service_zone: raise errors.Setup.InvalidSetupError( - 'dpb_service_zone must be provided, for provisioning.') + 'dpb_service_zone must be provided, for provisioning.' + ) if self.cloud == 'GCP': self.region = gcp_util.GetRegionFromZone(FLAGS.dpb_service_zone) self.storage_service = gcs.GoogleCloudStorageService() @@ -650,7 +677,8 @@ def __init__(self, dpb_service_spec): logging.warning( 'Cloud provider %s does not support object storage. ' 'Some benchmarks will not work.', - self.cloud) + self.cloud, + ) if self.storage_service: self.storage_service.PrepareService(location=self.region) @@ -677,21 +705,26 @@ def GetClusterCreateTime(self) -> Optional[float]: for vm in vm_group: vms.append(vm) first_vm_create_start_time = min( - (vm.create_start_time - for vm in vms - if vm.create_start_time is not None), + ( + vm.create_start_time + for vm in vms + if vm.create_start_time is not None + ), default=None, ) last_vm_ready_start_time = max( - (vm.resource_ready_time - for vm in vms - if vm.resource_ready_time is not None), + ( + vm.resource_ready_time + for vm in vms + if vm.resource_ready_time is not None + ), default=None, ) if first_vm_create_start_time is None or last_vm_ready_start_time is None: return None - return (my_create_time + last_vm_ready_start_time - - first_vm_create_start_time) + return ( + my_create_time + last_vm_ready_start_time - first_vm_create_start_time + ) class UnmanagedDpbServiceYarnCluster(UnmanagedDpbService): @@ -719,27 +752,31 @@ def InstallHadoop(vm): if 'worker_group' not in self.vms: raise errors.Resource.CreationError( - 'UnmanagedDpbServiceYarnCluster requires VMs in a worker_group.') + 'UnmanagedDpbServiceYarnCluster requires VMs in a worker_group.' 
+ ) background_tasks.RunThreaded( InstallHadoop, self.vms['worker_group'] + self.vms['master_group'] ) self.leader = self.vms['master_group'][0] hadoop.ConfigureAndStart( - self.leader, self.vms['worker_group'], configure_s3=self.cloud == 'AWS') - - def SubmitJob(self, - jarfile=None, - classname=None, - pyspark_file=None, - query_file=None, - job_poll_interval=None, - job_stdout_file=None, - job_arguments=None, - job_files=None, - job_jars=None, - job_py_files=None, - job_type=None, - properties=None): + self.leader, self.vms['worker_group'], configure_s3=self.cloud == 'AWS' + ) + + def SubmitJob( + self, + jarfile=None, + classname=None, + pyspark_file=None, + query_file=None, + job_poll_interval=None, + job_stdout_file=None, + job_arguments=None, + job_files=None, + job_jars=None, + job_py_files=None, + job_type=None, + properties=None, + ): """Submit a data processing job to the backend.""" if job_type != dpb_constants.HADOOP_JOB_TYPE: raise NotImplementedError @@ -811,28 +848,32 @@ def InstallSpark(vm): if 'worker_group' not in self.vms: raise errors.Resource.CreationError( - 'UnmanagedDpbSparkCluster requires VMs in a worker_group.') + 'UnmanagedDpbSparkCluster requires VMs in a worker_group.' + ) background_tasks.RunThreaded( InstallSpark, self.vms['worker_group'] + self.vms['master_group'] ) self.leader = self.vms['master_group'][0] spark.ConfigureAndStart( - self.leader, self.vms['worker_group'], configure_s3=self.cloud == 'AWS') - - def SubmitJob(self, - jarfile=None, - classname=None, - pyspark_file=None, - query_file=None, - job_poll_interval=None, - job_stdout_file=None, - job_arguments=None, - job_files=None, - job_jars=None, - job_py_files=None, - job_type=None, - properties=None): + self.leader, self.vms['worker_group'], configure_s3=self.cloud == 'AWS' + ) + + def SubmitJob( + self, + jarfile=None, + classname=None, + pyspark_file=None, + query_file=None, + job_poll_interval=None, + job_stdout_file=None, + job_arguments=None, + job_files=None, + job_jars=None, + job_py_files=None, + job_type=None, + properties=None, + ): """Submit a data processing job to the backend.""" cmd = self.GetSparkSubmitCommand( jarfile=jarfile, @@ -843,7 +884,8 @@ def SubmitJob(self, job_jars=job_jars, job_py_files=job_py_files, job_type=job_type, - properties=properties) + properties=properties, + ) start_time = datetime.datetime.now() try: stdout, _ = self.leader.RobustRemoteCommand(' '.join(cmd)) @@ -911,7 +953,8 @@ def __init__(self, dpb_service_spec): self.persistent_fs_prefix = 's3://' else: raise errors.Config.InvalidValue( - f'Unsupported Cloud provider {self.cloud}') + f'Unsupported Cloud provider {self.cloud}' + ) self.storage_service.PrepareService(location=self.region) @@ -942,7 +985,8 @@ def _Create(self): # https://spark.apache.org/docs/latest/running-on-kubernetes.html#rbac # TODO(pclay): Consider moving into manifest self.k8s_cluster.CreateServiceAccount( - self.SPARK_K8S_SERVICE_ACCOUNT, clusterrole='edit') + self.SPARK_K8S_SERVICE_ACCOUNT, clusterrole='edit' + ) def _GetDriverName(self): return f'spark-driver-{len(self.spark_drivers)}' @@ -958,11 +1002,12 @@ def GetJobProperties(self) -> Dict[str, str]: node_cpu = self.k8s_cluster.node_num_cpu # TODO(pclay): Validate that we don't have too little memory? 
node_memory_mb = self.k8s_cluster.node_memory_allocatable.m_as( - units.mebibyte) + units.mebibyte + ) # Reserve 512 MB for system daemons node_memory_mb -= 512 # Remove overhead - node_memory_mb /= (1 + self.MEMORY_OVERHEAD_FACTOR) + node_memory_mb /= 1 + self.MEMORY_OVERHEAD_FACTOR node_memory_mb = int(node_memory_mb) # Common PKB Spark cluster properties @@ -973,43 +1018,41 @@ def GetJobProperties(self) -> Dict[str, str]: worker_cores=node_cpu, # Reserve one node for driver num_workers=self.k8s_cluster.num_nodes - 1, - configure_s3=self.cloud == 'AWS') + configure_s3=self.cloud == 'AWS', + ) # k8s specific properties properties.update({ - 'spark.driver.host': - self.SPARK_DRIVER_SERVICE, - 'spark.driver.port': - str(self.SPARK_DRIVER_PORT), - 'spark.kubernetes.driver.pod.name': - self._GetDriverName(), + 'spark.driver.host': self.SPARK_DRIVER_SERVICE, + 'spark.driver.port': str(self.SPARK_DRIVER_PORT), + 'spark.kubernetes.driver.pod.name': self._GetDriverName(), # Tell Spark to under-report cores by 1 to fit next to k8s services - 'spark.kubernetes.executor.request.cores': - str(node_cpu - 1), - 'spark.kubernetes.container.image': - self.image, + 'spark.kubernetes.executor.request.cores': str(node_cpu - 1), + 'spark.kubernetes.container.image': self.image, # No HDFS available - 'spark.hadoop.fs.defaultFS': - self.base_dir, - 'spark.kubernetes.memoryOverheadFactor': - str(self.MEMORY_OVERHEAD_FACTOR), + 'spark.hadoop.fs.defaultFS': self.base_dir, + 'spark.kubernetes.memoryOverheadFactor': str( + self.MEMORY_OVERHEAD_FACTOR + ), }) # User specified properties properties.update(super().GetJobProperties()) return properties - def SubmitJob(self, - jarfile=None, - classname=None, - pyspark_file=None, - query_file=None, - job_poll_interval=None, - job_stdout_file=None, - job_arguments=None, - job_files=None, - job_jars=None, - job_py_files=None, - job_type=None, - properties=None): + def SubmitJob( + self, + jarfile=None, + classname=None, + pyspark_file=None, + query_file=None, + job_poll_interval=None, + job_stdout_file=None, + job_arguments=None, + job_files=None, + job_jars=None, + job_py_files=None, + job_type=None, + properties=None, + ): """Submit a data processing job to the backend.""" # Specs can't be copied or created by hand. So we override the command of # the spec for each job. @@ -1022,12 +1065,14 @@ def SubmitJob(self, job_jars=job_jars, job_py_files=job_py_files, job_type=job_type, - properties=properties) + properties=properties, + ) driver_name = self._GetDriverName() # Request memory for driver. This should guarantee that driver does not get # scheduled on same VM as exectutor and OOM. 
driver_memory_mb = int( - self.GetJobProperties()[spark.SPARK_DRIVER_MEMORY].strip('m')) + self.GetJobProperties()[spark.SPARK_DRIVER_MEMORY].strip('m') + ) start_time = datetime.datetime.now() self.k8s_cluster.ApplyManifest( 'container/spark/spark-driver.yaml.j2', @@ -1037,7 +1082,8 @@ def SubmitJob(self, driver_port=self.SPARK_DRIVER_PORT, driver_service=self.SPARK_DRIVER_SERVICE, image=self.image, - service_account=self.SPARK_K8S_SERVICE_ACCOUNT) + service_account=self.SPARK_K8S_SERVICE_ACCOUNT, + ) container = container_service.KubernetesPod(driver_name) # increments driver_name for next job self.spark_drivers.append(container) @@ -1091,7 +1137,8 @@ def __init__(self, dpb_service_spec): self.persistent_fs_prefix = 'gs://' else: raise errors.Config.InvalidValue( - f'Unsupported Cloud provider {self.cloud}') + f'Unsupported Cloud provider {self.cloud}' + ) self.storage_service.PrepareService(location=self.region) @@ -1165,7 +1212,8 @@ def _GetJobManagerName(self): def GetJobProperties(self) -> Dict[str, str]: node_cpu = self.k8s_cluster.node_num_cpu node_memory_mb = self.k8s_cluster.node_memory_allocatable.m_as( - units.mebibyte) + units.mebibyte + ) # Reserve 512 MB for system daemons node_memory_mb -= 512 node_memory_mb = int(node_memory_mb) diff --git a/perfkitbenchmarker/providers/aws/aws_dpb_emr.py b/perfkitbenchmarker/providers/aws/aws_dpb_emr.py index 811f5ae241..0e7f11accd 100644 --- a/perfkitbenchmarker/providers/aws/aws_dpb_emr.py +++ b/perfkitbenchmarker/providers/aws/aws_dpb_emr.py @@ -36,8 +36,9 @@ from perfkitbenchmarker.providers.aws import util FLAGS = flags.FLAGS -flags.DEFINE_string('dpb_emr_release_label', None, - 'DEPRECATED use dpb_service.version.') +flags.DEFINE_string( + 'dpb_emr_release_label', None, 'DEPRECATED use dpb_service.version.' +) INVALID_STATES = ['TERMINATED_WITH_ERRORS', 'TERMINATED'] READY_CHECK_SLEEP = 30 @@ -69,8 +70,9 @@ def _GetClusterConfiguration(cluster_properties: list[str]) -> str: key, value = kv.split('=') if file not in DATAPROC_TO_EMR_CONF_FILES: raise errors.Config.InvalidValue( - 'Unsupported EMR configuration file "{}". '.format(file) + - 'Please add it to aws_dpb_emr.DATAPROC_TO_EMR_CONF_FILES.') + 'Unsupported EMR configuration file "{}". '.format(file) + + 'Please add it to aws_dpb_emr.DATAPROC_TO_EMR_CONF_FILES.' + ) properties[DATAPROC_TO_EMR_CONF_FILES[file]][key] = value json_conf = [] for file, props in properties.items(): @@ -112,10 +114,12 @@ def __init__(self, dpb_service_spec): self.region = util.GetRegionFromZone(self.dpb_service_zone) else: raise errors.Setup.InvalidSetupError( - 'dpb_service_zone must be provided, for provisioning.') + 'dpb_service_zone must be provided, for provisioning.' + ) self.cmd_prefix += ['--region', self.region] self.network = aws_network.AwsNetwork.GetNetworkFromNetworkSpec( - aws_network.AwsNetworkSpec(zone=self.dpb_service_zone)) + aws_network.AwsNetworkSpec(zone=self.dpb_service_zone) + ) self.storage_service = s3.S3Service() self.storage_service.PrepareService(self.region) self.persistent_fs_prefix = 's3://' @@ -125,7 +129,8 @@ def __init__(self, dpb_service_spec): self._cluster_delete_time: Optional[float] = None if not self.GetDpbVersion(): raise errors.Setup.InvalidSetupError( - 'dpb_service.version must be provided.') + 'dpb_service.version must be provided.' 
+ ) def GetDpbVersion(self) -> Optional[str]: return FLAGS.dpb_emr_release_label or super().GetDpbVersion() @@ -168,25 +173,31 @@ def _Create(self): assert self.spec.worker_group.disk_spec.disk_number is None assert self.spec.worker_group.disk_spec.iops is None if self.spec.worker_group.disk_spec.disk_type != disk.LOCAL: - ebs_configuration = {'EbsBlockDeviceConfigs': [ - {'VolumeSpecification': { - 'SizeInGB': self.spec.worker_group.disk_spec.disk_size, - 'VolumeType': self.spec.worker_group.disk_spec.disk_type}, - 'VolumesPerInstance': self.spec.worker_group.disk_count}]} + ebs_configuration = { + 'EbsBlockDeviceConfigs': [{ + 'VolumeSpecification': { + 'SizeInGB': self.spec.worker_group.disk_spec.disk_size, + 'VolumeType': self.spec.worker_group.disk_spec.disk_type, + }, + 'VolumesPerInstance': self.spec.worker_group.disk_count, + }] + } # Create the specification for the master and the worker nodes instance_groups = [] - core_instances = {'InstanceCount': self.spec.worker_count, - 'InstanceGroupType': 'CORE', - 'InstanceType': - self.spec.worker_group.vm_spec.machine_type} + core_instances = { + 'InstanceCount': self.spec.worker_count, + 'InstanceGroupType': 'CORE', + 'InstanceType': self.spec.worker_group.vm_spec.machine_type, + } if ebs_configuration: core_instances.update({'EbsConfiguration': ebs_configuration}) - master_instance = {'InstanceCount': 1, - 'InstanceGroupType': 'MASTER', - 'InstanceType': - self.spec.worker_group.vm_spec.machine_type} + master_instance = { + 'InstanceCount': 1, + 'InstanceGroupType': 'MASTER', + 'InstanceType': self.spec.worker_group.vm_spec.machine_type, + } if ebs_configuration: master_instance.update({'EbsConfiguration': ebs_configuration}) @@ -194,14 +205,23 @@ def _Create(self): instance_groups.append(master_instance) # Spark SQL needs to access Hive - cmd = self.cmd_prefix + ['emr', 'create-cluster', '--name', name, - '--release-label', self.GetDpbVersion(), - '--use-default-roles', - '--instance-groups', - json.dumps(instance_groups), - '--application', 'Name=Spark', - 'Name=Hadoop', 'Name=Hive', - '--log-uri', self.base_dir] + cmd = self.cmd_prefix + [ + 'emr', + 'create-cluster', + '--name', + name, + '--release-label', + self.GetDpbVersion(), + '--use-default-roles', + '--instance-groups', + json.dumps(instance_groups), + '--application', + 'Name=Spark', + 'Name=Hadoop', + 'Name=Hive', + '--log-uri', + self.base_dir, + ] ec2_attributes = [ 'KeyName=' + aws_virtual_machine.AwsKeyFileManager.GetKeyNameForRun(), @@ -227,24 +247,29 @@ def _Create(self): def _AddTags(self, tags_dict: dict[str, str]): tag_args = [f'{key}={value}' for key, value in tags_dict.items()] - cmd = self.cmd_prefix + ['emr', 'add-tags', - '--resource-id', self.cluster_id, - '--tags'] + tag_args + cmd = ( + self.cmd_prefix + + ['emr', 'add-tags', '--resource-id', self.cluster_id, '--tags'] + + tag_args + ) try: vm_util.IssueCommand(cmd) except errors.VmUtil.IssueCommandError as e: error_message = str(e) if 'ThrottlingException' in error_message: raise errors.Benchmarks.QuotaFailure.RateLimitExceededError( - error_message) from e + error_message + ) from e raise def _Delete(self): if self.cluster_id: - delete_cmd = self.cmd_prefix + ['emr', - 'terminate-clusters', - '--cluster-ids', - self.cluster_id] + delete_cmd = self.cmd_prefix + [ + 'emr', + 'terminate-clusters', + '--cluster-ids', + self.cluster_id, + ] vm_util.IssueCommand(delete_cmd, raise_on_failure=False) def _DeleteDependencies(self): @@ -255,10 +280,12 @@ def _Exists(self): """Check to see whether the cluster 
exists.""" if not self.cluster_id: return False - cmd = self.cmd_prefix + ['emr', - 'describe-cluster', - '--cluster-id', - self.cluster_id] + cmd = self.cmd_prefix + [ + 'emr', + 'describe-cluster', + '--cluster-id', + self.cluster_id, + ] stdout, _, retcode = vm_util.IssueCommand(cmd, raise_on_failure=False) if retcode != 0: return False @@ -279,9 +306,12 @@ def _Exists(self): def _IsReady(self): """Check to see if the cluster is ready.""" logging.info('Checking _Ready cluster: %s', self.cluster_id) - cmd = self.cmd_prefix + ['emr', - 'describe-cluster', '--cluster-id', - self.cluster_id] + cmd = self.cmd_prefix + [ + 'emr', + 'describe-cluster', + '--cluster-id', + self.cluster_id, + ] stdout, _, _ = vm_util.IssueCommand(cmd) result = json.loads(stdout) # TODO(saksena): Handle error outcomees when spinning up emr clusters @@ -307,44 +337,53 @@ def _ParseClusterCreateTime( def _GetCompletedJob(self, job_id): """See base class.""" cmd = self.cmd_prefix + [ - 'emr', 'describe-step', '--cluster-id', self.cluster_id, '--step-id', - job_id + 'emr', + 'describe-step', + '--cluster-id', + self.cluster_id, + '--step-id', + job_id, ] stdout, stderr, retcode = vm_util.IssueCommand(cmd, raise_on_failure=False) if retcode: if 'ThrottlingException' in stderr: - logging.warning('Rate limited while polling EMR step:\n%s\nRetrying.', - stderr) + logging.warning( + 'Rate limited while polling EMR step:\n%s\nRetrying.', stderr + ) return None else: raise errors.VmUtil.IssueCommandError( - f'Getting step status failed:\n{stderr}') + f'Getting step status failed:\n{stderr}' + ) result = json.loads(stdout) state = result['Step']['Status']['State'] if state == 'FAILED': raise dpb_service.JobSubmissionError( - result['Step']['Status']['FailureDetails']) + result['Step']['Status']['FailureDetails'] + ) if state == 'COMPLETED': pending_time = result['Step']['Status']['Timeline']['CreationDateTime'] start_time = result['Step']['Status']['Timeline']['StartDateTime'] end_time = result['Step']['Status']['Timeline']['EndDateTime'] return dpb_service.JobResult( - run_time=end_time - start_time, - pending_time=start_time - pending_time) - - def SubmitJob(self, - jarfile=None, - classname=None, - pyspark_file=None, - query_file=None, - job_poll_interval=None, - job_arguments=None, - job_files=None, - job_jars=None, - job_py_files=None, - job_stdout_file=None, - job_type=None, - properties=None): + run_time=end_time - start_time, pending_time=start_time - pending_time + ) + + def SubmitJob( + self, + jarfile=None, + classname=None, + pyspark_file=None, + query_file=None, + job_poll_interval=None, + job_arguments=None, + job_files=None, + job_jars=None, + job_py_files=None, + job_stdout_file=None, + job_type=None, + properties=None, + ): """See base class.""" if job_arguments: # Escape commas in arguments @@ -413,12 +452,14 @@ def SubmitJob(self, step_string = ','.join(step_list) - step_cmd = self.cmd_prefix + ['emr', - 'add-steps', - '--cluster-id', - self.cluster_id, - '--steps', - step_string] + step_cmd = self.cmd_prefix + [ + 'emr', + 'add-steps', + '--cluster-id', + self.cluster_id, + '--steps', + step_string, + ] stdout, _, _ = vm_util.IssueCommand(step_cmd) result = json.loads(stdout) step_id = result['StepIds'][0] @@ -432,7 +473,8 @@ def DistributedCopy(self, source, destination, properties=None): return self.SubmitJob( 'command-runner.jar', job_arguments=job_arguments, - job_type=dpb_constants.HADOOP_JOB_TYPE) + job_type=dpb_constants.HADOOP_JOB_TYPE, + ) def GetHdfsType(self) -> Optional[str]: """Gets human 
friendly disk type for metric metadata.""" @@ -469,7 +511,8 @@ def __init__(self, dpb_service_spec): self.region = util.GetRegionFromZone(self.dpb_service_zone) else: raise errors.Setup.InvalidSetupError( - 'dpb_service_zone must be provided, for provisioning.') + 'dpb_service_zone must be provided, for provisioning.' + ) self.cmd_prefix += ['--region', self.region] self.storage_service = s3.S3Service() self.storage_service.PrepareService(self.region) @@ -480,26 +523,29 @@ def __init__(self, dpb_service_spec): 'dpb_service.version must be provided. Versions follow the format: ' '"emr-x.y.z" and are listed at ' 'https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/' - 'release-versions.html') + 'release-versions.html' + ) self.role = FLAGS.aws_emr_serverless_role # Last job run cost self._run_cost = None self._FillMetadata() - def SubmitJob(self, - jarfile=None, - classname=None, - pyspark_file=None, - query_file=None, - job_poll_interval=None, - job_arguments=None, - job_files=None, - job_jars=None, - job_py_files=None, - job_stdout_file=None, - job_type=None, - properties=None): + def SubmitJob( + self, + jarfile=None, + classname=None, + pyspark_file=None, + query_file=None, + job_poll_interval=None, + job_arguments=None, + job_files=None, + job_jars=None, + job_py_files=None, + job_stdout_file=None, + job_type=None, + properties=None, + ): """See base class.""" assert job_type @@ -515,49 +561,69 @@ def SubmitJob(self, 'entryPoint': pyspark_file, 'entryPointArguments': job_arguments, 'sparkSubmitParameters': ' '.join( - f'--conf {prop}={val}' for prop, val in spark_props.items()) + f'--conf {prop}={val}' for prop, val in spark_props.items() + ), } } else: raise NotImplementedError( - f'Unsupported job type {job_type} for AWS EMR Serverless.') + f'Unsupported job type {job_type} for AWS EMR Serverless.' + ) # Create the application. - stdout, _, _ = vm_util.IssueCommand(self.cmd_prefix + [ - 'emr-serverless', 'create-application', - '--release-label', self.GetDpbVersion(), - '--name', self.cluster_id, - '--type', application_type, - '--tags', json.dumps(util.MakeDefaultTags()), - ]) + stdout, _, _ = vm_util.IssueCommand( + self.cmd_prefix + + [ + 'emr-serverless', + 'create-application', + '--release-label', + self.GetDpbVersion(), + '--name', + self.cluster_id, + '--type', + application_type, + '--tags', + json.dumps(util.MakeDefaultTags()), + ] + ) result = json.loads(stdout) application_id = result['applicationId'] @vm_util.Retry( poll_interval=job_poll_interval, fuzz=0, - retryable_exceptions=(EMRRetryableException,)) + retryable_exceptions=(EMRRetryableException,), + ) def WaitTilApplicationReady(): result = self._GetApplication(application_id) if result['application']['state'] not in ('CREATED', 'STARTED'): raise EMRRetryableException( - f'Application {application_id} not ready yet.') + f'Application {application_id} not ready yet.' + ) return result WaitTilApplicationReady() # Run the job. 
- stdout, _, _ = vm_util.IssueCommand(self.cmd_prefix + [ - 'emr-serverless', 'start-job-run', - '--application-id', application_id, - '--execution-role-arn', self.role, - '--job-driver', json.dumps(job_driver_dict), - ]) + stdout, _, _ = vm_util.IssueCommand( + self.cmd_prefix + + [ + 'emr-serverless', + 'start-job-run', + '--application-id', + application_id, + '--execution-role-arn', + self.role, + '--job-driver', + json.dumps(job_driver_dict), + ] + ) result = json.loads(stdout) application_id = result['applicationId'] job_run_id = result['jobRunId'] return self._WaitForJob( - (application_id, job_run_id), EMR_TIMEOUT, job_poll_interval) + (application_id, job_run_id), EMR_TIMEOUT, job_poll_interval + ) def CalculateLastJobCost(self) -> Optional[float]: return self._run_cost @@ -571,7 +637,8 @@ def GetJobProperties(self) -> Dict[str, str]: result['spark.executor.memory'] = f'{self.spec.emr_serverless_memory}G' if self.spec.emr_serverless_executor_count: result['spark.executor.instances'] = ( - self.spec.emr_serverless_executor_count) + self.spec.emr_serverless_executor_count + ) if self.spec.worker_group.disk_spec.disk_size: result['spark.emr-serverless.driver.disk'] = ( f'{self.spec.worker_group.disk_spec.disk_size}G' @@ -584,29 +651,37 @@ def GetJobProperties(self) -> Dict[str, str]: def _GetApplication(self, application_id): stdout, _, _ = vm_util.IssueCommand( - self.cmd_prefix + - ['emr-serverless', 'get-application', - '--application-id', application_id]) + self.cmd_prefix + + [ + 'emr-serverless', + 'get-application', + '--application-id', + application_id, + ] + ) result = json.loads(stdout) return result def _ComputeJobRunCost( - self, - memory_gb_hour: float, - storage_gb_hour: float, - vcpu_hour: float) -> Optional[float]: + self, memory_gb_hour: float, storage_gb_hour: float, vcpu_hour: float + ) -> Optional[float]: region_prices = aws_dpb_emr_serverless_prices.EMR_SERVERLESS_PRICES.get( - self.region, {}) + self.region, {} + ) memory_gb_hour_price = region_prices.get('memory_gb_hours') storage_gb_hour_price = region_prices.get('storage_gb_hours') vcpu_hour_price = region_prices.get('vcpu_hours') - if (memory_gb_hour_price is None or storage_gb_hour_price is None or - vcpu_hour_price is None): + if ( + memory_gb_hour_price is None + or storage_gb_hour_price is None + or vcpu_hour_price is None + ): return None return ( - memory_gb_hour * memory_gb_hour_price + - storage_gb_hour * storage_gb_hour_price + - vcpu_hour * vcpu_hour_price) + memory_gb_hour * memory_gb_hour_price + + storage_gb_hour * storage_gb_hour_price + + vcpu_hour * vcpu_hour_price + ) def _GetCompletedJob(self, job_id): """See base class.""" @@ -614,17 +689,21 @@ def _GetCompletedJob(self, job_id): cmd = self.cmd_prefix + [ 'emr-serverless', 'get-job-run', - '--application-id', application_id, - '--job-run-id', job_run_id + '--application-id', + application_id, + '--job-run-id', + job_run_id, ] stdout, stderr, retcode = vm_util.IssueCommand(cmd, raise_on_failure=False) if retcode: if 'ThrottlingException' in stderr: - logging.warning('Rate limited while polling EMR JobRun:\n%s\nRetrying.', - stderr) + logging.warning( + 'Rate limited while polling EMR JobRun:\n%s\nRetrying.', stderr + ) return None raise errors.VmUtil.IssueCommandError( - f'Getting JobRun status failed:\n{stderr}') + f'Getting JobRun status failed:\n{stderr}' + ) result = json.loads(stdout) state = result['jobRun']['state'] if state in ('FAILED', 'CANCELLED'): @@ -632,14 +711,16 @@ def _GetCompletedJob(self, job_id): if state == 'SUCCESS': 
start_time = result['jobRun']['createdAt'] end_time = result['jobRun']['updatedAt'] - resource_utilization = ( - result.get('jobRun', {}).get('totalResourceUtilization', {})) + resource_utilization = result.get('jobRun', {}).get( + 'totalResourceUtilization', {} + ) memory_gb_hour = resource_utilization.get('memoryGBHour') storage_gb_hour = resource_utilization.get('storageGBHour') vcpu_hour = resource_utilization.get('vCPUHour') if None not in (memory_gb_hour, storage_gb_hour, vcpu_hour): self._run_cost = self._ComputeJobRunCost( - memory_gb_hour, storage_gb_hour, vcpu_hour) + memory_gb_hour, storage_gb_hour, vcpu_hour + ) return dpb_service.JobResult(run_time=end_time - start_time) def _FillMetadata(self) -> None: diff --git a/perfkitbenchmarker/providers/aws/aws_dpb_glue.py b/perfkitbenchmarker/providers/aws/aws_dpb_glue.py index 3da490d30e..4f358af059 100644 --- a/perfkitbenchmarker/providers/aws/aws_dpb_glue.py +++ b/perfkitbenchmarker/providers/aws/aws_dpb_glue.py @@ -12,7 +12,6 @@ from typing import List, Optional from absl import flags - from perfkitbenchmarker import dpb_constants from perfkitbenchmarker import dpb_service from perfkitbenchmarker import errors @@ -48,7 +47,8 @@ def __init__(self, dpb_service_spec): self.cmd_prefix = list(util.AWS_PREFIX) if not self.dpb_service_zone: raise errors.Setup.InvalidSetupError( - 'dpb_service_zone must be provided, for provisioning.') + 'dpb_service_zone must be provided, for provisioning.' + ) self.region = util.GetRegionFromZone(self.dpb_service_zone) self.cmd_prefix += ['--region', self.region] self.storage_service = s3.S3Service() @@ -72,13 +72,16 @@ def _GetCompletedJob(self, job_id): cmd = self.cmd_prefix + [ 'glue', 'get-job-run', - '--job-name', job_name, - '--run-id', job_run_id + '--job-name', + job_name, + '--run-id', + job_run_id, ] stdout, stderr, retcode = vm_util.IssueCommand(cmd, raise_on_failure=False) if retcode: raise errors.VmUtil.IssueCommandError( - f'Getting step status failed:\n{stderr}') + f'Getting step status failed:\n{stderr}' + ) result = json.loads(stdout) state = result['JobRun']['JobRunState'] if state in ('FAILED', 'ERROR', 'TIMEOUT'): @@ -94,21 +97,24 @@ def _GetCompletedJob(self, job_id): ) return dpb_service.JobResult( run_time=execution_time, - pending_time=completed_on - started_on - execution_time) - - def SubmitJob(self, - jarfile=None, - classname=None, - pyspark_file=None, - query_file=None, - job_poll_interval=None, - job_stdout_file=None, - job_arguments=None, - job_files=None, - job_jars=None, - job_py_files=None, - job_type=None, - properties=None): + pending_time=completed_on - started_on - execution_time, + ) + + def SubmitJob( + self, + jarfile=None, + classname=None, + pyspark_file=None, + query_file=None, + job_poll_interval=None, + job_stdout_file=None, + job_arguments=None, + job_files=None, + job_jars=None, + job_py_files=None, + job_type=None, + properties=None, + ): """See base class.""" assert job_type @@ -130,34 +136,53 @@ def SubmitJob(self, if properties: all_properties.update(properties) glue_default_args = { - '--extra-py-files': ','.join(extra_py_files), **all_properties} + '--extra-py-files': ','.join(extra_py_files), + **all_properties, + } else: raise ValueError(f'Unsupported job type {job_type} for AWS Glue.') - vm_util.IssueCommand(self.cmd_prefix + [ - 'glue', - 'create-job', - '--name', job_name, - '--role', self.role, - '--command', json.dumps(glue_command), - '--default-arguments', json.dumps(glue_default_args), - '--glue-version', self.GetDpbVersion(), - 
'--number-of-workers', str(self.spec.worker_count), - '--worker-type', self.spec.worker_group.vm_spec.machine_type, - - ]) + vm_util.IssueCommand( + self.cmd_prefix + + [ + 'glue', + 'create-job', + '--name', + job_name, + '--role', + self.role, + '--command', + json.dumps(glue_command), + '--default-arguments', + json.dumps(glue_default_args), + '--glue-version', + self.GetDpbVersion(), + '--number-of-workers', + str(self.spec.worker_count), + '--worker-type', + self.spec.worker_group.vm_spec.machine_type, + ] + ) # Run job definition - stdout, _, _ = vm_util.IssueCommand(self.cmd_prefix + [ - 'glue', - 'start-job-run', - '--job-name', job_name, - '--arguments', json.dumps( - {'--pkb_main': _ModuleFromPyFilename(pyspark_file), - '--pkb_args': json.dumps(job_arguments)})]) + stdout, _, _ = vm_util.IssueCommand( + self.cmd_prefix + + [ + 'glue', + 'start-job-run', + '--job-name', + job_name, + '--arguments', + json.dumps({ + '--pkb_main': _ModuleFromPyFilename(pyspark_file), + '--pkb_args': json.dumps(job_arguments), + }), + ] + ) job_run_id = json.loads(stdout)['JobRunId'] - return self._WaitForJob((job_name, job_run_id), GLUE_TIMEOUT, - job_poll_interval) + return self._WaitForJob( + (job_name, job_run_id), GLUE_TIMEOUT, job_poll_interval + ) def _Delete(self): """Deletes Glue Jobs created to avoid quota issues.""" @@ -166,11 +191,10 @@ def _Delete(self): self._DeleteGlueJob(job_name) def _DeleteGlueJob(self, job_name: str): - vm_util.IssueCommand(self.cmd_prefix + [ - 'glue', - 'delete-job', - f'--job-name={job_name}' - ], raise_on_failure=False) + vm_util.IssueCommand( + self.cmd_prefix + ['glue', 'delete-job', f'--job-name={job_name}'], + raise_on_failure=False, + ) def _FillMetadata(self) -> None: """Gets a dict to initialize this DPB service instance's metadata.""" @@ -180,7 +204,8 @@ def _FillMetadata(self) -> None: # https://docs.aws.amazon.com/glue/latest/dg/add-job.html#:~:text=Own%20Custom%20Scripts.-,Worker%20type,-The%20following%20worker disk_size_by_worker_type = {'Standard': '50', 'G.1X': '64', 'G.2X': '128'} dpb_disk_size = disk_size_by_worker_type.get( - self.spec.worker_group.vm_spec.machine_type, 'Unknown') + self.spec.worker_group.vm_spec.machine_type, 'Unknown' + ) self.metadata = { 'dpb_service': basic_data['dpb_service'], diff --git a/tests/providers/aws/aws_dpb_glue_test.py b/tests/providers/aws/aws_dpb_glue_test.py index a96bf92b20..8890731339 100644 --- a/tests/providers/aws/aws_dpb_glue_test.py +++ b/tests/providers/aws/aws_dpb_glue_test.py @@ -20,7 +20,6 @@ from unittest import mock from absl import flags - from perfkitbenchmarker import dpb_constants from perfkitbenchmarker import vm_util from perfkitbenchmarker.providers.aws import aws_dpb_glue @@ -39,10 +38,7 @@ 'LastModifiedOn': 1675105738.096, 'CompletedOn': 1675105738.096, 'JobRunState': 'SUCCEEDED', - 'Arguments': { - '--pkb_main': 'hello', - '--pkb_args': '[]' - }, + 'Arguments': {'--pkb_main': 'hello', '--pkb_args': '[]'}, 'PredecessorRuns': [], 'AllocatedCapacity': 32, 'ExecutionTime': 2672, @@ -51,7 +47,7 @@ 'WorkerType': 'G.2X', 'NumberOfWorkers': 4, 'LogGroupName': '/aws-glue/jobs', - 'GlueVersion': '3.0' + 'GlueVersion': '3.0', } } @@ -59,7 +55,7 @@ def _GetJobRunMockPayload( dpu_seconds: Optional[float], max_capacity: Optional[float], - execution_time: Optional[float] + execution_time: Optional[float], ) -> dict[str, Any]: payload = copy.deepcopy(_BASE_JOB_RUN_PAYLOAD) if dpu_seconds is not None: @@ -76,7 +72,8 @@ def _GetJobRunMockPayload( static_dpb_service_instance=None, version='3.0', 
worker_count=4, - worker_group=mock.Mock(vm_spec=mock.Mock(machine_type='G.2X'))) + worker_group=mock.Mock(vm_spec=mock.Mock(machine_type='G.2X')), +) class AwsDpbEmrTestCase(pkb_common_test_case.PkbCommonTestCase): @@ -87,7 +84,8 @@ def setUp(self): FLAGS.dpb_service_zone = AWS_ZONE_US_EAST_1A FLAGS.zones = [AWS_ZONE_US_EAST_1A] self.issue_cmd_mock = self.enter_context( - mock.patch.object(vm_util, 'IssueCommand', autospec=True)) + mock.patch.object(vm_util, 'IssueCommand', autospec=True) + ) def testGlueCalculateLastJobCost(self): dpb_glue = aws_dpb_glue.AwsDpbGlue(GLUE_SPEC) @@ -97,9 +95,15 @@ def testGlueCalculateLastJobCost(self): self.issue_cmd_mock.side_effect = [ (json.dumps(create_job_response), '', 0), (json.dumps(start_job_run_response), '', 0), - (json.dumps( - _GetJobRunMockPayload(dpu_seconds=None, max_capacity=32.0, - execution_time=2672)), '', 0) + ( + json.dumps( + _GetJobRunMockPayload( + dpu_seconds=None, max_capacity=32.0, execution_time=2672 + ) + ), + '', + 0, + ), ] with mock.patch.object(aws_dpb_glue_prices, 'GLUE_PRICES'): @@ -109,7 +113,8 @@ def testGlueCalculateLastJobCost(self): dpb_glue.SubmitJob( pyspark_file='s3://test/hello.py', job_type=dpb_constants.PYSPARK_JOB_TYPE, - job_arguments=[]) + job_arguments=[], + ) self.assertEqual(dpb_glue.CalculateLastJobCost(), 10.45048888888889)
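
Note: the expected figure asserted in testGlueCalculateLastJobCost above can be re-derived from the mocked JobRun payload. The short sketch below is illustrative only: it assumes the Glue run cost is computed as MaxCapacity times ExecutionTime, converted to DPU-hours, multiplied by a flat per-DPU-hour rate. The $0.44 rate is the commonly quoted us-east-1 Glue price and is an assumption here, since the actual lookup goes through aws_dpb_glue_prices.GLUE_PRICES rather than a hard-coded constant.

# Illustrative re-derivation of the Glue job-run cost asserted in the test above.
# Assumed formula: cost = MaxCapacity (DPUs) * ExecutionTime (s) / 3600 * price per DPU-hour.
max_capacity_dpus = 32.0           # 'MaxCapacity' in _BASE_JOB_RUN_PAYLOAD
execution_time_seconds = 2672      # 'ExecutionTime' in _BASE_JOB_RUN_PAYLOAD
assumed_price_per_dpu_hour = 0.44  # assumed us-east-1 Glue rate, not read from GLUE_PRICES

dpu_hours = max_capacity_dpus * execution_time_seconds / 3600
run_cost = dpu_hours * assumed_price_per_dpu_hour
print(run_cost)  # ~10.45048888888889, the value asserted by the test (up to float rounding)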