Skip to content

Commit

Permalink
Python TextIO Performance Test (#23951)
Browse files Browse the repository at this point in the history
* Python TextIO Performance Test

* Add filebasedio_perf_test module for unified test framework for
  Python file-based IOs

* Fix MetricsReader publishes metrics duplicately if more than one
  load test declared. This is because MetricsReader.publishers was
  static class variable

* Fix pylint

* Distribute Python performance tests random time at a day instead of all at 3PM

* Add information about length conversion
  • Loading branch information
Abacn authored Nov 16, 2022
1 parent 017f2cb commit b952b41
Show file tree
Hide file tree
Showing 9 changed files with 410 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
executeJob(delegate, bqio_read_test)
}

CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Read_Python', 'H 15 * * *', this) {
CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Read_Python', 'H H * * *', this) {
executeJob(delegate, bqio_read_test)
}

Expand All @@ -103,6 +103,6 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
executeJob(delegate, bqio_write_test)
}

CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Write_Python_Batch', 'H 15 * * *', this) {
CronJobBuilder.cronJob('beam_PerformanceTests_BiqQueryIO_Write_Python_Batch', 'H H * * *', this) {
executeJob(delegate, bqio_write_test)
}
81 changes: 81 additions & 0 deletions .test-infra/jenkins/job_PerformanceTests_FileBasedIO_Python.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import CommonJobProperties as common
import LoadTestsBuilder as loadTestsBuilder
import InfluxDBCredentialsHelper

def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))

def jobs = [
[
name : 'beam_PerformanceTests_TextIOIT_Python',
description : 'Runs performance tests for Python TextIOIT',
test : 'apache_beam.io.filebasedio_perf_test',
githubTitle : 'Python TextIO Performance Test',
githubTriggerPhrase: 'Run Python TextIO Performance Test',
pipelineOptions : [
publish_to_big_query : true,
metrics_dataset : 'beam_performance',
metrics_table : 'python_textio_1GB_results',
influx_measurement : 'python_textio_1GB_results',
test_class : 'TextIOPerfTest',
input_options : '\'{' +
'"num_records": 25000000,' +
'"key_size": 9,' +
'"value_size": 21}\'',
dataset_size : '1050000000',
num_workers : '5',
autoscaling_algorithm: 'NONE'
]
]
]

jobs.findAll {
it.name in [
'beam_PerformanceTests_TextIOIT_Python',
]
}.forEach { testJob -> createGCSFileBasedIOITTestJob(testJob) }

private void createGCSFileBasedIOITTestJob(testJob) {
job(testJob.name) {
description(testJob.description)
common.setTopLevelMainJobProperties(delegate)
common.enablePhraseTriggeringFromPullRequest(delegate, testJob.githubTitle, testJob.githubTriggerPhrase)
common.setAutoJob(delegate, 'H H * * *')
InfluxDBCredentialsHelper.useCredentials(delegate)
additionalPipelineArgs = [
influxDatabase: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
influxHost: InfluxDBCredentialsHelper.InfluxDBHostUrl,
]
testJob.pipelineOptions.putAll(additionalPipelineArgs)

def dataflowSpecificOptions = [
runner : 'DataflowRunner',
project : 'apache-beam-testing',
region : 'us-central1',
temp_location : 'gs://temp-storage-for-perf-tests/',
filename_prefix : "gs://temp-storage-for-perf-tests/${testJob.name}/\${BUILD_ID}/",
]

Map allPipelineOptions = dataflowSpecificOptions << testJob.pipelineOptions

loadTestsBuilder.loadTest(
delegate, testJob.name, CommonTestProperties.Runner.DATAFLOW, CommonTestProperties.SDK.PYTHON, allPipelineOptions, testJob.test)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
executeJob(delegate, psio_test)
}

CronJobBuilder.cronJob('beam_PerformanceTests_PubsubIOIT_Python_Streaming', 'H 15 * * *', this) {
CronJobBuilder.cronJob('beam_PerformanceTests_PubsubIOIT_Python_Streaming', 'H H * * *', this) {
executeJob(delegate, psio_test)
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
executeJob(delegate, spannerio_read_test_2gb)
}

CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Read_2GB_Python', 'H 15 * * *', this) {
CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Read_2GB_Python', 'H H * * *', this) {
executeJob(delegate, spannerio_read_test_2gb)
}

Expand All @@ -105,6 +105,6 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
executeJob(delegate, spannerio_write_test_2gb)
}

CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch', 'H 15 * * *', this) {
CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch', 'H H * * *', this) {
executeJob(delegate, spannerio_write_test_2gb)
}
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,128 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"cacheTimeout": null,
"dashLength": 10,
"dashes": false,
"datasource": "BeamInfluxDB",
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"y": 9
},
"hiddenSeries": false,
"id": 6,
"interval": "24h",
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 2,
"links": [],
"nullPointMode": "connected",
"options": {
"dataLinks": []
},
"percentage": false,
"pluginVersion": "6.7.2",
"pointradius": 2,
"points": true,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"alias": "$tag_metric",
"groupBy": [
{
"params": [
"$__interval"
],
"type": "time"
}
],
"measurement": "",
"orderByTime": "ASC",
"policy": "default",
"query": "SELECT mean(\"value\") FROM \"python_textio_1GB_results\" WHERE \"metric\" = 'read_runtime' OR \"metric\" = 'write_runtime' AND $timeFilter GROUP BY time($__interval), \"metric\"",
"rawQuery": true,
"refId": "A",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"value"
],
"type": "field"
},
{
"params": [],
"type": "mean"
}
]
],
"tags": []
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "TextIO | GCS | 1 GB",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"transparent": true,
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:403",
"format": "s",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:404",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"schemaVersion": 22,
Expand Down
Loading

0 comments on commit b952b41

Please sign in to comment.