diff --git a/Cargo.lock b/Cargo.lock index 31501e973f67..7a0176a2a87e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6139,6 +6139,7 @@ dependencies = [ "fixedbitset", "futures", "futures-async-stream", + "iana-time-zone", "itertools", "lazy_static", "madsim-tokio", diff --git a/src/common/src/session_config/mod.rs b/src/common/src/session_config/mod.rs index 2265e9317499..f24818f3020c 100644 --- a/src/common/src/session_config/mod.rs +++ b/src/common/src/session_config/mod.rs @@ -24,6 +24,7 @@ use derivative::{self, Derivative}; use itertools::Itertools; pub use query_mode::QueryMode; pub use search_path::{SearchPath, USER_NAME_WILD_CARD}; +use tracing::info; use crate::error::{ErrorCode, RwError}; use crate::session_config::transaction_isolation_level::IsolationLevel; @@ -357,6 +358,7 @@ pub struct ConfigMap { impl ConfigMap { pub fn set(&mut self, key: &str, val: Vec) -> Result<(), RwError> { + info!(%key, ?val, "set config"); let val = val.iter().map(AsRef::as_ref).collect_vec(); if key.eq_ignore_ascii_case(ImplicitFlush::entry_name()) { self.implicit_flush = val.as_slice().try_into()?; diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 37f3b280a3e1..d4c8e2a194be 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -31,6 +31,7 @@ enum-as-inner = "0.5" fixedbitset = "0.4.1" futures = { version = "0.3", default-features = false, features = ["alloc"] } futures-async-stream = "0.2" +iana-time-zone = "0.1" itertools = "0.10" lazy_static = "1" maplit = "1" diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 4301a78c5129..550985a59167 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -28,6 +28,7 @@ use risingwave_common::error::{ErrorCode, Result}; use risingwave_sqlparser::ast::*; use self::util::DataChunkToRowSetAdapter; +use self::variable::handle_set_time_zone; use crate::scheduler::{DistributedQueryStream, LocalQueryStream}; use crate::session::SessionImpl; use crate::utils::WithOptions; @@ -351,6 +352,7 @@ pub async fn handle( variable, value, } => variable::handle_set(handler_args, variable, value), + Statement::SetTimeZone { local: _, value } => handle_set_time_zone(handler_args, value), Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable).await, Statement::CreateIndex { name, diff --git a/src/frontend/src/handler/variable.rs b/src/frontend/src/handler/variable.rs index a920c0963f1b..f88f27bcb0f2 100644 --- a/src/frontend/src/handler/variable.rs +++ b/src/frontend/src/handler/variable.rs @@ -16,9 +16,9 @@ use itertools::Itertools; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::types::Row; -use risingwave_common::error::Result; +use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; -use risingwave_sqlparser::ast::{Ident, SetVariableValue, Value}; +use risingwave_sqlparser::ast::{Ident, SetTimeZoneValue, SetVariableValue, Value}; use super::RwPgResponse; use crate::handler::HandlerArgs; @@ -48,6 +48,25 @@ pub fn handle_set( Ok(PgResponse::empty_result(StatementType::SET_VARIABLE)) } +pub(super) fn handle_set_time_zone( + handler_args: HandlerArgs, + value: SetTimeZoneValue, +) -> Result { + let tz_info = match value { + SetTimeZoneValue::Local => iana_time_zone::get_timezone() + .map_err(|e| ErrorCode::InternalError(format!("Failed to get local time zone: {}", e))), + SetTimeZoneValue::Default => Ok("UTC".to_string()), + SetTimeZoneValue::Ident(ident) => Ok(ident.real_value()), + SetTimeZoneValue::Literal(Value::DoubleQuotedString(s)) + | SetTimeZoneValue::Literal(Value::SingleQuotedString(s)) => Ok(s), + _ => Ok(value.to_string()), + }?; + + handler_args.session.set_config("timezone", vec![tz_info])?; + + Ok(PgResponse::empty_result(StatementType::SET_VARIABLE)) +} + pub(super) async fn handle_show( handler_args: HandlerArgs, variable: Vec, diff --git a/src/sqlparser/src/ast/mod.rs b/src/sqlparser/src/ast/mod.rs index d6330e02f285..52f591afba58 100644 --- a/src/sqlparser/src/ast/mod.rs +++ b/src/sqlparser/src/ast/mod.rs @@ -1041,6 +1041,11 @@ pub enum Statement { snapshot: Option, session: bool, }, + /// `SET [ SESSION | LOCAL ] TIME ZONE { value | 'value' | LOCAL | DEFAULT }` + SetTimeZone { + local: bool, + value: SetTimeZoneValue, + }, /// `COMMENT ON ...` /// /// Note: this is a PostgreSQL-specific statement. @@ -1450,6 +1455,14 @@ impl fmt::Display for Statement { } Ok(()) } + Statement::SetTimeZone { local, value } => { + write!(f, "SET")?; + if *local { + write!(f, " LOCAL")?; + } + write!(f, " TIME ZONE {}", value)?; + Ok(()) + } Statement::Commit { chain } => { write!(f, "COMMIT{}", if *chain { " AND CHAIN" } else { "" },) } @@ -1975,6 +1988,26 @@ impl fmt::Display for EmitMode { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum SetTimeZoneValue { + Ident(Ident), + Literal(Value), + Local, + Default, +} + +impl fmt::Display for SetTimeZoneValue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SetTimeZoneValue::Ident(ident) => write!(f, "{}", ident), + SetTimeZoneValue::Literal(value) => write!(f, "{}", value), + SetTimeZoneValue::Local => f.write_str("LOCAL"), + SetTimeZoneValue::Default => f.write_str("DEFAULT"), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum TransactionMode { diff --git a/src/sqlparser/src/parser.rs b/src/sqlparser/src/parser.rs index d41d31b85226..cd6c7099a6e9 100644 --- a/src/sqlparser/src/parser.rs +++ b/src/sqlparser/src/parser.rs @@ -3282,6 +3282,23 @@ impl Parser { pub fn parse_set(&mut self) -> Result { let modifier = self.parse_one_of_keywords(&[Keyword::SESSION, Keyword::LOCAL]); + if self.parse_keywords(&[Keyword::TIME, Keyword::ZONE]) { + let value = if self.parse_keyword(Keyword::DEFAULT) { + SetTimeZoneValue::Default + } else if self.parse_keyword(Keyword::LOCAL) { + SetTimeZoneValue::Local + } else if let Ok(ident) = self.parse_identifier() { + SetTimeZoneValue::Ident(ident) + } else { + let value = self.parse_value()?; + SetTimeZoneValue::Literal(value) + }; + + return Ok(Statement::SetTimeZone { + local: modifier == Some(Keyword::LOCAL), + value, + }); + } let variable = self.parse_identifier()?; if self.consume_token(&Token::Eq) || self.parse_keyword(Keyword::TO) { let mut values = vec![]; diff --git a/src/sqlparser/tests/testdata/select.yaml b/src/sqlparser/tests/testdata/select.yaml index b891b97c6edf..f60c44b083ff 100644 --- a/src/sqlparser/tests/testdata/select.yaml +++ b/src/sqlparser/tests/testdata/select.yaml @@ -78,4 +78,4 @@ Near "SELECT 1::int" - input: select id1, a1, id2, a2 from stream as S join version FOR SYSTEM_TIME AS OF NOW() AS V on id1= id2 formatted_sql: SELECT id1, a1, id2, a2 FROM stream AS S JOIN version FOR SYSTEM_TIME AS OF NOW() AS V ON id1 = id2 - formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "id1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "id2", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a2", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "stream", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "S", quote_style: None }, columns: [] }), for_system_time_as_of_now: false }, joins: [Join { relation: Table { name: ObjectName([Ident { value: "version", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "V", quote_style: None }, columns: [] }), for_system_time_as_of_now: true }, join_operator: Inner(On(BinaryOp { left: Identifier(Ident { value: "id1", quote_style: None }), op: Eq, right: Identifier(Ident { value: "id2", quote_style: None }) })) }] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' \ No newline at end of file + formatted_ast: 'Query(Query { with: None, body: Select(Select { distinct: All, projection: [UnnamedExpr(Identifier(Ident { value: "id1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a1", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "id2", quote_style: None })), UnnamedExpr(Identifier(Ident { value: "a2", quote_style: None }))], from: [TableWithJoins { relation: Table { name: ObjectName([Ident { value: "stream", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "S", quote_style: None }, columns: [] }), for_system_time_as_of_now: false }, joins: [Join { relation: Table { name: ObjectName([Ident { value: "version", quote_style: None }]), alias: Some(TableAlias { name: Ident { value: "V", quote_style: None }, columns: [] }), for_system_time_as_of_now: true }, join_operator: Inner(On(BinaryOp { left: Identifier(Ident { value: "id1", quote_style: None }), op: Eq, right: Identifier(Ident { value: "id2", quote_style: None }) })) }] }], lateral_views: [], selection: None, group_by: [], having: None }), order_by: [], limit: None, offset: None, fetch: None })' diff --git a/src/sqlparser/tests/testdata/set.yaml b/src/sqlparser/tests/testdata/set.yaml new file mode 100644 index 000000000000..d990a05e49e6 --- /dev/null +++ b/src/sqlparser/tests/testdata/set.yaml @@ -0,0 +1,17 @@ +# This file is automatically generated. See `src/sqlparser/test_runner/src/bin/apply.rs` for more information. +- input: SET TIME ZONE LOCAL + formatted_sql: SET TIME ZONE LOCAL +- input: SET TIME ZONE DEFAULT + formatted_sql: SET TIME ZONE DEFAULT +- input: SET TIME ZONE "Asia/Shanghai" + formatted_sql: SET TIME ZONE "Asia/Shanghai" +- input: SET TIME ZONE 'Asia/Shanghai' + error_msg: |- + sql parser error: Expected a value, found: EOF + Near "SET TIME ZONE 'Asia/Shanghai'" +- input: SET TIME ZONE "UTC" + formatted_sql: SET TIME ZONE "UTC" +- input: SET TIME ZONE UTC + formatted_sql: SET TIME ZONE UTC +- input: set time = '1'; + formatted_sql: SET time = '1'