Skip to content

Commit

Permalink
Merge branch 'main' into rewrite_avg
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 authored Oct 12, 2023
2 parents 738397c + 1060d85 commit 410324e
Show file tree
Hide file tree
Showing 21 changed files with 196 additions and 51 deletions.
14 changes: 14 additions & 0 deletions src/query/ast/src/ast/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub enum IntervalKind {
Minute,
Second,
Doy,
Week,
Dow,
}

Expand Down Expand Up @@ -143,6 +144,12 @@ pub enum Expr {
kind: IntervalKind,
expr: Box<Expr>,
},
/// DATE_PART(IntervalKind, <expr>)
DatePart {
span: Span,
kind: IntervalKind,
expr: Box<Expr>,
},
/// POSITION(<expr> IN <expr>)
Position {
span: Span,
Expand Down Expand Up @@ -503,6 +510,7 @@ impl Expr {
| Expr::Cast { span, .. }
| Expr::TryCast { span, .. }
| Expr::Extract { span, .. }
| Expr::DatePart { span, .. }
| Expr::Position { span, .. }
| Expr::Substring { span, .. }
| Expr::Trim { span, .. }
Expand Down Expand Up @@ -536,6 +544,7 @@ impl Display for IntervalKind {
IntervalKind::Second => "SECOND",
IntervalKind::Doy => "DOY",
IntervalKind::Dow => "DOW",
IntervalKind::Week => "WEEK",
})
}
}
Expand Down Expand Up @@ -1027,6 +1036,11 @@ impl Display for Expr {
} => {
write!(f, "EXTRACT({field} FROM {expr})")?;
}
Expr::DatePart {
kind: field, expr, ..
} => {
write!(f, "DATE_PART({field}, {expr})")?;
}
Expr::Position {
substr_expr,
str_expr,
Expand Down
9 changes: 9 additions & 0 deletions src/query/ast/src/ast/format/syntax/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ pub(crate) fn pretty_expr(expr: Expr) -> RcDoc<'static> {
.append(RcDoc::space())
.append(pretty_expr(*expr))
.append(RcDoc::text(")")),
Expr::DatePart {
kind: field, expr, ..
} => RcDoc::text("DATE_PART(")
.append(RcDoc::text(field.to_string()))
.append(RcDoc::space())
.append(RcDoc::text(","))
.append(RcDoc::space())
.append(pretty_expr(*expr))
.append(RcDoc::text(")")),
Expr::Position {
substr_expr,
str_expr,
Expand Down
27 changes: 26 additions & 1 deletion src/query/ast/src/parser/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ pub enum ExprElement {
field: IntervalKind,
expr: Box<Expr>,
},
/// DATE_PART(IntervalKind, <expr>)
DatePart {
field: IntervalKind,
expr: Box<Expr>,
},
/// POSITION(<expr> IN <expr>)
Position {
substr_expr: Box<Expr>,
Expand Down Expand Up @@ -452,6 +457,11 @@ impl<'a, I: Iterator<Item = WithSpan<'a, ExprElement>>> PrattParser<I> for ExprP
kind: field,
expr,
},
ExprElement::DatePart { field, expr } => Expr::DatePart {
span: transform_span(elem.span.0),
kind: field,
expr,
},
ExprElement::Position {
substr_expr,
str_expr,
Expand Down Expand Up @@ -730,6 +740,15 @@ pub fn expr_element(i: Input) -> IResult<WithSpan<ExprElement>> {
},
|(_, target_type)| ExprElement::PgCast { target_type },
);
let date_part = map(
rule! {
DATE_PART ~ "(" ~ ^#interval_kind ~ "," ~ ^#subexpr(0) ~ ^")"
},
|(_, _, field, _, expr, _)| ExprElement::DatePart {
field,
expr: Box::new(expr),
},
);
let extract = map(
rule! {
EXTRACT ~ "(" ~ ^#interval_kind ~ ^FROM ~ ^#subexpr(0) ~ ^")"
Expand Down Expand Up @@ -1044,7 +1063,8 @@ pub fn expr_element(i: Input) -> IResult<WithSpan<ExprElement>> {
| #timestamp_expr: "`TIMESTAMP <str_literal>`"
| #interval: "`INTERVAL ... (YEAR | QUARTER | MONTH | DAY | HOUR | MINUTE | SECOND | DOY | DOW)`"
| #pg_cast : "`::<type_name>`"
| #extract : "`EXTRACT((YEAR | QUARTER | MONTH | DAY | HOUR | MINUTE | SECOND) FROM ...)`"
| #extract : "`EXTRACT((YEAR | QUARTER | MONTH | DAY | HOUR | MINUTE | SECOND | WEEK) FROM ...)`"
| #date_part : "`DATE_PART((YEAR | QUARTER | MONTH | DAY | HOUR | MINUTE | SECOND | WEEK), ...)`"
),
rule!(
#position : "`POSITION(... IN ...)`"
Expand Down Expand Up @@ -1398,6 +1418,7 @@ pub fn interval_kind(i: Input) -> IResult<IntervalKind> {
value(IntervalKind::Second, rule! { SECOND }),
value(IntervalKind::Doy, rule! { DOY }),
value(IntervalKind::Dow, rule! { DOW }),
value(IntervalKind::Week, rule! { WEEK }),
value(
IntervalKind::Year,
rule! { #literal_string_eq_ignore_case("YEAR") },
Expand Down Expand Up @@ -1434,6 +1455,10 @@ pub fn interval_kind(i: Input) -> IResult<IntervalKind> {
IntervalKind::Dow,
rule! { #literal_string_eq_ignore_case("DOW") },
),
value(
IntervalKind::Week,
rule! { #literal_string_eq_ignore_case("WEEK") },
),
))(i)
}

Expand Down
7 changes: 5 additions & 2 deletions src/query/ast/src/parser/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ pub enum TokenKind {
DATE,
#[token("DATE_ADD", ignore(ascii_case))]
DATE_ADD,
#[token("DATE_PART", ignore(ascii_case))]
DATE_PART,
#[token("DATE_SUB", ignore(ascii_case))]
DATE_SUB,
#[token("DATE_TRUNC", ignore(ascii_case))]
Expand Down Expand Up @@ -449,6 +451,8 @@ pub enum TokenKind {
DOUBLE,
#[token("DOW", ignore(ascii_case))]
DOW,
#[token("WEEK", ignore(ascii_case))]
WEEK,
#[token("DOY", ignore(ascii_case))]
DOY,
#[token("DOWNLOAD", ignore(ascii_case))]
Expand Down Expand Up @@ -960,8 +964,6 @@ pub enum TokenKind {
VIEW,
#[token("VIRTUAL", ignore(ascii_case))]
VIRTUAL,
#[token("WEEK", ignore(ascii_case))]
WEEK,
#[token("WHEN", ignore(ascii_case))]
WHEN,
#[token("WHERE", ignore(ascii_case))]
Expand Down Expand Up @@ -1125,6 +1127,7 @@ impl TokenKind {
| TokenKind::END
| TokenKind::EXISTS
| TokenKind::EXTRACT
| TokenKind::DATE_PART
| TokenKind::FALSE
| TokenKind::FLOAT
// | TokenKind::FOREIGN
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/src/visitors/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub fn walk_expr<'a, V: Visitor<'a>>(visitor: &mut V, expr: &'a Expr) {
target_type,
} => visitor.visit_try_cast(*span, expr, target_type),
Expr::Extract { span, kind, expr } => visitor.visit_extract(*span, kind, expr),
Expr::DatePart { span, kind, expr } => visitor.visit_extract(*span, kind, expr),
Expr::Position {
span,
substr_expr,
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/src/visitors/walk_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub fn walk_expr_mut<V: VisitorMut>(visitor: &mut V, expr: &mut Expr) {
target_type,
} => visitor.visit_try_cast(*span, expr, target_type),
Expr::Extract { span, kind, expr } => visitor.visit_extract(*span, kind, expr),
Expr::DatePart { span, kind, expr } => visitor.visit_extract(*span, kind, expr),
Expr::Position {
span,
substr_expr,
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ fn test_expr() {
r#"TRY_CAST(col1 AS TUPLE(BIGINT UNSIGNED NULL, BOOLEAN))"#,
r#"trim(leading 'abc' from 'def')"#,
r#"extract(year from d)"#,
r#"date_part(year, d)"#,
r#"position('a' in str)"#,
r#"substring(a from b for c)"#,
r#"substring(a, b, c)"#,
Expand Down
2 changes: 1 addition & 1 deletion src/query/ast/tests/it/testdata/expr-error.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ error:
--> SQL:1:10
|
1 | CAST(col1)
| ---- ^ expected `AS`, `,`, `(`, `.`, `IS`, `NOT`, `IN`, `EXISTS`, `BETWEEN`, `+`, `-`, `*`, `/`, `//`, `DIV`, `%`, `||`, `<->`, `>`, `<`, `>=`, `<=`, `=`, `<>`, `!=`, `^`, `AND`, `OR`, `XOR`, `LIKE`, `REGEXP`, `RLIKE`, `SOUNDS`, <BitWiseOr>, <BitWiseAnd>, <BitWiseXor>, <ShiftLeft>, <ShiftRight>, <Factorial>, <SquareRoot>, <BitWiseNot>, <CubeRoot>, <Abs>, `CAST`, `TRY_CAST`, `DATE_ADD`, `DATE_SUB`, `DATE_TRUNC`, `DATE`, `TIMESTAMP`, `INTERVAL`, `::`, `EXTRACT`, `POSITION`, `SUBSTRING`, `SUBSTR`, `TRIM`, `COUNT`, <Ident>, <QuotedString>, or 16 more ...
| ---- ^ expected `AS`, `,`, `(`, `.`, `IS`, `NOT`, `IN`, `EXISTS`, `BETWEEN`, `+`, `-`, `*`, `/`, `//`, `DIV`, `%`, `||`, `<->`, `>`, `<`, `>=`, `<=`, `=`, `<>`, `!=`, `^`, `AND`, `OR`, `XOR`, `LIKE`, `REGEXP`, `RLIKE`, `SOUNDS`, <BitWiseOr>, <BitWiseAnd>, <BitWiseXor>, <ShiftLeft>, <ShiftRight>, <Factorial>, <SquareRoot>, <BitWiseNot>, <CubeRoot>, <Abs>, `CAST`, `TRY_CAST`, `DATE_ADD`, `DATE_SUB`, `DATE_TRUNC`, `DATE`, `TIMESTAMP`, `INTERVAL`, `::`, `EXTRACT`, `DATE_PART`, `POSITION`, `SUBSTRING`, `SUBSTR`, `TRIM`, `COUNT`, <Ident>, or 17 more ...
| |
| while parsing `CAST(... AS ...)`
| while parsing expression
Expand Down
29 changes: 29 additions & 0 deletions src/query/ast/tests/it/testdata/expr.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1574,6 +1574,35 @@ Extract {
}


---------- Input ----------
date_part(year, d)
---------- Output ---------
DATE_PART(YEAR, d)
---------- AST ------------
DatePart {
span: Some(
0..18,
),
kind: Year,
expr: ColumnRef {
span: Some(
16..17,
),
database: None,
table: None,
column: Name(
Identifier {
name: "d",
quote: None,
span: Some(
16..17,
),
},
),
},
}


---------- Input ----------
position('a' in str)
---------- Output ---------
Expand Down
2 changes: 1 addition & 1 deletion src/query/ast/tests/it/testdata/statement-error.txt
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ error:
--> SQL:1:41
|
1 | SELECT * FROM t GROUP BY GROUPING SETS ()
| ------ ^ expected `(`, `IS`, `IN`, `EXISTS`, `BETWEEN`, `+`, `-`, `*`, `/`, `//`, `DIV`, `%`, `||`, `<->`, `>`, `<`, `>=`, `<=`, `=`, `<>`, `!=`, `^`, `AND`, `OR`, `XOR`, `LIKE`, `NOT`, `REGEXP`, `RLIKE`, `SOUNDS`, <BitWiseOr>, <BitWiseAnd>, <BitWiseXor>, <ShiftLeft>, <ShiftRight>, <Factorial>, <SquareRoot>, <BitWiseNot>, <CubeRoot>, <Abs>, `CAST`, `TRY_CAST`, `DATE_ADD`, `DATE_SUB`, `DATE_TRUNC`, `DATE`, `TIMESTAMP`, `INTERVAL`, `::`, `EXTRACT`, `POSITION`, `SUBSTRING`, `SUBSTR`, `TRIM`, `COUNT`, <Ident>, <QuotedString>, `CASE`, `ColumnPosition`, `[`, or 14 more ...
| ------ ^ expected `(`, `IS`, `IN`, `EXISTS`, `BETWEEN`, `+`, `-`, `*`, `/`, `//`, `DIV`, `%`, `||`, `<->`, `>`, `<`, `>=`, `<=`, `=`, `<>`, `!=`, `^`, `AND`, `OR`, `XOR`, `LIKE`, `NOT`, `REGEXP`, `RLIKE`, `SOUNDS`, <BitWiseOr>, <BitWiseAnd>, <BitWiseXor>, <ShiftLeft>, <ShiftRight>, <Factorial>, <SquareRoot>, <BitWiseNot>, <CubeRoot>, <Abs>, `CAST`, `TRY_CAST`, `DATE_ADD`, `DATE_SUB`, `DATE_TRUNC`, `DATE`, `TIMESTAMP`, `INTERVAL`, `::`, `EXTRACT`, `DATE_PART`, `POSITION`, `SUBSTRING`, `SUBSTR`, `TRIM`, `COUNT`, <Ident>, <QuotedString>, `CASE`, `ColumnPosition`, or 15 more ...
| |
| while parsing `SELECT ...`

Expand Down
12 changes: 3 additions & 9 deletions src/query/service/src/interpreters/common/query_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@

use std::fmt::Write;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use common_config::GlobalConfig;
use common_exception::ErrorCode;
Expand All @@ -28,6 +26,7 @@ use log::error;
use log::info;
use serde_json;

use crate::sessions::convert_query_log_timestamp;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;

Expand Down Expand Up @@ -187,6 +186,7 @@ impl InterpreterQueryLog {
}

pub fn log_finish(ctx: &QueryContext, now: SystemTime, err: Option<ErrorCode>) -> Result<()> {
ctx.set_finish_time(now);
// User.
let handler_type = ctx.get_current_session().get_type().to_string();
let tenant_id = GlobalConfig::instance().query.tenant_id.clone();
Expand All @@ -206,7 +206,7 @@ impl InterpreterQueryLog {
let event_time = convert_query_log_timestamp(now);
let event_date = (event_time / (24 * 3_600_000_000)) as i32;
let query_start_time = convert_query_log_timestamp(ctx.get_created_time());
let query_duration_ms = (event_time - query_start_time) / 1_000;
let query_duration_ms = ctx.get_query_duration_ms();
let data_metrics = ctx.get_data_metrics();

let written_rows = ctx.get_write_progress_value().rows as u64;
Expand Down Expand Up @@ -316,9 +316,3 @@ impl InterpreterQueryLog {
})
}
}

fn convert_query_log_timestamp(time: SystemTime) -> i64 {
time.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::new(0, 0))
.as_micros() as i64
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl QueryError {
pub struct QueryStats {
#[serde(flatten)]
pub progresses: Progresses,
pub running_time_ms: f64,
pub running_time_ms: i64,
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down
17 changes: 8 additions & 9 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;

use common_base::base::tokio::sync::RwLock;
Expand Down Expand Up @@ -111,12 +109,11 @@ pub struct ExecuteStopped {
pub stats: Progresses,
pub affect: Option<QueryAffect>,
pub reason: Result<()>,
pub stop_time: Instant,
pub query_duration_ms: i64,
}

pub struct Executor {
pub query_id: String,
pub start_time: Instant,
pub state: ExecuteState,
}

Expand All @@ -137,10 +134,12 @@ impl Executor {
}
}

pub fn elapsed(&self) -> Duration {
pub fn get_query_duration_ms(&self) -> i64 {
match &self.state {
Starting(_) | Running(_) => Instant::now() - self.start_time,
Stopped(f) => f.stop_time - self.start_time,
Starting(ExecuteStarting { ctx }) | Running(ExecuteRunning { ctx, .. }) => {
ctx.get_query_duration_ms()
}
Stopped(f) => f.query_duration_ms,
}
}

Expand Down Expand Up @@ -179,7 +178,7 @@ impl Executor {
guard.state = Stopped(Box::new(ExecuteStopped {
stats: Default::default(),
reason,
stop_time: Instant::now(),
query_duration_ms: s.ctx.get_query_duration_ms(),
affect: Default::default(),
}))
}
Expand All @@ -198,7 +197,7 @@ impl Executor {
guard.state = Stopped(Box::new(ExecuteStopped {
stats: Progresses::from_context(&r.ctx),
reason,
stop_time: Instant::now(),
query_duration_ms: r.ctx.get_query_duration_ms(),
affect: r.ctx.get_affect(),
}))
}
Expand Down
8 changes: 3 additions & 5 deletions src/query/service/src/servers/http/v1/query/http_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ pub struct StageAttachmentConf {

#[derive(Debug, Clone)]
pub struct ResponseState {
pub running_time_ms: f64,
pub running_time_ms: i64,
pub progresses: Progresses,
pub state: ExecuteStateKind,
pub affect: Option<QueryAffect>,
Expand Down Expand Up @@ -315,10 +315,8 @@ impl HttpQuery {
};

let (block_sender, block_receiver) = sized_spsc(request.pagination.max_rows_in_buffer);
let start_time = Instant::now();
let state = Arc::new(RwLock::new(Executor {
query_id: query_id.clone(),
start_time,
state: ExecuteState::Starting(ExecuteStarting { ctx: ctx.clone() }),
}));
let block_sender_closer = block_sender.closer();
Expand Down Expand Up @@ -349,7 +347,7 @@ impl HttpQuery {
let state = ExecuteStopped {
stats: Progresses::default(),
reason: Err(e.clone()),
stop_time: Instant::now(),
query_duration_ms: ctx_clone.get_query_duration_ms(),
affect: ctx_clone.get_affect(),
};
info!(
Expand Down Expand Up @@ -424,7 +422,7 @@ impl HttpQuery {
let state = self.state.read().await;
let (exe_state, err) = state.state.extract();
ResponseState {
running_time_ms: state.elapsed().as_secs_f64() * 1000.0,
running_time_ms: state.get_query_duration_ms(),
progresses: state.get_progress(),
state: exe_state,
error: err,
Expand Down
Loading

0 comments on commit 410324e

Please sign in to comment.