perf: improve tpc-h q6 performance (single-topic) #15035

Open
Tracked by #15036
lmatz opened this issue Feb 6, 2024 · 6 comments

@lmatz
Contributor

lmatz commented Feb 6, 2024

See performance numbers at https://www.notion.so/risingwave-labs/TPCH-Performance-Numbers-Table-e098ef82884546949333409f0513ada7?pvs=4#8de0bf4bda51444c8381f3b0c10ddfe1

@lmatz lmatz added the type/perf label Feb 6, 2024
@lmatz lmatz assigned lmatz and xxchan Feb 6, 2024
@github-actions github-actions bot added this to the release-1.7 milestone Feb 6, 2024
@lmatz
Contributor Author

lmatz commented Feb 6, 2024

RW Query:

create sink tpch_q6 as
    select
        sum(l_extendedprice * l_discount) as revenue
    from
        lineitem
    where
        l_shipdate >= date '1994-01-01'
        and l_shipdate < date '2099-01-01' + interval '1' year
        and l_discount between 0.08 - 0.01 and 0.08 + 1.01
        and l_quantity < 24000
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

Query Plan:

 StreamSink { type: append-only, columns: [revenue] }
 └─StreamProject { exprs: [sum(sum($expr1))] }
   └─StreamSimpleAgg [append_only] { aggs: [sum(sum($expr1)), count] }
     └─StreamExchange { dist: Single }
       └─StreamStatelessSimpleAgg { aggs: [sum($expr1)] }
         └─StreamProject { exprs: [(Field(lineitem, 5:Int32) * Field(lineitem, 6:Int32)) as $expr1, _row_id] }
           └─StreamFilter { predicate: (Field(lineitem, 10:Int32) >= '1994-01-01':Date) AND (Field(lineitem, 10:Int32) < '2100-01-01 00:00:00':Timestamp) AND (Field(lineitem, 6:Int32) >= 0.07:Decimal) AND (Field(lineitem, 6:Int32) <= 1.09:Decimal) AND (Field(lineitem, 4:Int32) < 24000:Decimal) AND (eventType = 'lineitem':Varchar) }
             └─StreamRowIdGen { row_id_index: 10 }
               └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(9 rows)
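Read bottom-up, the RisingWave plan aggregates in two phases. StreamProject computes $expr1 = l_extendedprice * l_discount for rows that pass StreamFilter. StreamStatelessSimpleAgg reduces each chunk to a partial sum($expr1). StreamExchange { dist: Single } gathers those partials on one actor, and StreamSimpleAgg adds them up. What follows is a minimal single-process Python model of that data flow. It assumes in-memory lists stand in for chunks and Decimal stands in for the decimal columns. LineItem, passes_filter, local_partial_sum and global_sum are hypothetical names, not RisingWave APIs, and the hidden count and all incremental streaming behaviour are left out.

```python
from dataclasses import dataclass
from datetime import date
from decimal import Decimal
from typing import Iterable, Optional


@dataclass(frozen=True)
class LineItem:
    # Hypothetical row type holding only the columns Q6 reads.
    l_extendedprice: Decimal
    l_discount: Decimal
    l_quantity: Decimal
    l_shipdate: date


def passes_filter(r: LineItem) -> bool:
    # StreamFilter, with the bounds as the planner folded them:
    # 0.08 - 0.01 = 0.07, 0.08 + 1.01 = 1.09, 2099-01-01 + 1 year = 2100-01-01.
    return (
        date(1994, 1, 1) <= r.l_shipdate < date(2100, 1, 1)
        and Decimal("0.07") <= r.l_discount <= Decimal("1.09")
        and r.l_quantity < 24000
    )


def local_partial_sum(chunk: Iterable[LineItem]) -> Optional[Decimal]:
    # StreamProject + StreamStatelessSimpleAgg: reduce one chunk to a single
    # partial sum($expr1); None if no row in the chunk qualified.
    exprs = [r.l_extendedprice * r.l_discount for r in chunk if passes_filter(r)]
    return sum(exprs, Decimal(0)) if exprs else None


def global_sum(partials: Iterable[Optional[Decimal]]) -> Optional[Decimal]:
    # StreamExchange { dist: Single } + StreamSimpleAgg: one actor adds up
    # the partial sums arriving from every upstream actor.
    present = [p for p in partials if p is not None]
    return sum(present, Decimal(0)) if present else None


# Usage: two chunks; the second row of chunk_a fails the discount bound (0.05 < 0.07).
chunk_a = [
    LineItem(Decimal("100.00"), Decimal("0.08"), Decimal("10"), date(1994, 6, 1)),
    LineItem(Decimal("200.00"), Decimal("0.05"), Decimal("10"), date(1994, 6, 1)),
]
chunk_b = [LineItem(Decimal("50.00"), Decimal("0.10"), Decimal("5"), date(1995, 1, 1))]
revenue = global_sum(local_partial_sum(c) for c in (chunk_a, chunk_b))
print(revenue)  # prints 13.0000
```

Pre-aggregating before the exchange means only one partial value per chunk, rather than one row per qualifying lineitem, has to reach the single-parallelism StreamSimpleAgg.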

Flink Query:

INSERT INTO tpch_q6
    select
      sum(l_extendedprice * l_discount) as revenue
    from
      lineitem
    where
      l_shipdate >= date '1994-01-01'
      and l_shipdate < date '2099-01-01' + interval '1' year
      and l_discount between 0.08 - 0.08 and 0.08 + 1.01
      and l_quantity < 24000;

Query Plan:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.tpch_q6], fields=[revenue])
+- Calc(select=[CAST(revenue AS DECIMAL(10, 0)) AS revenue])
   +- GroupAggregate(select=[SUM($f0) AS revenue])
      +- Exchange(distribution=[single])
         +- Calc(select=[*(lineitem.l_extendedprice, lineitem.l_discount) AS $f0], where=[AND(SEARCH(eventType, Sarg[_UTF-16LE'lineitem':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), SEARCH(lineitem.l_shipdate, Sarg[[1994-01-01..2100-01-01)]), SEARCH(lineitem.l_discount, Sarg[[0.00:DECIMAL(4, 2)..1.09:DECIMAL(4, 2)]]:DECIMAL(4, 2)), SEARCH(lineitem.l_quantity, Sarg[(-∞..24000)]))])
            +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.tpch_q6], fields=[revenue])
+- Calc(select=[CAST(revenue AS DECIMAL(10, 0)) AS revenue])
   +- GroupAggregate(select=[SUM($f0) AS revenue])
      +- Exchange(distribution=[single])
         +- Calc(select=[(lineitem.l_extendedprice * lineitem.l_discount) AS $f0], where=[(SEARCH(eventType, Sarg[_UTF-16LE'lineitem']) AND SEARCH(lineitem.l_shipdate, Sarg[[1994-01-01..2100-01-01)]) AND SEARCH(lineitem.l_discount, Sarg[[0.00..1.09]]) AND SEARCH(lineitem.l_quantity, Sarg[(-∞..24000)]))])
            +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])

@lmatz
Contributor Author

lmatz commented Feb 6, 2024

What's the purpose of the count in StreamSimpleAgg [append_only] { aggs: [sum(sum($expr1)), count] }?
It seems unnecessary to me, as the following project does not use it.

Is it there so that we output NULL rather than 0 when there is no input at all?

@lmatz lmatz changed the title perf: improve tpc-h q6 performance perf: improve tpc-h q6 performance (single-topic) Feb 6, 2024
@st1page
Contributor

st1page commented Feb 8, 2024

> What's the purpose of the count in StreamSimpleAgg [append_only] { aggs: [sum(sum($expr1)), count] }? It seems unnecessary to me as the following project does not use it
>
> For not outputting a 0 instead of null when there is no input at all?

It is used by the agg executor itself to know whether there is at least one row in the input.
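To make that answer concrete: SQL's SUM over zero rows is NULL, not 0, so a running sum that starts at 0 cannot by itself tell "no input yet" apart from "inputs that happen to sum to 0". The sketch below is a hypothetical Python model of an append-only simple aggregation that keeps a hidden row count for exactly that purpose. SimpleSumAgg and its methods are illustrative names, not RisingWave's executor code, and retractions are ignored because the input here is append-only.

```python
from decimal import Decimal
from typing import Iterable, Optional


class SimpleSumAgg:
    """Hypothetical append-only global sum with a hidden row count."""

    def __init__(self) -> None:
        self.sum = Decimal(0)  # running sum(sum($expr1))
        self.count = 0         # hidden `count`: number of input rows seen so far

    def apply_chunk(self, partial_sums: Iterable[Decimal]) -> None:
        # Append-only input: every row is an insert, so state only grows.
        for value in partial_sums:
            self.sum += value
            self.count += 1

    def output(self) -> Optional[Decimal]:
        # The count, not the sum, decides between NULL (None) and a value.
        return self.sum if self.count > 0 else None


# Usage: both states below have sum == 0, but only the second saw any rows.
print(SimpleSumAgg().output())  # prints None
agg = SimpleSumAgg()
agg.apply_chunk([Decimal("1.50"), Decimal("-1.50")])
print(agg.output())  # prints 0.00
```

The projection above StreamSimpleAgg drops the count because only the aggregation's own output logic needs it.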

@lmatz
Contributor Author

lmatz commented Feb 21, 2024

The performance gap for q6 is similar to that of q1 (#15034).

Please see the comment here: #15034 (comment)

link #14815

@lmatz lmatz removed this from the release-1.7 milestone Mar 6, 2024
Contributor

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.
