Skip to content

Commit

Permalink
Merge branch 'main' into bz2/bump-prepare
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] committed Oct 28, 2022
2 parents 462a091 + cee324d commit 70665fa
Show file tree
Hide file tree
Showing 32 changed files with 1,123 additions and 501 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/doc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
- name: Install dependencies
run: sudo apt install make build-essential cmake protobuf-compiler curl openssl libssl-dev libsasl2-dev libcurl4-openssl-dev pkg-config postgresql-client tmux lld
run: sudo apt-get update && sudo apt-get install -y make build-essential cmake protobuf-compiler curl openssl libssl-dev libsasl2-dev libcurl4-openssl-dev pkg-config postgresql-client tmux lld
- name: Docs
run: |
RUSTDOCFLAGS="--markdown-css rust.css --markdown-no-toc --index-page docs/index.md -Zunstable-options" cargo doc --workspace --no-deps --document-private-items
Expand Down
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ci/plugins/upload-failure-logs/hooks/post-command
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
set -euo pipefail

if [ $BUILDKITE_COMMAND_EXIT_STATUS -ne 0 ]; then
mkdir risedev-logs && cp .risingwave/log/* risedev-logs
mv .risingwave/log risedev-logs
zip -q -r risedev-logs.zip risedev-logs/
buildkite-agent artifact upload "risedev-logs/*"
buildkite-agent artifact upload risedev-logs.zip
Expand Down
20 changes: 11 additions & 9 deletions ci/scripts/deterministic-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,28 @@ echo "--- Download artifacts"
buildkite-agent artifact download risingwave_simulation .
chmod +x ./risingwave_simulation

export RUNNER=./risingwave_simulation
export RUST_LOG=off
export RUST_LOG=info
export LOGDIR=.risingwave/log

mkdir -p $LOGDIR

echo "--- deterministic simulation e2e, ci-3cn-1fe, ddl"
seq 16 | parallel MADSIM_TEST_SEED={} $RUNNER './e2e_test/ddl/\*\*/\*.slt'
seq 16 | parallel MADSIM_TEST_SEED={} './risingwave_simulation ./e2e_test/ddl/\*\*/\*.slt > $LOGDIR/ddl-{}.log && rm $LOGDIR/ddl-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-1fe, streaming"
seq 16 | parallel MADSIM_TEST_SEED={} $RUNNER './e2e_test/streaming/\*\*/\*.slt'
seq 16 | parallel MADSIM_TEST_SEED={} './risingwave_simulation ./e2e_test/streaming/\*\*/\*.slt > $LOGDIR/streaming-{}.log && rm $LOGDIR/streaming-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-1fe, batch"
seq 16 | parallel MADSIM_TEST_SEED={} $RUNNER './e2e_test/batch/\*\*/\*.slt'
seq 16 | parallel MADSIM_TEST_SEED={} './risingwave_simulation ./e2e_test/batch/\*\*/\*.slt > $LOGDIR/batch-{}.log && rm $LOGDIR/batch-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-1fe, kafka source"
seq 16 | parallel MADSIM_TEST_SEED={} $RUNNER './e2e_test/source/kafka.slt'
seq 16 | parallel MADSIM_TEST_SEED={} './risingwave_simulation ./e2e_test/source/kafka.slt > $LOGDIR/source-{}.log && rm $LOGDIR/source-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, parallel, streaming"
seq 16 | parallel MADSIM_TEST_SEED={} $RUNNER -j 16 './e2e_test/streaming/\*\*/\*.slt'
seq 16 | parallel MADSIM_TEST_SEED={} './risingwave_simulation -j 16 ./e2e_test/streaming/\*\*/\*.slt > $LOGDIR/parallel-streaming-{}.log && rm $LOGDIR/parallel-streaming-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, parallel, batch"
seq 16 | parallel MADSIM_TEST_SEED={} $RUNNER -j 16 './e2e_test/batch/\*\*/\*.slt'
seq 16 | parallel MADSIM_TEST_SEED={} './risingwave_simulation -j 16 ./e2e_test/batch/\*\*/\*.slt > $LOGDIR/parallel-batch-{}.log && rm $LOGDIR/parallel-batch-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-1fe, fuzzing"
seq 16 | parallel MADSIM_TEST_SEED={} $RUNNER --sqlsmith 100 ./src/tests/sqlsmith/tests/testdata
seq 16 | parallel MADSIM_TEST_SEED={} './risingwave_simulation --sqlsmith 100 ./src/tests/sqlsmith/tests/testdata > $LOGDIR/fuzzing-{}.log && rm $LOGDIR/fuzzing-{}.log'
14 changes: 6 additions & 8 deletions ci/scripts/deterministic-recovery-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ echo "--- Download artifacts"
buildkite-agent artifact download risingwave_simulation .
chmod +x ./risingwave_simulation

