From 1e13f0d6fb9d2e268526c66414e5aa30554d252c Mon Sep 17 00:00:00 2001 From: amory Date: Tue, 30 Jul 2024 15:53:41 +0800 Subject: [PATCH] [fix](simdjson) fix simdjson reader will parse given root twice when data is object array (#38490) --- .../vec/exec/format/json/new_json_reader.cpp | 3 +- be/src/vec/exec/format/json/new_json_reader.h | 1 + .../stream_load/load_object_array_json.out | 4 + .../stream_load/test_json_object_array.csv | 1 + .../stream_load/load_object_array_json.groovy | 100 ++++++++++++++++++ 5 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 regression-test/data/load_p0/stream_load/load_object_array_json.out create mode 100644 regression-test/data/load_p0/stream_load/test_json_object_array.csv create mode 100644 regression-test/suites/load_p0/stream_load/load_object_array_json.groovy diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index f3dab071e73479..3b5affa6fb6fbc 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -1214,7 +1214,7 @@ Status NewJsonReader::_simdjson_handle_flat_array_complex_json_write_columns( while (true) { cur = (*_array_iter).get_object(); // extract root - if (!_parsed_json_root.empty()) { + if (!_parsed_from_json_root && !_parsed_json_root.empty()) { simdjson::ondemand::value val; Status st = JsonFunctions::extract_from_object(cur, _parsed_json_root, &val); if (UNLIKELY(!st.ok())) { @@ -1610,6 +1610,7 @@ Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_c fmt::format_to(error_msg, "{}", st.to_string()); return return_quality_error(error_msg, std::string((char*)_json_str, *size)); } + _parsed_from_json_root = true; } catch (simdjson::simdjson_error& e) { fmt::memory_buffer error_msg; fmt::format_to(error_msg, "Encounter error while extract_from_object, error: {}", diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h index f044e06e62eaf2..0df3747b8c2a38 100644 --- a/be/src/vec/exec/format/json/new_json_reader.h +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -233,6 +233,7 @@ class NewJsonReader : public GenericReader { std::vector> _parsed_jsonpaths; std::vector _parsed_json_root; + bool _parsed_from_json_root = false; // to avoid parsing json root multiple times char _value_buffer[4 * 1024 * 1024]; // 4MB char _parse_buffer[512 * 1024]; // 512KB diff --git a/regression-test/data/load_p0/stream_load/load_object_array_json.out b/regression-test/data/load_p0/stream_load/load_object_array_json.out new file mode 100644 index 00000000000000..53598f37e7dd73 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/load_object_array_json.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select -- +1021021338780262401 1021021338700570624 0 239.0000 876219500005C31942 0 0.0000 128.0000 1 2024-07-19T11:34:17 239.0000 0 0.0000 + diff --git a/regression-test/data/load_p0/stream_load/test_json_object_array.csv b/regression-test/data/load_p0/stream_load/test_json_object_array.csv new file mode 100644 index 00000000000000..63fc1b5d460e60 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_json_object_array.csv @@ -0,0 +1 @@ +{"data":[{"id":"1021021338780262401","type":"INSERT","owner_id":"0","amount_tag":"239.0","barcode":"876219500005C31942","retail_order_bill_id":"1021021338700570624","status":"0","amount_retail":"0.0","amount":"128.0","qty":"1","timestamp":"2024-07-19 11:34:17","price_cost":"239.0","is_gift":"0","amount_discount":"0.0"}],"type":"INSERT"} diff --git a/regression-test/suites/load_p0/stream_load/load_object_array_json.groovy b/regression-test/suites/load_p0/stream_load/load_object_array_json.groovy new file mode 100644 index 00000000000000..aeb7f9297ccecc --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/load_object_array_json.groovy @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("load_object_array_json", "p0") { + // define a sql table + def testTable = "load_object_array_json" + + def create_test_table = { + def result1 = sql """ + CREATE TABLE IF NOT EXISTS ${testTable} ( + `id` bigint(20) NOT NULL COMMENT "", + `retail_order_bill_id` bigint(20) NULL COMMENT "", + `owner_id` int(11) NULL COMMENT "", + `amount_tag` decimal(12, 4) NULL COMMENT "", + `barcode` varchar(128) NULL COMMENT "", + `status` int(11) NULL COMMENT "", + `amount_retail` decimal(12, 4) NULL COMMENT "", + `amount` decimal(12, 4) NULL COMMENT "", + `qty` int(11) NULL COMMENT "", + `timestamp` datetime NULL COMMENT "时间戳", + `price_cost` decimal(12, 4) NULL COMMENT "", + `is_gift` int(11) NULL COMMENT "", + `amount_discount` decimal(12, 4) NULL COMMENT "", + ) ENGINE=OLAP + UNIQUE KEY(`id`, `retail_order_bill_id`) + DISTRIBUTED BY HASH(`retail_order_bill_id`) BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "storage_format" = "V2" + ) + """ + } + + def load_data = {table_name, file_name -> + // load the json data + streamLoad { + table table_name + + // set http request header params + set 'strip_outer_array', 'true' + set 'read_json_by_line', 'true' + set 'format', 'json' + set 'columns', 'id,owner_id,amount_tag,barcode,retail_order_bill_id,status,amount_retail,amount,qty,timestamp,price_cost,is_gift,amount_discount' + set 'jsonpaths', '[\"$.id\",\"$.owner_id\",\"$.amount_tag\",\"$.barcode\",\"$.retail_order_bill_id\",\"$.status\",\"$.amount_retail\",\"$.amount\",\"$.qty\",\"$.timestamp\",\"$.price_cost\",\"$.is_gift\",\"$.amount_discount\"]' + set 'json_root', '$.data' + set 'fuzzy_parse', 'false' + set 'max_filter_ratio', '1' + file file_name // import json file + time 10000 // limit inflight 10s + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows + + json.NumberFilteredRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + def check_data_correct = {table_name -> + sql "sync" + // select the table and check whether the data is correct + qt_select "select * from ${table_name} order by id" + } + + // case1: import array data in json format and enable vectorized engine + try { + sql "DROP TABLE IF EXISTS ${testTable}" + + create_test_table.call() + + load_data.call(testTable, 'test_json_object_array.csv') + + check_data_correct(testTable) + + } finally { + try_sql("DROP TABLE IF EXISTS ${testTable}") + } +}