fix: pipeline dissect error is returned directly to the user instead of printing a warn log #4709

Merged: 2 commits, Sep 12, 2024

21 changes: 13 additions & 8 deletions src/pipeline/src/etl.rs
@@ -21,7 +21,7 @@ pub mod value;
 
 use ahash::HashSet;
 use common_telemetry::debug;
-use itertools::{merge, Itertools};
+use itertools::Itertools;
 use processor::{Processor, ProcessorBuilder, Processors};
 use transform::{TransformBuilders, Transformer, Transforms};
 use value::Value;
@@ -91,13 +91,18 @@ where
     debug!("required_keys: {:?}", required_keys);
 
     // intermediate keys are the keys that all processor and transformer required
-    let ordered_intermediate_keys: Vec<String> =
-        merge(processors_required_keys, transforms_required_keys)
-            .cloned()
-            .collect::<HashSet<String>>()
-            .into_iter()
-            .sorted()
-            .collect();
+    let ordered_intermediate_keys: Vec<String> = [
+        processors_required_keys,
+        transforms_required_keys,
+        processors_output_keys,
+    ]
+    .iter()
+    .flat_map(|l| l.iter())
+    .collect::<HashSet<&String>>()
+    .into_iter()
+    .sorted()
+    .cloned()
+    .collect_vec();
 
     let mut final_intermediate_keys = Vec::with_capacity(ordered_intermediate_keys.len());
     let mut intermediate_keys_exclude_original =
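For readers unfamiliar with the itertools pattern above, here is a minimal standalone sketch (not code from this PR; the key lists and names are made up) of how flattening several key lists into a `HashSet` and then sorting yields a deduplicated, deterministic ordering, which is what the new `ordered_intermediate_keys` expression relies on. It assumes the `itertools` crate is available.

```rust
use std::collections::HashSet;

use itertools::Itertools;

fn main() {
    // Hypothetical key lists standing in for the processor/transform key sets.
    let processors_required = vec!["ts".to_string(), "msg".to_string()];
    let transforms_required = vec!["msg".to_string(), "level".to_string()];
    let processors_output = vec!["level".to_string(), "host".to_string()];

    // Flatten the three lists, deduplicate through a HashSet of references,
    // then sort so the resulting key order is stable across runs.
    let ordered: Vec<String> = [&processors_required, &transforms_required, &processors_output]
        .iter()
        .flat_map(|l| l.iter())
        .collect::<HashSet<&String>>()
        .into_iter()
        .sorted()
        .cloned()
        .collect_vec();

    assert_eq!(ordered, vec!["host", "level", "msg", "ts"]);
}
```

Sorting after deduplication keeps the intermediate-key order deterministic, which matters because the dissect processor below writes intermediate values by index (`val[k] = v`).
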
14 changes: 5 additions & 9 deletions src/pipeline/src/etl/processor/dissect.rs
@@ -817,16 +817,12 @@ impl Processor for DissectProcessor {
         for field in self.fields.iter() {
             let index = field.input_index();
             match val.get(index) {
-                Some(Value::String(val_str)) => match self.process(val_str) {
-                    Ok(r) => {
-                        for (k, v) in r {
-                            val[k] = v;
-                        }
+                Some(Value::String(val_str)) => {
+                    let r = self.process(val_str)?;
+                    for (k, v) in r {
+                        val[k] = v;
                     }
-                    Err(e) => {
-                        warn!("dissect processor: {}", e);
-                    }
-                },
+                }
                 Some(Value::Null) | None => {
                     if !self.ignore_missing {
                         return Err(format!(
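The hunk above is the core behavioral change of this PR: a dissect failure is now propagated to the caller with `?` instead of being logged at warn level and silently swallowed. Below is a minimal standalone sketch of the difference, using hypothetical stand-in functions rather than the real `Processor` trait.

```rust
// Hypothetical stand-in for DissectProcessor::process: fails when the input
// does not match the configured pattern.
fn process(input: &str) -> Result<Vec<(usize, String)>, String> {
    if input.contains(' ') {
        Ok(vec![(0, input.to_string())])
    } else {
        Err("No matching pattern found".to_string())
    }
}

// Old behavior: log the error and continue; the caller never learns it happened.
fn exec_logging(input: &str, slots: &mut [String]) {
    match process(input) {
        Ok(pairs) => {
            for (k, v) in pairs {
                slots[k] = v;
            }
        }
        Err(e) => eprintln!("dissect processor: {}", e), // stand-in for warn!()
    }
}

// New behavior: propagate the error to the caller with `?`.
fn exec_propagating(input: &str, slots: &mut [String]) -> Result<(), String> {
    for (k, v) in process(input)? {
        slots[k] = v;
    }
    Ok(())
}

fn main() {
    let mut slots = vec![String::new()];
    exec_logging("no_match", &mut slots); // error only appears in the log
    assert!(exec_propagating("no_match", &mut slots).is_err()); // error reaches the caller
}
```
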
34 changes: 34 additions & 0 deletions src/pipeline/tests/dissect.rs
@@ -247,3 +247,37 @@ transform:
         Some(StringValue("key1_key2".to_string()))
     );
 }
+
+#[test]
+fn test_parse_failure() {
+    let input_str = r#"
+    {
+        "str": "key1 key2"
+    }"#;
+
+    let pipeline_yaml = r#"
+processors:
+  - dissect:
+      field: str
+      patterns:
+        - "%{key1} %{key2} %{key3}"
+
+transform:
+  - fields:
+      - key1
+    type: string
+"#;
+
+    let input_value = serde_json::from_str::<serde_json::Value>(input_str).unwrap();
+
+    let yaml_content = pipeline::Content::Yaml(pipeline_yaml.into());
+    let pipeline: pipeline::Pipeline<pipeline::GreptimeTransformer> =
+        pipeline::parse(&yaml_content).expect("failed to parse pipeline");
+    let mut result = pipeline.init_intermediate_state();
+
+    pipeline.prepare(input_value, &mut result).unwrap();
+    let row = pipeline.exec_mut(&mut result);
+
+    assert!(row.is_err());
+    assert_eq!(row.err().unwrap(), "No matching pattern found");
+}
46 changes: 46 additions & 0 deletions src/pipeline/tests/regex.rs
@@ -122,3 +122,49 @@ transform:
 
     assert_eq!(output.rows[0].values[0].value_data, None);
 }
+
+#[test]
+fn test_unuse_regex_group() {
+    let input_value_str = r#"
+    [
+        {
+            "str": "123 456"
+        }
+    ]
+"#;
+
+    let pipeline_yaml = r#"
+processors:
+  - regex:
+      fields:
+        - str
+      pattern: "(?<id1>\\d+) (?<id2>\\d+)"
+
+transform:
+  - field: str_id1
+    type: string
+"#;
+
+    let output = common::parse_and_exec(input_value_str, pipeline_yaml);
+
+    assert_eq!(
+        output.schema,
+        vec![
+            common::make_column_schema(
+                "str_id1".to_string(),
+                ColumnDataType::String,
+                SemanticType::Field,
+            ),
+            common::make_column_schema(
+                "greptime_timestamp".to_string(),
+                ColumnDataType::TimestampNanosecond,
+                SemanticType::Timestamp,
+            ),
+        ]
+    );
+
+    assert_eq!(
+        output.rows[0].values[0].value_data,
+        Some(StringValue("123".to_string()))
+    );
+}