
perf: improve tpc-h q1 performance (single-topic) #15034

Open
Tracked by #15036
lmatz opened this issue Feb 6, 2024 · 7 comments

lmatz commented Feb 6, 2024

See performance numbers at https://www.notion.so/risingwave-labs/TPCH-Performance-Numbers-Table-e098ef82884546949333409f0513ada7?pvs=4#8de0bf4bda51444c8381f3b0c10ddfe1

@lmatz lmatz added the type/perf label Feb 6, 2024
@lmatz lmatz assigned lmatz and xxchan Feb 6, 2024
@github-actions github-actions bot added this to the release-1.7 milestone Feb 6, 2024

lmatz commented Feb 6, 2024

RW Query:

    create sink tpch_q1 as
    select
        l_returnflag,
        l_linestatus,
        sum(l_quantity) as sum_qty,
        sum(l_extendedprice) as sum_base_price,
        sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
        round(avg(l_quantity), 4) as avg_qty,
        round(avg(l_extendedprice), 4) as avg_price,
        round(avg(l_discount), 4) as avg_disc,
        count(*) as count_order
    from 
        lineitem 
    where 
        l_shipdate <= date '2099-12-01' - interval '71' day
    group by 
        l_returnflag, 
        l_linestatus
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');
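
Note that in this single-topic setup, lineitem is not a standalone source: judging from the plan below, it is a view over the shared tpch source that filters on eventType and unpacks the lineitem struct column. A rough sketch of what such a view could look like (illustrative only, not the actual benchmark DDL; the field list is assumed from the TPC-H schema):

    -- sketch: keep only lineitem events and unpack the struct column
    create view lineitem as
    select
        (lineitem).l_quantity,
        (lineitem).l_extendedprice,
        (lineitem).l_discount,
        (lineitem).l_tax,
        (lineitem).l_returnflag,
        (lineitem).l_linestatus,
        (lineitem).l_shipdate
    from tpch
    where eventType = 'lineitem';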

Query Plan:

 StreamSink { type: append-only, columns: [l_returnflag, l_linestatus, sum_qty, sum_base_price, sum_disc_price, sum_charge, avg_qty, avg_price, avg_disc, count_order] }
 └─StreamProject { exprs: [$expr5, $expr6, sum($expr1), sum($expr2), sum($expr7), sum($expr8), RoundDigit((sum($expr1) / count($expr1)::Decimal), 4:Int32) as $expr9, RoundDigit((sum($expr2) / count($expr2)::Decimal), 4:Int32) as $expr10, RoundDigit((sum($expr3) / count($expr3)::Decimal), 4:Int32) as $expr11, count] }
   └─StreamHashAgg [append_only] { group_key: [$expr5, $expr6], aggs: [sum($expr1), sum($expr2), sum($expr7), sum($expr8), count($expr1), count($expr2), sum($expr3), count($expr3), count] }
     └─StreamExchange { dist: HashShard($expr5, $expr6) }
       └─StreamProject { exprs: [$expr5, $expr6, $expr1, $expr2, $expr7, ($expr7 * (1:Decimal + $expr4)) as $expr8, $expr3, _row_id] }
         └─StreamProject { exprs: [$expr1, $expr2, $expr3, $expr4, $expr5, $expr6, ($expr2 * (1:Decimal - $expr3)) as $expr7, _row_id] }
           └─StreamProject { exprs: [Field(lineitem, 4:Int32) as $expr1, Field(lineitem, 5:Int32) as $expr2, Field(lineitem, 6:Int32) as $expr3, Field(lineitem, 7:Int32) as $expr4, Field(lineitem, 8:Int32) as $expr5, Field(lineitem, 9:Int32) as $expr6, _row_id] }
             └─StreamFilter { predicate: (Field(lineitem, 10:Int32) <= '2099-09-21 00:00:00':Timestamp) AND (eventType = 'lineitem':Varchar) }
               └─StreamRowIdGen { row_id_index: 10 }
                 └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(10 rows)

Flink:

INSERT INTO tpch_q1
    select
        l_returnflag,
        l_linestatus,
        sum(l_quantity) as sum_qty,
        sum(l_extendedprice) as sum_base_price,
        sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
        round(avg(l_quantity), 4) as avg_qty,
        round(avg(l_extendedprice), 4) as avg_price,
        round(avg(l_discount), 4) as avg_disc,
        count(*) as count_order
    from
        lineitem
    where 
        l_shipdate <= date '2099-12-01' - interval '71' day
    group by
        l_returnflag,
        l_linestatus;
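
For completeness, the tpch_q1 target table presumably uses Flink's blackhole connector (mirroring the blackhole sink on the RW side) so that no real sink I/O is measured; the column types below are inferred from the CASTs in the plan that follows. A sketch only, not the benchmark's actual DDL:

CREATE TABLE tpch_q1 (
    l_returnflag STRING,
    l_linestatus STRING,
    sum_qty DECIMAL(10, 0),
    sum_base_price DECIMAL(10, 0),
    sum_disc_price DECIMAL(10, 0),
    sum_charge DECIMAL(10, 0),
    avg_qty DECIMAL(10, 0),
    avg_price DECIMAL(10, 0),
    avg_disc DECIMAL(10, 0),
    count_order BIGINT
) WITH ('connector' = 'blackhole');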

Query Plan:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.tpch_q1], fields=[l_returnflag, l_linestatus, sum_qty, sum_base_price, sum_disc_price, sum_charge, avg_qty, avg_price, avg_disc, count_order])
+- Calc(select=[l_returnflag, l_linestatus, CAST(sum_qty AS DECIMAL(10, 0)) AS sum_qty, CAST(sum_base_price AS DECIMAL(10, 0)) AS sum_base_price, CAST(sum_disc_price AS DECIMAL(10, 0)) AS sum_disc_price, CAST(sum_charge AS DECIMAL(10, 0)) AS sum_charge, CAST(ROUND($f6, 4) AS DECIMAL(10, 0)) AS avg_qty, CAST(ROUND($f7, 4) AS DECIMAL(10, 0)) AS avg_price, CAST(ROUND($f8, 4) AS DECIMAL(10, 0)) AS avg_disc, CAST(count_order AS BIGINT) AS count_order])
   +- GroupAggregate(groupBy=[l_returnflag, l_linestatus], select=[l_returnflag, l_linestatus, SUM(l_quantity) AS sum_qty, SUM(l_extendedprice) AS sum_base_price, SUM($f4) AS sum_disc_price, SUM($f5) AS sum_charge, AVG(l_quantity) AS $f6, AVG(l_extendedprice) AS $f7, AVG(l_discount) AS $f8, COUNT(*) AS count_order])
      +- Exchange(distribution=[hash[l_returnflag, l_linestatus]])
         +- Calc(select=[lineitem.l_returnflag AS l_returnflag, lineitem.l_linestatus AS l_linestatus, lineitem.l_quantity AS l_quantity, lineitem.l_extendedprice AS l_extendedprice, *(lineitem.l_extendedprice, -(1, lineitem.l_discount)) AS $f4, *(*(lineitem.l_extendedprice, -(1, lineitem.l_discount)), +(1, lineitem.l_tax)) AS $f5, lineitem.l_discount AS l_discount], where=[AND(=(eventType, _UTF-16LE'lineitem':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), <=(lineitem.l_shipdate, 2099-09-21))])
            +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.tpch_q1], fields=[l_returnflag, l_linestatus, sum_qty, sum_base_price, sum_disc_price, sum_charge, avg_qty, avg_price, avg_disc, count_order])
+- Calc(select=[l_returnflag, l_linestatus, CAST(sum_qty AS DECIMAL(10, 0)) AS sum_qty, CAST(sum_base_price AS DECIMAL(10, 0)) AS sum_base_price, CAST(sum_disc_price AS DECIMAL(10, 0)) AS sum_disc_price, CAST(sum_charge AS DECIMAL(10, 0)) AS sum_charge, CAST(ROUND($f6, 4) AS DECIMAL(10, 0)) AS avg_qty, CAST(ROUND($f7, 4) AS DECIMAL(10, 0)) AS avg_price, CAST(ROUND($f8, 4) AS DECIMAL(10, 0)) AS avg_disc, CAST(count_order AS BIGINT) AS count_order])
   +- GroupAggregate(groupBy=[l_returnflag, l_linestatus], select=[l_returnflag, l_linestatus, SUM(l_quantity) AS sum_qty, SUM(l_extendedprice) AS sum_base_price, SUM($f4) AS sum_disc_price, SUM($f5) AS sum_charge, AVG(l_quantity) AS $f6, AVG(l_extendedprice) AS $f7, AVG(l_discount) AS $f8, COUNT(*) AS count_order])
      +- Exchange(distribution=[hash[l_returnflag, l_linestatus]])
         +- Calc(select=[lineitem.l_returnflag AS l_returnflag, lineitem.l_linestatus AS l_linestatus, lineitem.l_quantity AS l_quantity, lineitem.l_extendedprice AS l_extendedprice, (lineitem.l_extendedprice * (1 - lineitem.l_discount)) AS $f4, ((lineitem.l_extendedprice * (1 - lineitem.l_discount)) * (1 + lineitem.l_tax)) AS $f5, lineitem.l_discount AS l_discount], where=[((eventType = 'lineitem') AND (lineitem.l_shipdate <= 2099-09-21))])
            +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])


lmatz commented Feb 6, 2024

Why are there three separate, consecutive StreamProject operators? Is this expected?

Edit:
@st1page:

> Yes, this is to reuse the same subexpression's result and avoid duplicated computation.
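
Concretely, in the plan above $expr7 = l_extendedprice * (1 - l_discount) is computed once in the middle StreamProject and then consumed twice: directly for sum_disc_price, and inside ($expr7 * (1 + l_tax)) for sum_charge. Written by hand, the staged rewrite is roughly equivalent to the following (a sketch; disc_price is a made-up name for this illustration):

    select
        l_returnflag,
        l_linestatus,
        sum(disc_price) as sum_disc_price,
        sum(disc_price * (1 + l_tax)) as sum_charge
    from (
        -- the shared subexpression is evaluated exactly once here
        select
            l_returnflag,
            l_linestatus,
            l_tax,
            l_extendedprice * (1 - l_discount) as disc_price
        from lineitem
    ) t
    group by l_returnflag, l_linestatus;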

@lmatz lmatz changed the title perf: improve tpc-h q1 performance perf: improve tpc-h q1 performance (single-topic) Feb 6, 2024

lmatz commented Feb 7, 2024

However, there is clear skewness:

[throughput graphs omitted]

st1page commented Feb 8, 2024

> However, there is clear skewness:

Could it be the same issue as #5214?


lmatz commented Feb 8, 2024

> However, there is clear skewness:
>
> Could it be the same issue as #5214?

Could be.

Three possibilities in total:

  1. Imbalanced kafka source actors' throughput when running nexmark benchmark #5214
  2. The merge executor's throughput indicates skewness in the data (after the group by?)
  3. Different partitions may contain drastically different numbers of events, since we have a single topic that carries the events from all 8 TPC-H tables (see the sketch below)
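
A quick way to probe possibility 3 is to count events per table type in the shared topic with an ad-hoc query against the source. A minimal sketch, assuming the tpch source can be read in batch mode:

    select eventType, count(*) as cnt
    from tpch
    group by eventType
    order by cnt desc;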


lmatz commented Feb 21, 2024

So I added a simple stateless query, called q0, that does no computation at all, and evaluated it for both RW and Flink: https://github.com/risingwavelabs/kube-bench/blob/main/manifests/tpch/tpch-modified-sinks.template.yaml#L7-L33
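
For reference, a pass-through sink like the following would fit that description (a sketch; the exact definition is in the linked template):

    create sink tpch_q0 as
    select * from lineitem
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');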

It turns out that the performance is almost exactly the same as q1 for both RW and Flink (as shown on the Notion page linked in the first comment):

RW: http://metabase.risingwave-cloud.xyz/question/10801-tpch-q0-bs-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-2971?start_date=2024-01-09
Flink: http://metabase.risingwave-cloud.xyz/question/10770-flink-tpch-q0-flink-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-2968?start_date=2024-01-08

So we can conclude that the computation is very light and that the bottleneck should be on the source.

As there is an outstanding issue tracking the source's performance regression, we will wait for #14815 to be fixed first and then decide the next step.

@lmatz lmatz removed this from the release-1.7 milestone Mar 6, 2024
This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.
