From 99c55e7f6024a4de8f553f447c1fd4b369e709c1 Mon Sep 17 00:00:00 2001 From: Noah Lavine Date: Mon, 28 Mar 2016 11:23:18 -0700 Subject: [PATCH] Update object storage benchmark in response to review comments Highlights: - Split AllStreamsThroughputStats into ThroughputStats and GapStats - Add Interval class and use it to simplify analysis code - Remove SampleStats and co. and use existing _AppendPercentilesToResults --- perfkitbenchmarker/analyze.py | 209 ++++++++++-------- .../object_storage_service_benchmark.py | 81 ++++--- requirements-testing.txt | 1 + tests/analyze_test.py | 205 ++++++++++------- 4 files changed, 278 insertions(+), 218 deletions(-) diff --git a/perfkitbenchmarker/analyze.py b/perfkitbenchmarker/analyze.py index b20077118d..c823f181bb 100644 --- a/perfkitbenchmarker/analyze.py +++ b/perfkitbenchmarker/analyze.py @@ -19,13 +19,50 @@ data format. """ -import pandas as pd +import perfkitbenchmarker -from perfkitbenchmarker import sample +class Interval(object): + """Represents an interval in time. -def AllStreamsInterval(start_times, durations, stream_ids): - """Compute when all streams were active from multistream records. + Args: + start: float. A POSIX timestamp. + duration: float. A length of time, in seconds. + end: float. A POSIX timestamp. + + Either duration or end must be passed to the constructor. + """ + + def __init__(self, start, duration=None, end=None): + self.start = start + + if duration is not None: + if end is not None and start + duration != end: + raise ValueError( + 'Invalid arguments to interval constructor: %s + %s != %s' % + (start, duration, end)) + self.duration = duration + elif end is not None: + self.duration = end - start + + + @property + def end(self): + return self.start + self.duration + + + def __eq__(self, other): + return (isinstance(other, Interval) and + self.start == other.start and + self.duration == other.duration) + + + def __repr__(self): + return 'Interval(%s, %s, %s)' % (self.start, self.duration, self.end) + + +def AnyAndAllStreamsIntervals(start_times, durations, stream_ids): + """Compute when all streams were active and when any streams were active. Args: start_times: a pd.Series of POSIX timestamps, as floats. @@ -40,51 +77,53 @@ def AllStreamsInterval(start_times, durations, stream_ids): operation. Returns: a tuple of - - a float holding the POSIX timestamp when the last stream to - start began its first operation - - a float holding the length of time in seconds from the first - return value until the first proceess to end stopped its last - operation + - an Interval describing when any streams were active + - an Interval describing when all streams were active """ + assert start_times.index.equals(durations.index) + assert start_times.index.equals(stream_ids.index) + + first_start = start_times.min() + last_end = (start_times + durations).max() + stream_start_times = start_times.groupby(stream_ids).min() stream_end_times = (start_times + durations).groupby(stream_ids).max() interval_start = stream_start_times.max() interval_end = stream_end_times.min() - return interval_start, interval_end - interval_start + return (Interval(first_start, end=last_end), + Interval(interval_start, end=interval_end)) -def StreamStartAndEndGaps(start_times, durations, - interval_start, interval_duration): +def StreamStartAndEndGaps(start_times, durations, interval): """Compute the stream start and stream end timing gaps. Args: start_times: a pd.Series of POSIX timestamps, as floats. 
durations: a pd.Series of durations, as floats measured in seconds. - interval_start: float. The POSIX timestamp when the last stream started. - interval_duration: float. The time in seconds that all streams were active. + interval: Interval. The interval when all streams were active. Returns: a tuple of - The time between when the first and last streams started. - The time between when the first and last streams ended. """ - interval_end = interval_start + interval_duration + assert start_times.index.equals(durations.index) + first_start = start_times.min() last_end = (start_times + durations).max() - return interval_start - first_start, last_end - interval_end + return interval.start - first_start, last_end - interval.end -def FullyInInterval(start_times, durations, interval_start, interval_duration): +def FullyInInterval(start_times, durations, interval): """Compute which records are completely inside an interval. Args: start_times: a pd.Series of POSIX timestamps, as floats durations: a pd.Series of durations in seconds, as floats - interval_start: the POSIX timestamp of the interval start, as a float - interval_duration: the duration of the interval in seconds, as a float + interval: Interval. The interval to check membership in. start_times and durations must have matching indices. Each (start_time, duration) pair is considered a record of an operation @@ -96,107 +135,95 @@ def FullyInInterval(start_times, durations, interval_start, interval_duration): and false otherwise. """ - interval_end = interval_start + interval_duration - record_ends = start_times + durations + assert start_times.index.equals(durations.index) - return (start_times >= interval_start) & (record_ends <= interval_end) + record_ends = start_times + durations + return (start_times >= interval.start) & (record_ends <= interval.end) -def AllStreamsThroughputStats(durations, sizes, stream_ids, - num_streams, interval_duration): - """Compute the net throughput of multiple streams doing operations. +def ThroughputStats(start_times, durations, sizes, stream_ids, num_streams): + """Compute throughput stats of multiple streams doing operations. Args: + start_times: a pd.Series of POSIX timestamps, as floats. durations: a pd.Series of durations in seconds, as floats. sizes: a pd.Series of bytes. stream_ids: a pd.Series of any type. num_streams: int. The number of streams. - interval_duration: a float. The time all streams were active, in seconds. - durations, sizes, and stream_ids must have matching indices. This - function computes the per-stream net throughput (sum of bytes + start_times, durations, sizes, and stream_ids must have matching indices. + This function computes the per-stream net throughput (sum of bytes transferred / total transfer time) and then adds the results to find the overall net throughput. Operations from the same stream cannot overlap, but operations from different streams may overlap. - Returns: a tuple of - - The net throughput of all streams, excluding times they were inactive. - - The throughput of all streams, including times they were inactive. - - The total time that all streams were inactive. - - The total time all streams were inactive, as a proportion of the - total benchmark time. + Returns: a dictionary whose keys are metric names and whose values + are Quantity objects holding the metric values with appropriate + units. 
""" - total_bytes_by_stream = sizes.groupby(stream_ids).sum() - active_time_by_stream = durations.groupby(stream_ids).sum() - total_overhead = interval_duration * num_streams - active_time_by_stream.sum() - - return ((total_bytes_by_stream / active_time_by_stream).sum(), - total_bytes_by_stream.sum() / interval_duration, - total_overhead, - total_overhead / (interval_duration * num_streams)) + assert start_times.index.equals(durations.index) + assert start_times.index.equals(sizes.index) + assert start_times.index.equals(stream_ids.index) + bit = perfkitbenchmarker.UNIT_REGISTRY.bit + sec = perfkitbenchmarker.UNIT_REGISTRY.second -def SummaryStats(series, name_prefix=''): + end_times = start_times + durations - """Compute some summary statistics for a series. + stream_starts = start_times.groupby(stream_ids).min() + stream_ends = end_times.groupby(stream_ids).max() - Args: - series: a pd.Series of floats. - name_prefix: if given, a prefix for the statistic names. + total_bytes_by_stream = sizes.groupby(stream_ids).sum() + active_time_by_stream = durations.groupby(stream_ids).sum() + overall_duration_by_stream = stream_ends - stream_starts - Returns: a pd.Series with summary statistics. - """ + return { + 'net throughput': + (total_bytes_by_stream / active_time_by_stream).sum() * 8 * bit / sec, + 'net throughput (experimental)': + (total_bytes_by_stream / + overall_duration_by_stream).sum() * 8 * bit / sec} - # Percentiles 0, 1, 2, ..., 100 - percentiles = range(0, 101, 1) - # range() and xrange() don't accept floating-point arguments, so - # add 99.1, 99.2, ..., 99.9 ourselves. - for i in xrange(1, 10): - percentiles.append(99 + i / 10.0) - # TODO: use series.describe() to simplify this once we upgrade to a - # version of pandas where describe() has the 'percentiles' keyword - # argument. - result = {} - values = sorted(series) - count = len(values) +def GapStats(start_times, durations, stream_ids, interval, num_streams): + """Compute statistics about operation gaps in an interval. - for percentile in percentiles: - name = name_prefix + 'p' + str(percentile) - val = values[int((count - 1) * float(percentile) / 100.0)] - result[name] = val + Args: + start_times: a pd.Series of POSIX timestamps, as floats. + durations: a pd.Series of durations in seconds, as floats. + stream_ids: a pd.Series of any type. + interval: the interval to compute statistics for. + num_streams: the total number of streams. - result[name_prefix + 'min'] = values[0] - result[name_prefix + 'max'] = values[-1] - result[name_prefix + 'median'] = result[name_prefix + 'p50'] - result[name_prefix + 'mean'] = sum(values) / float(count) + Returns: a dictionary whose keys are metric names and whose values + are Quantity objects holding metric values with appropriate units. + """ - sum_of_squares = sum([num ** 2 for num in values]) - result[name_prefix + 'stddev'] = (sum_of_squares / (count - 1)) ** 0.5 + assert start_times.index.equals(durations.index) + assert start_times.index.equals(stream_ids.index) - return pd.Series(result) + sec = perfkitbenchmarker.UNIT_REGISTRY.second + percent = perfkitbenchmarker.UNIT_REGISTRY.percent + end_times = start_times + durations -def AppendStatsAsSamples(series, unit, samples_list, - name_prefix=None, timestamps=None, metadata=None): - """Append statistics about a series to a list as sample.Samples. + # True if record overlaps the interval at all, False if not. + overlaps_interval = ((start_times < interval.end) & + (end_times > interval.start)) - Args: - series: a pd.Series of floats. 
-    unit: string. Passed to sample.Sample.
-    samples_list: the list of sample.Samples to append to.
-    name_prefix: if given, a prefix for the statistic names.
-    timestamps: if given, a pd.Series of floats, used as Sample timestamps.
-    metadata: if given, extra metadata for the sample.Samples.
-
-  If timestamps is given, its index must match the index of series.
-  """
+  # The records that overlap the interval, shrunk to be completely in
+  # the interval.
+  start_or_interval = start_times[overlaps_interval].apply(
+      lambda x: max(x, interval.start))
+  end_or_interval = end_times[overlaps_interval].apply(
+      lambda x: min(x, interval.end))

-  stats = SummaryStats(series, name_prefix=name_prefix)
+  total_active_time = (end_or_interval - start_or_interval).sum()
+  total_gap_time = interval.duration * num_streams - total_active_time

-  for name, value in stats.iteritems():
-    samples_list.append(sample.Sample(
-        name, value, unit,
-        timestamp=timestamps[name] if timestamps is not None else None,
-        metadata=metadata))
+  return {'total gap time': total_gap_time * sec,
+          'gap time proportion':
+          float(total_gap_time) /
+          (interval.duration * num_streams) *
+          100.0 * percent}
diff --git a/perfkitbenchmarker/linux_benchmarks/object_storage_service_benchmark.py b/perfkitbenchmarker/linux_benchmarks/object_storage_service_benchmark.py
index 5a0d4508b7..e1734ebeb3 100644
--- a/perfkitbenchmarker/linux_benchmarks/object_storage_service_benchmark.py
+++ b/perfkitbenchmarker/linux_benchmarks/object_storage_service_benchmark.py
@@ -31,6 +31,7 @@
 Documentation: https://goto.google.com/perfkitbenchmarker-storage
 """

+import itertools
 import json
 import logging
 import os
@@ -252,10 +253,10 @@
 # benchmark. This is the filename.
 OBJECTS_WRITTEN_FILE = 'pkb-objects-written'

-# If the period of time when some streams, but not all, are active is
-# more than MULTISTREAM_STREAM_GAP_THRESHOLD percent of the time when
-# all streams are active, then log a warning and put metadata in the
-# samples.
+# If the gap between different stream starts and ends is above a
+# certain proportion of the total time, we log a warning because we
+# are throwing out a lot of information. We also put the warning in
+# the sample metadata.
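For intuition, a small worked example of the check described in that comment (illustrative only, with made-up numbers, not part of the patch): if the first and last streams start 1 second apart, finish 2 seconds apart, and the whole run spans 10 seconds, then (1 + 2) / 10 = 0.3 is above the 0.2 threshold defined just below, so the run is flagged.

# Illustration only: the stream-gap check made by _ProcessMultiStreamResults,
# with made-up numbers.
start_gap = 1.0               # seconds between first and last stream start
stop_gap = 2.0                # seconds between first and last stream end
any_streams_duration = 10.0   # seconds from the first start to the last end

gap_proportion = (start_gap + stop_gap) / any_streams_duration  # 0.3
# 0.3 is not below MULTISTREAM_STREAM_GAP_THRESHOLD (0.2), so a warning is
# logged and 'stream_gap_above_threshold' is set in the sample metadata.
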
MULTISTREAM_STREAM_GAP_THRESHOLD = 0.2 @@ -375,12 +376,11 @@ def _ProcessMultiStreamResults(raw_result, operation, sizes, records_json = json.loads(raw_result) records = pd.DataFrame(records_json) - all_streams_start, all_streams_duration = analyze.AllStreamsInterval( + any_streams_active, all_streams_active = analyze.AnyAndAllStreamsIntervals( records['start_time'], records['latency'], records['stream_num']) start_gap, stop_gap = analyze.StreamStartAndEndGaps( - records['start_time'], records['latency'], - all_streams_start, all_streams_duration) - if ((start_gap + stop_gap) / all_streams_duration < + records['start_time'], records['latency'], all_streams_active) + if ((start_gap + stop_gap) / any_streams_active.duration < MULTISTREAM_STREAM_GAP_THRESHOLD): logging.info( 'First stream started %s seconds before last stream started', start_gap) @@ -391,14 +391,13 @@ def _ProcessMultiStreamResults(raw_result, operation, sizes, 'Difference between first and last stream start/end times was %s and ' '%s, which is more than %s of the benchmark time %s.', start_gap, stop_gap, MULTISTREAM_STREAM_GAP_THRESHOLD, - all_streams_duration) + any_streams_active.duration) metadata['stream_gap_above_threshold'] = True records_in_interval = records[ analyze.FullyInInterval(records['start_time'], records['latency'], - all_streams_start, - all_streams_duration)] + all_streams_active)] # Don't publish the full distribution in the metadata because doing # so might break regexp-based parsers that assume that all metadata @@ -409,42 +408,38 @@ def _ProcessMultiStreamResults(raw_result, operation, sizes, distribution_metadata = metadata.copy() distribution_metadata['object_size_B'] = 'distribution' - latency_prefix = 'Multi-stream %s latency ' % operation + latency_prefix = 'Multi-stream %s latency' % operation logging.info('Processing %s multi-stream %s results for the full ' 'distribution.', len(records_in_interval), operation) - analyze.AppendStatsAsSamples( + _AppendPercentilesToResults( + results, records_in_interval['latency'], + latency_prefix, 'sec', - results, - name_prefix=latency_prefix, - metadata=distribution_metadata) + distribution_metadata) logging.info('Processing %s multi-stream %s results for net throughput', len(records), operation) - net_thpt, thpt_with_overhead, total_overhead, overhead_prop = ( - analyze.AllStreamsThroughputStats( - records_in_interval['latency'], - records_in_interval['size'], - records_in_interval['stream_num'], - num_streams, - all_streams_duration)) + throughput_stats = analyze.ThroughputStats( + records_in_interval['start_time'], + records_in_interval['latency'], + records_in_interval['size'], + records_in_interval['stream_num'], + num_streams) + gap_stats = analyze.GapStats( + records['start_time'], + records['latency'], + records['stream_num'], + all_streams_active, + num_streams) logging.info('Benchmark overhead was %s percent of total benchmark time', - overhead_prop * 100.0) - - results.extend([ - sample.Sample( - 'Multi-stream %s net throughput' % operation, - net_thpt, 'B/sec', metadata=distribution_metadata), - sample.Sample( - 'Multi-stream %s net throughput (experimental)' % operation, - thpt_with_overhead, 'B/sec', metadata=distribution_metadata), - sample.Sample( - 'Multi-stream %s total overhead time' % operation, - total_overhead, 'sec', metadata=distribution_metadata), - sample.Sample( - 'Multi-stream %s overhead time proportion' % operation, - overhead_prop * 100.0, 'percent', metadata=distribution_metadata)]) + gap_stats['gap time 
proportion'].magnitude) + for name, value in itertools.chain(throughput_stats.iteritems(), + gap_stats.iteritems()): + results.append(sample.Sample( + 'Multi-stream ' + operation + ' ' + name, + value.magnitude, str(value.units), metadata=distribution_metadata)) # Publish by-size and full-distribution stats even if there's only # one size in the distribution, because it simplifies postprocessing @@ -455,12 +450,12 @@ def _ProcessMultiStreamResults(raw_result, operation, sizes, this_size_metadata['object_size_B'] = size logging.info('Processing %s multi-stream %s results for object size %s', len(records), operation, size) - analyze.AppendStatsAsSamples( + _AppendPercentilesToResults( + results, this_size_records['latency'], + latency_prefix, 'sec', - results, - name_prefix=latency_prefix, - metadata=this_size_metadata) + this_size_metadata) def _DistributionToBackendFormat(dist): @@ -1471,7 +1466,7 @@ def Run(benchmark_spec): results = OBJECT_STORAGE_BENCHMARK_DICTIONARY[FLAGS.storage].Run(vms[0], metadata) - print results + # print results return results diff --git a/requirements-testing.txt b/requirements-testing.txt index a0f7944c74..c403245474 100644 --- a/requirements-testing.txt +++ b/requirements-testing.txt @@ -32,3 +32,4 @@ flake8>=2.1.0 psutil==3.0.0 gcs-oauth2-boto-plugin azure + diff --git a/tests/analyze_test.py b/tests/analyze_test.py index 71bf70a430..f35a74ed3c 100644 --- a/tests/analyze_test.py +++ b/tests/analyze_test.py @@ -16,13 +16,13 @@ import unittest -import mock import pandas as pd from perfkitbenchmarker import analyze -from perfkitbenchmarker import sample +import perfkitbenchmarker -# All streams were active from time 4.0 to time 8.0. +# The first stream starts at 0.0 and the last one ends at 14.0. All +# streams were active from time 4.0 to time 8.0. 
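Looking back at the sample-building loop in _ProcessMultiStreamResults above, the following sketch (not part of the patch) shows how a dictionary of unit-carrying values becomes PKB samples. It assumes the Quantity objects behave like pint quantities; a stand-alone pint.UnitRegistry stands in here for perfkitbenchmarker.UNIT_REGISTRY.

# Sketch only: splitting Quantity objects into magnitudes and unit strings,
# the way the benchmark builds its sample.Sample entries. The registry below
# is an assumption standing in for perfkitbenchmarker.UNIT_REGISTRY.
import itertools
import pint

units = pint.UnitRegistry()
throughput_stats = {'net throughput': 16.0 * units.bit / units.second}
gap_stats = {'total gap time': 2.0 * units.second}

for name, value in itertools.chain(throughput_stats.iteritems(),
                                   gap_stats.iteritems()):
  # Each metric becomes a name, a bare magnitude, and a unit string.
  print 'Multi-stream read ' + name, value.magnitude, str(value.units)
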
SAMPLE_TABLE = pd.DataFrame([ {'start_time': 0.0, 'duration': 2.0, # completely before 'stream_num': 0, 'size': 1}, @@ -38,21 +38,41 @@ 'stream_num': 1, 'size': 4}]) -class TestAllStreamsInterval(unittest.TestCase): - def testAllStreamsInterval(self): - start_time, duration = analyze.AllStreamsInterval( +class TestInterval(unittest.TestCase): + def testStartAndDuration(self): + interval = analyze.Interval(2, duration=5) + self.assertEqual(interval.end, 7) + + def testStartAndEnd(self): + interval = analyze.Interval(2, end=7) + self.assertEqual(interval.duration, 5) + + def testThreeArgsGood(self): + # Test that the constructor doesn't raise an exception + analyze.Interval(2, duration=5, end=7) + + def testThreeArgsBad(self): + with self.assertRaises(ValueError): + analyze.Interval(2, duration=5, end=8) + + +class TestAnyAndAllStreamsIntervals(unittest.TestCase): + def testAnyAndAllStreamsInterval(self): + any_stream, all_streams = analyze.AnyAndAllStreamsIntervals( SAMPLE_TABLE['start_time'], SAMPLE_TABLE['duration'], SAMPLE_TABLE['stream_num']) - self.assertEquals(start_time, 4.0) - self.assertEquals(duration, 4.0) + + print "any_stream", any_stream, "all_streams", all_streams + self.assertEqual(any_stream, analyze.Interval(0, end=14.0)) + self.assertEqual(all_streams, analyze.Interval(4.0, end=8.0)) class TestStreamStartAndEndGaps(unittest.TestCase): def testStreamStartAndEndGaps(self): start_gap, stop_gap = analyze.StreamStartAndEndGaps( SAMPLE_TABLE['start_time'], SAMPLE_TABLE['duration'], - 4.0, 4.0) + analyze.Interval(4.0, duration=4.0)) self.assertEqual(start_gap, 4.0) self.assertEqual(stop_gap, 6.0) @@ -63,116 +83,133 @@ def testFullyInInterval(self): overlaps = analyze.FullyInInterval( SAMPLE_TABLE['start_time'], SAMPLE_TABLE['duration'], - 4.0, 4.0) + analyze.Interval(4.0, duration=4.0)) self.assertTrue( (overlaps == pd.Series([False, False, True, False, False, True])).all()) class TestAllStreamsThroughputStats(unittest.TestCase): + + def setUp(self): + self.byte = perfkitbenchmarker.UNIT_REGISTRY.byte + self.sec = perfkitbenchmarker.UNIT_REGISTRY.second + self.percent = perfkitbenchmarker.UNIT_REGISTRY.percent + + def doTest(self, data, num_streams, correct_answer): + output = analyze.ThroughputStats( + data['start_time'], data['duration'], data['size'], data['stream_num'], + num_streams) + print 'output', output + + for name, value in output.iteritems(): + if name not in correct_answer: + raise KeyError('IntervalThroughputStats produced key %s not in correct ' + 'output %s' % (name, str(correct_answer))) + + self.assertEqual(value, correct_answer[name]) + def testOneObject(self): # Base case: one object. - one_op = pd.DataFrame({'duration': [1], + one_op = pd.DataFrame({'start_time': [0], + 'duration': [1], 'size': [2], 'stream_num': [0]}) - self.assertEqual( - analyze.AllStreamsThroughputStats( - one_op['duration'], one_op['size'], one_op['stream_num'], 1, 1.0), - (2.0, 2.0, 0.0, 0.0)) + self.doTest(one_op, 1, + {'net throughput': 2.0 * self.byte / self.sec, + 'net throughput (experimental)': 2.0 * self.byte / self.sec}) def testSecondObjectSameSpeed(self): # Adding a second object at same speed has no effect on any metric. 
- no_gap = pd.DataFrame({'duration': [1, 1], + no_gap = pd.DataFrame({'start_time': [0, 1], + 'duration': [1, 1], 'size': [2, 2], 'stream_num': [0, 0]}) - self.assertEqual( - analyze.AllStreamsThroughputStats( - no_gap['duration'], no_gap['size'], no_gap['stream_num'], 1, 2.0), - (2.0, 2.0, 0.0, 0.0)) + self.doTest(no_gap, 1, + {'net throughput': 2.0 * self.byte / self.sec, + 'net throughput (experimental)': 2.0 * self.byte / self.sec}) def testSecondObjectDifferentSpeed(self): # Adding a second object at a different speed yields a different throughput. - different_speeds = pd.DataFrame({'duration': [1, 3], # 4 seconds total + different_speeds = pd.DataFrame({'start_time': [0, 1], + 'duration': [1, 3], # 4 seconds total 'size': [2, 8], # 10 bytes total 'stream_num': [0, 0]}) - self.assertEqual( - analyze.AllStreamsThroughputStats( - different_speeds['duration'], - different_speeds['size'], - different_speeds['stream_num'], - 1, 4.0), - (2.5, 2.5, 0.0, 0.0)) + self.doTest(different_speeds, 1, + {'net throughput': 2.5 * self.byte / self.sec, + 'net throughput (experimental)': 2.5 * self.byte / self.sec}) def testGapBetweenObjects(self): # Adding a gap affects throughput with overheads, but not without. - with_gap = pd.DataFrame({'duration': [1, 1], + with_gap = pd.DataFrame({'start_time': [0, 3], + 'duration': [1, 1], 'size': [2, 2], 'stream_num': [0, 0]}) - self.assertEqual( - analyze.AllStreamsThroughputStats( - with_gap['duration'], with_gap['size'], with_gap['stream_num'], - 1, 4.0), - (2.0, 1.0, 2.0, 0.5)) + self.doTest(with_gap, 1, + {'net throughput': 2.0 * self.byte / self.sec, + 'net throughput (experimental)': 1.0 * self.byte / self.sec}) def testSimultaneousObjects(self): # With two simultaneous objects, throughput adds. - two_streams = pd.DataFrame({'duration': [1, 1], + two_streams = pd.DataFrame({'start_time': [0, 0], + 'duration': [1, 1], 'size': [2, 2], 'stream_num': [0, 1]}) - self.assertEqual( - analyze.AllStreamsThroughputStats( - two_streams['duration'], - two_streams['size'], - two_streams['stream_num'], - 2, 1.0), - (4.0, 4.0, 0.0, 0.0)) + self.doTest(two_streams, 2, + {'net throughput': 4.0 * self.byte / self.sec, + 'net throughput (experimental)': 4.0 * self.byte / self.sec}) def testTwoStreamGaps(self): # With two streams, overhead is compared to 2 * interval length. 
- two_streams_with_gap = pd.DataFrame({'duration': [1, 1, 1, 1], + two_streams_with_gap = pd.DataFrame({'start_time': [0, 3, 0, 3], + 'duration': [1, 1, 1, 1], 'size': [2, 2, 2, 2], 'stream_num': [0, 0, 1, 1]}) - self.assertEqual( - analyze.AllStreamsThroughputStats( - two_streams_with_gap['duration'], - two_streams_with_gap['size'], - two_streams_with_gap['stream_num'], - 2, 4.0), - (4.0, 2.0, 4.0, 0.5)) - - -class TestSummaryStats(unittest.TestCase): - def testSummaryStats(self): - series = pd.Series(range(0, 1001)) - stats = analyze.SummaryStats(series, name_prefix='foo ') - - self.assertEqual(stats['foo p0'], 0) - self.assertEqual(stats['foo p1'], 10) - self.assertEqual(stats['foo p99.9'], 999) - self.assertEqual(stats['foo p100'], 1000) - self.assertEqual(stats['foo mean'], 500) - - -class TestAppendStatsAsSamples(unittest.TestCase): - def testAppendStatsAsSamples(self): - with mock.patch(analyze.__name__ + '.SummaryStats', - return_value=pd.Series({'a': 1, 'b': 2, 'c': 3})): - samples_list = [] - analyze.AppendStatsAsSamples( - [], 'unit', samples_list, - timestamps=pd.Series({'a': 11, 'b': 12, 'c': 13})) - - self.assertEqual( - samples_list[0], - sample.Sample('a', 1, 'unit', timestamp=11)) - - self.assertEqual( - samples_list[1], - sample.Sample('b', 2, 'unit', timestamp=12)) - - self.assertEqual( - samples_list[2], - sample.Sample('c', 3, 'unit', timestamp=13)) + self.doTest(two_streams_with_gap, 2, + {'net throughput': 4.0 * self.byte / self.sec, + 'net throughput (experimental)': 2.0 * self.byte / self.sec}) + + +class TestGapStats(unittest.TestCase): + + def setUp(self): + self.sec = perfkitbenchmarker.UNIT_REGISTRY.second + self.percent = perfkitbenchmarker.UNIT_REGISTRY.percent + + def doTest(self, data, num_streams, interval, correct_answer): + output = analyze.GapStats( + data['start_time'], data['duration'], data['stream_num'], + interval, num_streams) + + for name, value in output.iteritems(): + if name not in correct_answer: + raise KeyError('GapStats produced key %s not in correct output %s' % + (name, str(correct_answer))) + self.assertEqual(value, correct_answer[name]) + + def testOneRecord(self): + one_record = pd.DataFrame({'start_time': [0], + 'duration': [1], + 'stream_num': [0]}) + self.doTest(one_record, 1, analyze.Interval(0, duration=1), + {'total gap time': 0.0 * self.sec, + 'gap time proportion': 0.0 * self.percent}) + + def testOneStream(self): + one_stream = pd.DataFrame({'start_time': [0, 2, 4], + 'duration': [1, 1, 1], + 'stream_num': [0, 0, 0]}) + self.doTest(one_stream, 1, analyze.Interval(0, duration=5), + {'total gap time': 2.0 * self.sec, + 'gap time proportion': 40.0 * self.percent}) + + def testOverlapInterval(self): + overlap = pd.DataFrame({'start_time': [0, 2, 5, 10, 13], + 'duration': [1, 2, 4, 2, 1], + 'stream_num': [0, 0, 0, 0, 0]}) + self.doTest(overlap, 1, analyze.Interval(3, duration=8), + {'total gap time': 2.0 * self.sec, + 'gap time proportion': 25.0 * self.percent}) if __name__ == '__main__':
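To tie the helpers together, here is an end-to-end sketch (not part of the patch, and assuming PerfKitBenchmarker is importable) of the flow _ProcessMultiStreamResults follows: build a records table, find the interval when all streams were active, keep only the records fully inside it, and compute throughput and gap statistics. The 'latency' column holds per-operation durations in seconds, matching the benchmark's records; the values themselves are made up.

# Sketch of the analysis flow with made-up records.
import pandas as pd

from perfkitbenchmarker import analyze

records = pd.DataFrame({'start_time': [0.0, 1.0, 0.5, 2.0],
                        'latency': [1.0, 1.0, 1.0, 1.0],
                        'size': [8, 8, 8, 8],
                        'stream_num': [0, 0, 1, 1]})

# When was any stream active, and when were all of them active?
any_active, all_active = analyze.AnyAndAllStreamsIntervals(
    records['start_time'], records['latency'], records['stream_num'])

# Only operations entirely inside the all-streams interval count toward
# throughput, so stream start-up and shut-down don't skew the numbers.
in_interval = records[analyze.FullyInInterval(
    records['start_time'], records['latency'], all_active)]

throughput_stats = analyze.ThroughputStats(
    in_interval['start_time'], in_interval['latency'],
    in_interval['size'], in_interval['stream_num'], 2)
gap_stats = analyze.GapStats(
    records['start_time'], records['latency'], records['stream_num'],
    all_active, 2)

Both calls return dictionaries mapping metric names to unit-carrying values, which the benchmark then flattens into sample.Sample objects as shown earlier.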