export RUNNER=./risingwave_simulation
export RUST_LOG=off
export RUST_LOG=info
export LOGDIR=.risingwave/log

mkdir -p $LOGDIR

# bugs here! Tracking issue https://github.com/risingwavelabs/risingwave/issues/4527
echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, streaming"
seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-meta --kill-frontend --kill-compute './e2e_test/streaming/\*\*/\*.slt'
seq 1 | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill-meta --kill-frontend --kill-compute --kill-compactor ./e2e_test/streaming/\*\*/\*.slt > $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log'

# bugs here! Tracking issue https://github.com/risingwavelabs/risingwave/issues/4527
echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, batch"
seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --kill-meta --kill-frontend --kill-compute './e2e_test/batch/\*\*/\*.slt'

# bugs here! Tracking issue https://github.com/risingwavelabs/risingwave/issues/5103
echo "--- deterministic simulation e2e, ci-3cn-1fe, recovery, streaming"
seq 1 | parallel MADSIM_TEST_SEED={} $RUNNER --etcd-timeout-rate=0.01 './e2e_test/streaming/\*\*/\*.slt'
seq 1 | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kill-meta --kill-frontend --kill-compute --kill-compactor ./e2e_test/batch/\*\*/\*.slt > $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log'
10 changes: 6 additions & 4 deletions ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,27 +125,29 @@ steps:
retry: *auto-retry

- label: "end-to-end test (deterministic simulation)"
command: "timeout 8m ci/scripts/deterministic-e2e-test.sh"
command: "timeout 14m ci/scripts/deterministic-e2e-test.sh"
depends_on: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
- docker-compose#v3.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
retry: *auto-retry

- label: "recovery test (deterministic simulation)"
command: "timeout 8m ci/scripts/deterministic-recovery-test.sh"
command: "timeout 14m ci/scripts/deterministic-recovery-test.sh"
depends_on: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
- docker-compose#v3.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
retry: *auto-retry
soft_fail: true

Expand Down
49 changes: 0 additions & 49 deletions ci/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,6 @@ steps:
timeout_in_minutes: 20
retry: *auto-retry

- label: "build (deterministic simulation)"
command: "ci/scripts/build-simulation.sh"
key: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
- docker-compose#v3.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
retry: *auto-retry

- label: "docslt"
command: "ci/scripts/docslt.sh"
key: "docslt"
Expand Down Expand Up @@ -117,43 +105,6 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end test (deterministic simulation)"
command: "timeout 8m ci/scripts/deterministic-e2e-test.sh"
depends_on: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
- docker-compose#v3.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
retry: *auto-retry

- label: "scaling test (deterministic simulation)"
command: "MADSIM_TEST_NUM=5 ci/scripts/deterministic-scale-test.sh"
depends_on: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
- docker-compose#v3.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
retry: *auto-retry

- label: "recovery test (deterministic simulation)"
command: "timeout 8m ci/scripts/deterministic-recovery-test.sh"
depends_on: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
- docker-compose#v3.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
retry: *auto-retry
soft_fail: true

- label: "release"
command: "ci/scripts/release.sh"
if: build.tag != null
Expand Down
10 changes: 6 additions & 4 deletions ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,20 @@ steps:
retry: *auto-retry

- label: "end-to-end test (deterministic simulation)"
command: "timeout 8m ci/scripts/deterministic-e2e-test.sh"
command: "timeout 14m ci/scripts/deterministic-e2e-test.sh"
depends_on: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
- docker-compose#v3.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
timeout_in_minutes: 10
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
retry: *auto-retry

- label: "recovery test (deterministic simulation)"
command: "timeout 8m ci/scripts/deterministic-recovery-test.sh"
command: "timeout 14m ci/scripts/deterministic-recovery-test.sh"
depends_on: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
Expand All @@ -212,7 +213,8 @@ steps:
# - test-collector#v1.0.0:
# files: "*-junit.xml"
# format: "junit"
timeout_in_minutes: 10
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 15
retry: *auto-retry
soft_fail: true

Expand Down
35 changes: 35 additions & 0 deletions e2e_test/source/kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,32 @@ WITH (
scan.startup.mode = 'earliest')
ROW format JSON;

# we cannot create debezium source without pk
statement error
create materialized source s13 (
id integer,
name varchar,
is_adult integer,
birthday timestamp
) with (
connector = 'kafka',
topic = 'maxwell_json',
properties.bootstrap.server = '127.0.0.1:29092'
) row format maxwell;

statement ok
create materialized source s13 (
id integer,
name varchar,
is_adult integer,
birthday timestamp,
PRIMARY KEY (id)
) with (
connector = 'kafka',
topic = 'maxwell_json',
properties.bootstrap.server = '127.0.0.1:29092'
) row format maxwell;

statement ok
flush;

