Skip to content

Commit

Permalink
feat: influxql support show measurements (#795)
Browse files Browse the repository at this point in the history
* feat: influxql support show measurements

* fix CI
  • Loading branch information
jiacai2050 committed Mar 30, 2023
1 parent 1acedbb commit 1d8235c
Show file tree
Hide file tree
Showing 17 changed files with 110 additions and 19 deletions.
4 changes: 4 additions & 0 deletions integration_tests/cases/common/dml/case_sensitive.result
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,7 @@ DESC `CASE_SENSITIVE_TABLE1`;

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: DESC `CASE_SENSITIVE_TABLE1`;. Caused by: Failed to create plan, err:Table not found, table:CASE_SENSITIVE_TABLE1" })

DROP TABLE IF EXISTS case_SENSITIVE_table1;

affected_rows: 0

2 changes: 2 additions & 0 deletions integration_tests/cases/common/dml/case_sensitive.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,5 @@ DESC CASE_SENSITIVE_TABLE1;
DESC `case_SENSITIVE_table1`;

DESC `CASE_SENSITIVE_TABLE1`;

DROP TABLE IF EXISTS case_SENSITIVE_table1;
4 changes: 4 additions & 0 deletions integration_tests/cases/common/dml/insert_mode.result
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,7 @@ UInt64(0),Timestamp(2),UInt32(20),String("123"),UInt32(21),UInt32(22),UInt32(4),
UInt64(0),Timestamp(3),UInt32(30),String("123"),UInt32(31),UInt32(32),UInt32(5),


DROP TABLE IF EXISTS `03_dml_insert_mode_table4`;

affected_rows: 0

3 changes: 2 additions & 1 deletion integration_tests/cases/common/dml/insert_mode.sql
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,5 @@ FROM
ORDER BY
`c1` ASC;

DROP TABLE `03_dml_insert_mode_table4`;

DROP TABLE IF EXISTS `03_dml_insert_mode_table4`;
8 changes: 8 additions & 0 deletions integration_tests/cases/common/dml/issue-302.result
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
DROP TABLE IF EXISTS issue302;

affected_rows: 0

CREATE TABLE `issue302` (`name` string TAG NULL, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE=Analytic with (enable_ttl='false');

affected_rows: 0
Expand All @@ -12,3 +16,7 @@ issue302.t,COUNT(DISTINCT issue302.name),
Timestamp(1651737067000),Int64(0),


DROP TABLE IF EXISTS issue302;

affected_rows: 0

4 changes: 4 additions & 0 deletions integration_tests/cases/common/dml/issue-302.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
DROP TABLE IF EXISTS issue302;

CREATE TABLE `issue302` (`name` string TAG NULL, `value` double NOT NULL, `t` timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE=Analytic with (enable_ttl='false');

INSERT INTO issue302(t, value) VALUES(1651737067000, 100);

select `t`, count(distinct name) from issue302 group by `t`;

DROP TABLE IF EXISTS issue302;
4 changes: 4 additions & 0 deletions integration_tests/cases/common/dml/issue-59.result
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ String("logical_plan"),String("Projection: issue59.id + Int64(1), COUNT(DISTINCT
String("physical_plan"),String("ProjectionExec: expr=[issue59.id + Int64(1)@0 as issue59.id + Int64(1), COUNT(DISTINCT issue59.account)@1 as COUNT(DISTINCT issue59.account)]\n ProjectionExec: expr=[group_alias_0@0 as issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], aggr=[COUNT(alias1)]\n AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0, alias1@1 as alias1], aggr=[]\n CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec: partitioning=Hash([Column { name: \"group_alias_0\", index: 0 }, Column { name: \"alias1\", index: 1 }], 8), input_partitions=8\n AggregateExec: mode=Partial, gby=[CAST(id@0 AS Int64) + 1 as group_alias_0, account@1 as alias1], aggr=[]\n ScanTable: table=issue59, parallelism=8, order=None, \n"),


DROP TABLE IF EXISTS issue59;

affected_rows: 0

2 changes: 2 additions & 0 deletions integration_tests/cases/common/dml/issue-59.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,5 @@ GROUP BY id+1;
explain SELECT id+1, count(distinct(account))
FROM issue59
GROUP BY id+1;

DROP TABLE IF EXISTS issue59;
8 changes: 8 additions & 0 deletions integration_tests/cases/common/dml/issue-637.result
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,11 @@ tsid,t,double_filed,float_filed,str_field,var_field,u64_field,u32_field,u16_fiel
UInt64(0),Timestamp(1651737067000),Double(100.0),Float(100.0),String("s"),Varbinary([118]),UInt64(100),UInt32(100),UInt16(100),UInt8(100),Int64(100),Int32(100),Int16(100),Int8(100),Boolean(false),


DROP TABLE IF EXISTS issue637;

affected_rows: 0

DROP TABLE IF EXISTS issue637_1;

affected_rows: 0

4 changes: 4 additions & 0 deletions integration_tests/cases/common/dml/issue-637.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ VALUES
(1651737067000,100,100,"s","v",100,100,100,100,100,100,100,100,false);

SELECT * FROM `issue637_1`;

DROP TABLE IF EXISTS issue637;

DROP TABLE IF EXISTS issue637_1;
5 changes: 5 additions & 0 deletions integration_tests/cases/env/local/influxql/basic.result
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ SELECT "level_description", location, water_level FROM "h2o_feet" where location

{"results":[{"statement_id":0,"series":[{"name":"h2o_feet","columns":["time","level_description","location","water_level"],"values":[[1439827200000,"below 3 feet","santa_monica",2.064],[1439827560000,"below 3 feet","santa_monica",2.116],[1439827620000,"below 3 feet","santa_monica",2.028]]}]}]}

-- SQLNESS ARG protocol=influxql
show measurements;

{"results":[{"statement_id":0,"series":[{"name":"measurements","columns":["name"],"values":[["h2o_feet"]]}]}]}

DROP TABLE IF EXISTS `h2o_feet`;

affected_rows: 0
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/cases/env/local/influxql/basic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ SELECT * FROM "h2o_feet";
-- SQLNESS ARG protocol=influxql
SELECT "level_description", location, water_level FROM "h2o_feet" where location = 'santa_monica';

-- SQLNESS ARG protocol=influxql
show measurements;

DROP TABLE IF EXISTS `h2o_feet`;
42 changes: 31 additions & 11 deletions interpreters/src/show.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use regex::Regex;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use sql::{
ast::ShowCreateObject,
plan::{ShowCreatePlan, ShowPlan, ShowTablesPlan},
plan::{QueryType, ShowCreatePlan, ShowPlan, ShowTablesPlan},
};

use crate::{
Expand Down Expand Up @@ -127,17 +127,37 @@ impl ShowInterpreter {
.map(|t| t.name().to_string())
.collect::<Vec<_>>(),
};
let schema = DataSchema::new(vec![Field::new(
SHOW_TABLES_COLUMN_SCHEMA,
DataType::Utf8,
false,
)]);
let record_batch = RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(StringArray::from(tables_names))],
)
.context(CreateRecordBatch)?;

let record_batch = match plan.query_type {
QueryType::Sql => {
let schema = DataSchema::new(vec![Field::new(
SHOW_TABLES_COLUMN_SCHEMA,
DataType::Utf8,
false,
)]);

RecordBatch::try_new(
Arc::new(schema),
vec![Arc::new(StringArray::from(tables_names))],
)
.context(CreateRecordBatch)?
}
QueryType::InfluxQL => {
// TODO: refactor these constants
let schema = DataSchema::new(vec![
Field::new("ceresdb::measurement", DataType::Utf8, false),
Field::new("name", DataType::Utf8, false),
]);

let measurements = vec!["measurements".to_string(); tables_names.len()];
let measurements = Arc::new(StringArray::from(measurements));
RecordBatch::try_new(
Arc::new(schema),
vec![measurements, Arc::new(StringArray::from(tables_names))],
)
.context(CreateRecordBatch)?
}
};
let record_batch = record_batch.try_into().context(ToCommonRecordType)?;

Ok(Output::Records(vec![record_batch]))
Expand Down
6 changes: 4 additions & 2 deletions server/src/handlers/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl From<&str> for Precision {
/// Query string parameters for query api(by influxql)
///
/// It's derived from query string parameters of query described in
/// doc of influxdb 1.8:
/// doc of influxdb 1.8:
/// https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-string-parameters-1
///
/// NOTE:
Expand Down Expand Up @@ -311,6 +311,8 @@ impl InfluxqlResultBuilder {
}
);

// For queries like `show measurements`, there is no timestamp column.
let has_timestamp = column_schemas.iter().any(|c| c.data_type.is_timestamp());
// Find the tags part and columns part from schema.
let mut group_by_col_idxs = Vec::new();
let mut value_col_idxs = Vec::new();
Expand All @@ -324,7 +326,7 @@ impl InfluxqlResultBuilder {
});

// The group by tags will be placed after measurement and before time column.
let mut searching_group_by_tags = true;
let mut searching_group_by_tags = has_timestamp;
for (idx, col) in col_iter {
if col.data_type.is_timestamp() {
searching_group_by_tags = false;
Expand Down
18 changes: 14 additions & 4 deletions sql/src/influxql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ use influxql_logical_planner::planner::InfluxQLToLogicalPlan;
use influxql_parser::{
common::{MeasurementName, QualifiedMeasurementName},
select::{MeasurementSelection, SelectStatement},
show_measurements::ShowMeasurementsStatement,
statement::Statement as InfluxqlStatement,
};
use snafu::{ensure, ResultExt};

use crate::{
influxql::{error::*, provider::InfluxSchemaProviderImpl},
plan::{Plan, QueryPlan},
plan::{Plan, QueryPlan, QueryType, ShowPlan, ShowTablesPlan},
provider::{ContextProviderAdapter, MetaProvider},
};

Expand All @@ -36,15 +37,15 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
/// the [InfluxqlStatement] will be converted to [SqlStatement] first,
/// and build plan then.
pub fn statement_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> {
match &stmt {
match stmt {
InfluxqlStatement::Select(_) => self.select_to_plan(stmt),
InfluxqlStatement::ShowMeasurements(stmt) => self.show_measurements_to_plan(*stmt),
InfluxqlStatement::CreateDatabase(_)
| InfluxqlStatement::ShowDatabases(_)
| InfluxqlStatement::ShowRetentionPolicies(_)
| InfluxqlStatement::ShowTagKeys(_)
| InfluxqlStatement::ShowTagValues(_)
| InfluxqlStatement::ShowFieldKeys(_)
| InfluxqlStatement::ShowMeasurements(_)
| InfluxqlStatement::Delete(_)
| InfluxqlStatement::DropMeasurement(_)
| InfluxqlStatement::Explain(_) => Unimplemented {
Expand All @@ -54,7 +55,16 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
}
}

pub fn select_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> {
// TODO: support offset/limit/match in stmt
fn show_measurements_to_plan(self, _stmt: ShowMeasurementsStatement) -> Result<Plan> {
let plan = ShowTablesPlan {
pattern: None,
query_type: QueryType::InfluxQL,
};
Ok(Plan::Show(ShowPlan::ShowTablesPlan(plan)))
}

fn select_to_plan(self, stmt: InfluxqlStatement) -> Result<Plan> {
if let InfluxqlStatement::Select(select_stmt) = &stmt {
check_select_statement(select_stmt)?;
} else {
Expand Down
7 changes: 7 additions & 0 deletions sql/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,17 @@ pub struct ShowCreatePlan {
pub obj_type: ShowCreateObject,
}

#[derive(Debug, PartialEq, Eq)]
pub enum QueryType {
Sql,
InfluxQL,
}

#[derive(Debug, PartialEq, Eq)]
pub struct ShowTablesPlan {
/// Like pattern
pub pattern: Option<String>,
pub query_type: QueryType,
}

#[derive(Debug)]
Expand Down
5 changes: 4 additions & 1 deletion sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ use crate::{
partition::PartitionParser,
plan::{
AlterTableOperation, AlterTablePlan, CreateTablePlan, DescribeTablePlan, DropTablePlan,
ExistsTablePlan, InsertPlan, Plan, QueryPlan, ShowCreatePlan, ShowPlan, ShowTablesPlan,
ExistsTablePlan, InsertPlan, Plan, QueryPlan, QueryType, ShowCreatePlan, ShowPlan,
ShowTablesPlan,
},
promql::{ColumnNames, Expr as PromExpr},
provider::{ContextProviderAdapter, MetaProvider},
Expand Down Expand Up @@ -992,6 +993,7 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> {
fn show_tables_to_plan(&self, show_tables: ShowTables) -> Result<Plan> {
let plan = ShowTablesPlan {
pattern: show_tables.pattern,
query_type: QueryType::Sql,
};
Ok(Plan::Show(ShowPlan::ShowTablesPlan(plan)))
}
Expand Down Expand Up @@ -2193,6 +2195,7 @@ mod tests {
ShowTablesPlan(
ShowTablesPlan {
pattern: None,
query_type: Sql,
},
),
)"#,
Expand Down

0 comments on commit 1d8235c

Please sign in to comment.