Expand Down Expand Up @@ -312,6 +338,12 @@ select id, code, timestamp, xfas, contacts from s12;
----
100 abc 1473305798 {"(0,200,10.0.0.1)","(1,400,10.0.0.2)"} ({1xxx,2xxx},{1xxx,2xxx})

query ITIT
select * from s13 order by id;
----
1 tom 0 2017-12-31 16:00:01
2 chi 1 1999-12-31 16:00:01

statement ok
drop materialized view source_mv1

Expand All @@ -335,3 +367,6 @@ drop source s11

statement ok
drop source s12

statement ok
drop source s13
3 changes: 3 additions & 0 deletions scripts/source/test_data/maxwell_json.1
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"database":"test","table":"t","type":"insert","ts":1666937996,"xid":1171,"commit":true,"data":{"id":1,"name":"tom","is_adult":0,"birthday":"2017-12-31 16:00:01"}}
{"database":"test","table":"t","type":"insert","ts":1666938023,"xid":1254,"commit":true,"data":{"id":2,"name":"alex","is_adult":1,"birthday":"1999-12-31 16:00:01"}}
{"database":"test","table":"t","type":"update","ts":1666938068,"xid":1373,"commit":true,"data":{"id":2,"name":"chi","is_adult":1,"birthday":"1999-12-31 16:00:01"},"old":{"name":"alex"}}
7 changes: 7 additions & 0 deletions src/frontend/planner_test/tests/testdata/basic_query.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,10 @@
stream_plan: |
StreamMaterialize { columns: [a, b, mv.t._row_id(hidden)], pk_columns: [mv.t._row_id] }
└─StreamTableScan { table: mv, columns: [mv.a, mv.b, mv.t._row_id], pk: [mv.t._row_id], dist: UpstreamHashShard(mv.t._row_id) }
- sql: |
create table t (v1 int, v2 int);
create materialized view mv(a,b) as select v1+1,v2+1 from t;
select * from mv;
stream_plan: |
StreamMaterialize { columns: [a, b, mv.t._row_id(hidden)], pk_columns: [mv.t._row_id] }
└─StreamTableScan { table: mv, columns: [mv.a, mv.b, mv.t._row_id], pk: [mv.t._row_id], dist: UpstreamHashShard(mv.t._row_id) }
43 changes: 32 additions & 11 deletions src/frontend/src/handler/create_mv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::DEFAULT_SCHEMA_NAME;
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::catalog::Table as ProstTable;
use risingwave_pb::user::grant_privilege::{Action, Object};
Expand Down Expand Up @@ -67,15 +68,27 @@ pub fn gen_create_mv_plan(

let definition = query.to_string();

// If columns is empty, it means that the user did not specify the column names.
// In this case, we extract the column names from the query.
// If columns is not empty, it means that user specify the column names and the user
// should guarantee that the column names number are consistent with the query.
let col_names: Option<Vec<String>> = if columns.is_empty() {
None
} else {
Some(columns.iter().map(|v| v.value.clone()).collect())
};

let bound = {
let mut binder = Binder::new(session);
binder.bind_query(query)?
};

if let BoundSetExpr::Select(select) = &bound.body {
// `InputRef`'s alias will be implicitly assigned in `bind_project`.
// For other expressions, we require the user to explicitly assign an alias.
if select.aliases.iter().any(Option::is_none) {
// If user provide columns name (col_names.is_some()), we don't need alias.
// For other expressions (col_names.is_none()), we require the user to explicitly assign an
// alias.
if col_names.is_none() && select.aliases.iter().any(Option::is_none) {
return Err(ErrorCode::BindError(
"An alias must be specified for an expression".to_string(),
)
Expand All @@ -87,17 +100,25 @@ pub fn gen_create_mv_plan(
check_privileges(session, &check_items)?;
}
}
// If columns is empty, it means that the user did not specify the column names.
// In this case, we extract the column names from the query.
// If columns is not empty, it means that user specify the column names and the user
// should guarantee that the column names number are consistent with the query.
let col_names = if columns.is_empty() {
None
} else {
Some(columns.iter().map(|v| v.value.clone()).collect())
};

let mut plan_root = Planner::new(context).plan_query(bound)?;
// Check the col_names match number of columns in the query.
if let Some(col_names) = &col_names {
// calculate the number of unhidden columns
let unhidden_len = plan_root
.schema()
.fields()
.iter()
.enumerate()
.filter(|(i, _)| plan_root.out_fields().contains(*i))
.count();
if col_names.len() != unhidden_len {
return Err(InternalError(
"number of column names does not match number of columns".to_string(),
)
.into());
}
}
let materialize = plan_root.gen_create_mv_plan(table_name, definition, col_names)?;
let mut table = materialize.table().to_prost(schema_id, database_id);
if session.config().get_create_compaction_group_for_mv() {
Expand Down
Loading

0 comments on commit 70665fa

Please sign in to comment.