diff --git a/Cargo.lock b/Cargo.lock index d16f6a9c81de..172846c4c07f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1759,6 +1759,18 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b015497079b9a9d69c02ad25de6c0a6edef051ea6360a327d0bd05802ef64ad" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + [[package]] name = "csv-core" version = "0.1.10" @@ -5996,7 +6008,7 @@ dependencies = [ "byteorder", "bytes", "chrono", - "csv-core", + "csv", "duration-str", "enum-as-inner", "futures", diff --git a/ci/scripts/s3-source-test.sh b/ci/scripts/s3-source-test.sh index 58c2cbd93863..4d482568cad2 100755 --- a/ci/scripts/s3-source-test.sh +++ b/ci/scripts/s3-source-test.sh @@ -4,11 +4,14 @@ set -euo pipefail source ci/scripts/common.env.sh -while getopts 'p:' opt; do +while getopts 'p:s:' opt; do case ${opt} in p ) profile=$OPTARG ;; + s ) + script=$OPTARG + ;; \? ) echo "Invalid Option: -$OPTARG" 1>&2 exit 1 @@ -20,6 +23,8 @@ while getopts 'p:' opt; do done shift $((OPTIND -1)) + + echo "--- Download artifacts" mkdir -p target/debug buildkite-agent artifact download risingwave-"$profile" target/debug/ @@ -44,7 +49,7 @@ cargo make ci-start ci-1cn-1fe echo "--- Run test" python3 -m pip install minio psycopg2-binary -python3 e2e_test/s3/run.py +python3 e2e_test/s3/$script.py echo "--- Kill cluster" cargo make ci-kill diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index f4003837f3f6..5a139ee74d81 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -184,8 +184,8 @@ steps: timeout_in_minutes: 5 retry: *auto-retry - - label: "S3 source check on AWS" - command: "ci/scripts/s3-source-test.sh -p ci-release" + - label: "S3 source check on AWS (json parser)" + command: "ci/scripts/s3-source-test.sh -p ci-release -s run" depends_on: build plugins: - seek-oss/aws-sm#v2.3.1: @@ -200,8 +200,8 @@ steps: timeout_in_minutes: 20 retry: *auto-retry - - label: "S3 source check on lyvecloud.seagate.com" - command: "ci/scripts/s3-source-test.sh -p ci-release" + - label: "S3 source check on lyvecloud.seagate.com (json parser)" + command: "ci/scripts/s3-source-test.sh -p ci-release -s run" depends_on: build plugins: - seek-oss/aws-sm#v2.3.1: @@ -215,3 +215,18 @@ steps: - S3_SOURCE_TEST_CONF timeout_in_minutes: 20 retry: *auto-retry + - label: "S3 source check on AWS (csv parser)" + command: "ci/scripts/s3-source-test.sh -p ci-release -s run_csv" + depends_on: build + plugins: + - seek-oss/aws-sm#v2.3.1: + env: + S3_SOURCE_TEST_CONF: ci_s3_source_test_aws + - docker-compose#v4.9.0: + run: rw-build-env + config: ci/docker-compose.yml + mount-buildkite-agent: true + environment: + - S3_SOURCE_TEST_CONF + timeout_in_minutes: 20 + retry: *auto-retry \ No newline at end of file diff --git a/e2e_test/s3/run_csv.py b/e2e_test/s3/run_csv.py new file mode 100644 index 000000000000..c5412c1e57d0 --- /dev/null +++ b/e2e_test/s3/run_csv.py @@ -0,0 +1,155 @@ +import os +import string +import json +import string +from time import sleep +from minio import Minio +import psycopg2 +import random + + +def do_test(config, N, n, prefix): + conn = psycopg2.connect( + host="localhost", + port="4566", + user="root", + database="dev" + ) + + # Open a cursor to execute SQL statements + cur = conn.cursor() + cur.execute(f'''CREATE TABLE s3_test_csv_without_headers( + a int, + b int, + c int, + ) WITH ( + connector = 's3', + match_pattern = '{prefix}_data_without_headers.csv', + 
s3.region_name = '{config['S3_REGION']}', + s3.bucket_name = '{config['S3_BUCKET']}', + s3.credentials.access = '{config['S3_ACCESS_KEY']}', + s3.credentials.secret = '{config['S3_SECRET_KEY']}', + s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' + ) ROW FORMAT CSV WITHOUT HEADER DELIMITED BY ',';''') + + cur.execute(f'''CREATE TABLE s3_test_csv_with_headers( + a int, + b int, + c int, + ) WITH ( + connector = 's3', + match_pattern = '{prefix}_data_with_headers.csv', + s3.region_name = '{config['S3_REGION']}', + s3.bucket_name = '{config['S3_BUCKET']}', + s3.credentials.access = '{config['S3_ACCESS_KEY']}', + s3.credentials.secret = '{config['S3_SECRET_KEY']}', + s3.endpoint_url = 'https://{config['S3_ENDPOINT']}' + ) ROW FORMAT CSV DELIMITED BY ',';''') + + total_row = int(N * n) + sleep(60) + while True: + sleep(60) + cur.execute('select count(*) from s3_test_csv_with_headers') + result_with_headers = cur.fetchone() + cur.execute('select count(*) from s3_test_csv_without_headers') + result_without_headers = cur.fetchone() + if result_with_headers[0] == total_row and result_without_headers[0] == total_row: + break + print( + f"Now got {result_with_headers[0]} rows in table, {total_row} expected, wait 60s") + + cur.execute( + 'select count(*), sum(a), sum(b), sum(c) from s3_test_csv_with_headers') + result_with_headers = cur.fetchone() + + cur.execute( + 'select count(*), sum(a), sum(b), sum(c) from s3_test_csv_without_headers') + s3_test_csv_without_headers = cur.fetchone() + + print(result_with_headers, s3_test_csv_without_headers, + int(((N - 1) * N / 2) * n), int(N*n / 2)) + + assert s3_test_csv_without_headers[0] == total_row + assert s3_test_csv_without_headers[1] == int(((N - 1) * N / 2) * n) + assert s3_test_csv_without_headers[2] == int(N*n / 2) + assert s3_test_csv_without_headers[3] == 0 + + assert result_with_headers[0] == total_row + assert result_with_headers[1] == 0 + assert result_with_headers[2] == int(N*n / 2) + assert result_with_headers[3] == int(((N - 1) * N / 2) * n) + + cur.execute('drop table s3_test_csv_with_headers') + cur.execute('drop table s3_test_csv_without_headers') + + cur.close() + conn.close() + + +if __name__ == "__main__": + config = json.loads(os.environ["S3_SOURCE_TEST_CONF"]) + run_id = str(random.randint(1000, 9999)) + N = 10000 + # do_test(config, N, 0, run_id) + items = [",".join([str(j), str(j % 2), str(-1 if j % 2 else 1)]) + for j in range(N) + ] + + data = "\n".join(items) + "\n" + n = 10 + with open("data_without_headers.csv", "w") as f: + for _ in range(10): + f.write(data) + os.fsync(f.fileno()) + + with open("data_with_headers.csv", "w") as f: + f.write("c,b,a\n") + for _ in range(10): + f.write(data) + os.fsync(f.fileno()) + + client = Minio( + config["S3_ENDPOINT"], + access_key=config["S3_ACCESS_KEY"], + secret_key=config["S3_SECRET_KEY"], + secure=True + ) + + try: + client.fput_object( + config["S3_BUCKET"], + f"{run_id}_data_without_headers.csv", + f"data_without_headers.csv" + + ) + client.fput_object( + config["S3_BUCKET"], + f"{run_id}_data_with_headers.csv", + f"data_with_headers.csv" + ) + print( + f"Uploaded {run_id}_data_with_headers.csv & {run_id}_data_with_headers.csv to S3") + os.remove(f"data_with_headers.csv") + os.remove(f"data_without_headers.csv") + except Exception as e: + print(f"Error uploading test files") + + return_code = 0 + try: + do_test(config, N, n, run_id) + except Exception as e: + print("Test failed", e) + return_code = 1 + + # Clean up + for i in range(20): + try: + client.remove_object( + 
config["S3_BUCKET"], f"{run_id}_data_with_headers.csv") + client.remove_object( + config["S3_BUCKET"], f"{run_id}_data_without_headers.csv") + except Exception as e: + print(f"Error removing testing files {e}") + + exit(return_code) diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 3643f351d555..72f25cfa3e0b 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -26,7 +26,7 @@ bincode = "1" byteorder = "1" bytes = { version = "1", features = ["serde"] } chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } -csv-core = "0.1.10" +csv = "1.2" duration-str = "0.5.0" enum-as-inner = "0.5" futures = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/src/connector/src/macros.rs b/src/connector/src/macros.rs index ee0a4989c4c5..32344714d83f 100644 --- a/src/connector/src/macros.rs +++ b/src/connector/src/macros.rs @@ -163,8 +163,9 @@ macro_rules! impl_connector_properties { macro_rules! impl_common_parser_logic { ($parser_name:ty) => { impl $parser_name { + #[allow(unused_mut)] #[try_stream(boxed, ok = $crate::source::StreamChunkWithState, error = RwError)] - async fn into_chunk_stream(self, data_stream: $crate::source::BoxSourceStream) { + async fn into_chunk_stream(mut self, data_stream: $crate::source::BoxSourceStream) { #[for_await] for batch in data_stream { let batch = batch?; diff --git a/src/connector/src/parser/csv_parser.rs b/src/connector/src/parser/csv_parser.rs index 1f73c7d4508f..52a00cf974ab 100644 --- a/src/connector/src/parser/csv_parser.rs +++ b/src/connector/src/parser/csv_parser.rs @@ -12,48 +12,40 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::str::FromStr; use anyhow::anyhow; use futures_async_stream::try_stream; -use risingwave_common::error::ErrorCode::InternalError; +use risingwave_common::error::ErrorCode::{InternalError, ProtocolError}; use risingwave_common::error::{Result, RwError}; -use risingwave_common::types::{DataType, Datum, Decimal, ScalarImpl}; +use risingwave_common::types::{Datum, Decimal, ScalarImpl}; use risingwave_expr::vector_op::cast::{ str_to_date, str_to_timestamp, str_with_time_zone_to_timestamptz, }; -use crate::parser::{ - BoxSourceWithStateStream, ByteStreamSourceParser, SourceColumnDesc, SourceStreamChunkBuilder, - SourceStreamChunkRowWriter, StreamChunkWithState, WriteGuard, -}; -use crate::source::{BoxSourceStream, SourceContextRef, SplitId}; - +use crate::impl_common_parser_logic; +use crate::parser::{SourceStreamChunkRowWriter, WriteGuard}; +use crate::source::{DataType, SourceColumnDesc, SourceContextRef}; +impl_common_parser_logic!(CsvParser); macro_rules! to_rust_type { ($v:ident, $t:ty) => { $v.parse::<$t>() .map_err(|_| anyhow!("failed parse {} from {}", stringify!($t), $v))? 
     };
 }
-
 #[derive(Debug, Clone)]
 pub struct CsvParserConfig {
     pub delimiter: u8,
     pub has_header: bool,
 }
 
+/// Parser for CSV format
 #[derive(Debug)]
 pub struct CsvParser {
     rw_columns: Vec<SourceColumnDesc>,
-    next_row_is_header: bool,
-    csv_reader: csv_core::Reader,
-    // buffers for parse
-    output: Vec<u8>,
-    output_cursor: usize,
-    ends: Vec<usize>,
-    ends_cursor: usize,
     source_ctx: SourceContextRef,
+    headers: Option<Vec<String>>,
+    delimiter: u8,
 }
 
 impl CsvParser {
@@ -69,296 +61,294 @@ impl CsvParser {
         Ok(Self {
             rw_columns,
-            next_row_is_header: has_header,
-            csv_reader: csv_core::ReaderBuilder::new().delimiter(delimiter).build(),
-            output: vec![0],
-            output_cursor: 0,
-            ends: vec![0],
-            ends_cursor: 1,
+            delimiter,
+            headers: if has_header { Some(Vec::new()) } else { None },
             source_ctx,
         })
     }
 
-    fn reset_cursor(&mut self) {
-        self.output_cursor = 0;
-        self.ends_cursor = 1;
+    fn read_row(&self, buf: &[u8]) -> Result<Vec<String>> {
+        let mut reader_builder = csv::ReaderBuilder::default();
+        reader_builder.delimiter(self.delimiter).has_headers(false);
+        let record = reader_builder
+            .from_reader(buf)
+            .records()
+            .next()
+            .transpose()
+            .map_err(|err| RwError::from(ProtocolError(err.to_string())))?;
+        Ok(record
+            .map(|record| record.iter().map(|field| field.to_string()).collect())
+            .unwrap_or_default())
     }
 
-    pub fn parse_columns_to_strings(&mut self, chunk: &mut &[u8]) -> Result<Option<Vec<String>>> {
-        loop {
-            let (result, n_input, n_output, n_ends) = self.csv_reader.read_record(
-                chunk,
-                &mut self.output[self.output_cursor..],
-                &mut self.ends[self.ends_cursor..],
-            );
-            self.output_cursor += n_output;
-            *chunk = &(*chunk)[n_input..];
-            self.ends_cursor += n_ends;
-            match result {
-                // input empty, here means the `chunk` passed to this method
-                // doesn't contain a whole record, need more bytes
-                csv_core::ReadRecordResult::InputEmpty => break Ok(None),
-                // the output buffer is not enough
-                csv_core::ReadRecordResult::OutputFull => {
-                    let length = self.output.len();
-                    self.output.resize(length * 2, 0);
-                }
-                // the ends buffer is not enough
-                csv_core::ReadRecordResult::OutputEndsFull => {
-                    let length = self.ends.len();
-                    self.ends.resize(length * 2, 0);
-                }
-                // Success cases
-                csv_core::ReadRecordResult::Record | csv_core::ReadRecordResult::End => {
-                    // skip the header
-                    if self.next_row_is_header {
-                        self.next_row_is_header = false;
-                        self.reset_cursor();
-                        continue;
-                    }
-                    let ends_cursor = self.ends_cursor;
-                    // caller provides an empty chunk, and there is no data
-                    // in inner buffer
-                    if ends_cursor <= 1 {
-                        break Ok(None);
-                    }
-                    self.reset_cursor();
-
-                    let string_columns = (1..ends_cursor)
-                        .map(|culomn| {
-                            String::from_utf8(
-                                self.output[self.ends[culomn - 1]..self.ends[culomn]].to_owned(),
-                            )
-                            .map_err(|e| {
-                                RwError::from(InternalError(format!(
-                                    "Parse csv column {} error: invalid UTF-8 ({})",
-                                    culomn, e,
-                                )))
-                            })
-                        })
-                        .collect::<Result<Vec<String>>>()?;
-                    break Ok(Some(string_columns));
-                }
+    #[inline]
+    fn parse_string(dtype: &DataType, v: String) -> Result<Datum> {
+        let v = match dtype {
+            // mysql use tinyint to represent boolean
+            DataType::Boolean => ScalarImpl::Bool(to_rust_type!(v, i16) != 0),
+            DataType::Int16 => ScalarImpl::Int16(to_rust_type!(v, i16)),
+            DataType::Int32 => ScalarImpl::Int32(to_rust_type!(v, i32)),
+            DataType::Int64 => ScalarImpl::Int64(to_rust_type!(v, i64)),
+            DataType::Float32 => ScalarImpl::Float32(to_rust_type!(v, f32).into()),
+            DataType::Float64 => ScalarImpl::Float64(to_rust_type!(v, f64).into()),
+            // FIXME: decimal should have more precision than f64
+            DataType::Decimal => Decimal::from_str(v.as_str())
.map_err(|_| anyhow!("parse decimal from string err {}", v))? + .into(), + DataType::Varchar => v.into(), + DataType::Date => str_to_date(v.as_str())?.into(), + DataType::Time => str_to_date(v.as_str())?.into(), + DataType::Timestamp => str_to_timestamp(v.as_str())?.into(), + DataType::Timestamptz => str_with_time_zone_to_timestamptz(v.as_str())?.into(), + _ => { + return Err(RwError::from(InternalError(format!( + "CSV data source not support type {}", + dtype + )))) } - } + }; + Ok(Some(v)) } - fn try_parse_single_record( + #[allow(clippy::unused_async)] + pub async fn parse_inner( &mut self, - payload: &mut &[u8], + payload: &[u8], mut writer: SourceStreamChunkRowWriter<'_>, - ) -> Result> { - let columns_string = match self.parse_columns_to_strings(payload)? { - None => return Ok(None), - Some(strings) => strings, - }; - writer - .insert(move |desc| { - // column_id is 1-based - let column_id = desc.column_id.get_id() - 1; - let column_type = &desc.data_type; - let v = match columns_string.get(column_id as usize) { - Some(v) => v.to_owned(), - None => return Ok(None), - }; - parse_string(column_type, v) - }) - .map(Some) - } - - #[try_stream(boxed, ok = StreamChunkWithState, error = RwError)] - async fn into_stream(mut self, data_stream: BoxSourceStream) { - // the remain length of the last seen message - let mut remain_len = 0; - // current offset - let mut offset = 0; - // split id of current data stream - let mut split_id = None; - #[for_await] - for batch in data_stream { - let batch = batch?; - - let mut builder = - SourceStreamChunkBuilder::with_capacity(self.rw_columns.clone(), batch.len() * 2); - let mut split_offset_mapping: HashMap = HashMap::new(); - - for msg in batch { - if let Some(content) = msg.payload { - if split_id.is_none() { - split_id = Some(msg.split_id.clone()); - } - - offset = msg.offset.parse().unwrap(); - let mut buff = content.as_ref(); - - remain_len = buff.len(); - loop { - match self.try_parse_single_record(&mut buff, builder.row_writer()) { - Err(e) => { - tracing::warn!( - "message parsing failed {}, skipping", - e.to_string() - ); - continue; - } - Ok(None) => { - break; - } - Ok(Some(_)) => { - let consumed = remain_len - buff.len(); - offset += consumed; - remain_len = buff.len(); - } - } - } - split_offset_mapping.insert(msg.split_id, offset.to_string()); - } + ) -> Result { + let mut fields = self.read_row(payload)?; + if let Some(headers) = &mut self.headers { + if headers.is_empty() { + *headers = fields; + // Here we want a row, but got nothing. So it's an error for the `parse_inner` but + // has no bad impact on the system. + return Err(RwError::from(ProtocolError("This message indicates a header, no row will be inserted. However, internal parser state was updated.".to_string()))); } - yield StreamChunkWithState { - chunk: builder.finish(), - split_offset_mapping: Some(split_offset_mapping), - }; - } - // The file may be missing the last terminator, - // so we need to pass an empty payload to inform the parser. 
- if remain_len > 0 { - let mut builder = SourceStreamChunkBuilder::with_capacity(self.rw_columns.clone(), 1); - let mut split_offset_mapping: HashMap = HashMap::new(); - let empty = vec![]; - match self.try_parse_single_record(&mut empty.as_ref(), builder.row_writer()) { - Err(e) => { - tracing::warn!("message parsing failed {}, skipping", e.to_string()); + writer.insert(|desc| { + if let Some(i) = headers.iter().position(|name| name == &desc.name) { + Self::parse_string( + &desc.data_type, + fields.get_mut(i).map(std::mem::take).unwrap_or_default(), + ) + } else { + Ok(None) } - Ok(Some(_)) => { - split_offset_mapping - .insert(split_id.unwrap(), (offset + remain_len).to_string()); - yield StreamChunkWithState { - chunk: builder.finish(), - split_offset_mapping: Some(split_offset_mapping), - }; + }) + } else { + fields.reverse(); + writer.insert(|desc| { + if let Some(value) = fields.pop() { + Self::parse_string(&desc.data_type, value) + } else { + Ok(None) } - _ => {} - } + }) } } } -impl ByteStreamSourceParser for CsvParser { - fn into_stream(self, msg_stream: BoxSourceStream) -> BoxSourceWithStateStream { - self.into_stream(msg_stream) - } -} - -#[inline] -fn parse_string(dtype: &DataType, v: String) -> Result { - let v = match dtype { - // mysql use tinyint to represent boolean - DataType::Boolean => ScalarImpl::Bool(to_rust_type!(v, i16) != 0), - DataType::Int16 => ScalarImpl::Int16(to_rust_type!(v, i16)), - DataType::Int32 => ScalarImpl::Int32(to_rust_type!(v, i32)), - DataType::Int64 => ScalarImpl::Int64(to_rust_type!(v, i64)), - DataType::Float32 => ScalarImpl::Float32(to_rust_type!(v, f32).into()), - DataType::Float64 => ScalarImpl::Float64(to_rust_type!(v, f64).into()), - // FIXME: decimal should have more precision than f64 - DataType::Decimal => Decimal::from_str(v.as_str()) - .map_err(|_| anyhow!("parse decimal from string err {}", v))? 
- .into(), - DataType::Varchar => v.into(), - DataType::Date => str_to_date(v.as_str())?.into(), - DataType::Time => str_to_date(v.as_str())?.into(), - DataType::Timestamp => str_to_timestamp(v.as_str())?.into(), - DataType::Timestamptz => str_with_time_zone_to_timestamptz(v.as_str())?.into(), - _ => { - return Err(RwError::from(InternalError(format!( - "CSV data source not support type {}", - dtype - )))) - } - }; - Ok(Some(v)) -} - #[cfg(test)] mod tests { - use std::sync::Arc; - - use futures_async_stream::for_await; - - use crate::source::{SourceMessage, SourceMeta}; - - #[try_stream(boxed, ok = Vec, error = anyhow::Error)] - async fn prepare_data(data: Vec) { - let mid = data.len() / 2; - let part1 = data[..mid].to_vec(); - let part2 = data[mid..].to_vec(); - let id = "split1".into(); - let part1_len = part1.len(); - let msg1 = SourceMessage { - payload: Some(part1.into()), - offset: 0.to_string(), - split_id: Arc::clone(&id), - meta: SourceMeta::Empty, - }; - let msg2 = SourceMessage { - payload: Some(part2.into()), - offset: part1_len.to_string(), - split_id: Arc::clone(&id), - meta: SourceMeta::Empty, - }; - - yield vec![msg1, msg2]; - } + use risingwave_common::array::Op; + use risingwave_common::row::Row; + use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum}; use super::*; - #[ignore] + use crate::parser::SourceStreamChunkBuilder; #[tokio::test] - async fn test_csv_parser_without_last_line_break() { + async fn test_csv_without_headers() { + let data = [ + r#"1,a,2"#, + r#""15541","a,1,1,",4"#, + r#"0,"""0",0"#, + r#"0,0,0,0,0,0,0,0,0,0,0,0,0,"#, + ]; let descs = vec![ - SourceColumnDesc::simple("name", DataType::Varchar, 1.into()), - SourceColumnDesc::simple("age", DataType::Int32, 2.into()), + SourceColumnDesc::simple("a", DataType::Int32, 0.into()), + SourceColumnDesc::simple("b", DataType::Varchar, 1.into()), + SourceColumnDesc::simple("c", DataType::Int32, 2.into()), ]; + let mut parser = CsvParser::new( + Vec::new(), + CsvParserConfig { + delimiter: b',', + has_header: false, + }, + Default::default(), + ) + .unwrap(); + let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); + for item in data { + parser + .parse_inner(item.as_bytes(), builder.row_writer()) + .await + .unwrap(); + } + let chunk = builder.finish(); + let mut rows = chunk.rows(); + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(1))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("a".into()))) + ); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(2))) + ); + } + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(15541))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("a,1,1,".into()))) + ); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(4))) + ); + } - let config = CsvParserConfig { - delimiter: b',', - has_header: true, - }; - let parser = CsvParser::new(descs, config, Default::default()).unwrap(); - let data = b" -name,age -pite,20 -alex,10"; - let data_stream = prepare_data(data.to_vec()); - let msg_stream = parser.into_stream(data_stream); - #[for_await] - for msg in msg_stream { - println!("{:?}", msg); + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); + 
assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("\"0".into()))) + ); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); } - } - #[ignore] + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("0".into()))) + ); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); + } + } #[tokio::test] - async fn test_csv_parser_with_last_line_break() { + async fn test_csv_with_headers() { + let data = [ + r#"c,b,a"#, + r#"1,a,2"#, + r#""15541","a,1,1,",4"#, + r#"0,"""0",0"#, + r#"0,0,0,0,0,0,0,0,0,0,0,0,0,"#, + ]; let descs = vec![ - SourceColumnDesc::simple("name", DataType::Varchar, 1.into()), - SourceColumnDesc::simple("age", DataType::Int32, 2.into()), + SourceColumnDesc::simple("a", DataType::Int32, 0.into()), + SourceColumnDesc::simple("b", DataType::Varchar, 1.into()), + SourceColumnDesc::simple("c", DataType::Int32, 2.into()), ]; + let mut parser = CsvParser::new( + Vec::new(), + CsvParserConfig { + delimiter: b',', + has_header: true, + }, + Default::default(), + ) + .unwrap(); + let mut builder = SourceStreamChunkBuilder::with_capacity(descs, 4); + for item in data { + let _ = parser + .parse_inner(item.as_bytes(), builder.row_writer()) + .await; + } + let chunk = builder.finish(); + let mut rows = chunk.rows(); + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(1))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("a".into()))) + ); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(2))) + ); + } + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(15541))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("a,1,1,".into()))) + ); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(4))) + ); + } - let config = CsvParserConfig { - delimiter: b',', - has_header: true, - }; - let parser = CsvParser::new(descs, config, Default::default()).unwrap(); - let data = b" -name,age -pite,20 -alex,10 -"; - println!("data len: {}", data.len()); - let data_stream = prepare_data(data.to_vec()); - let msg_stream = parser.into_stream(data_stream); - #[for_await] - for msg in msg_stream { - println!("{:?}", msg); + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("\"0".into()))) + ); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); + } + + { + let (op, row) = rows.next().unwrap(); + assert_eq!(op, Op::Insert); + assert_eq!( + row.datum_at(2).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); + assert_eq!( + row.datum_at(1).to_owned_datum(), + (Some(ScalarImpl::Utf8("0".into()))) + ); + assert_eq!( + row.datum_at(0).to_owned_datum(), + (Some(ScalarImpl::Int32(0))) + ); } } } diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index fafcad7b8f4c..b995d511e927 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -33,7 +33,6 @@ pub use self::csv_parser::CsvParserConfig; 
 use crate::parser::maxwell::MaxwellParser;
 use crate::source::{
     BoxSourceStream, BoxSourceWithStateStream, SourceColumnDesc, SourceContextRef, SourceFormat,
-    StreamChunkWithState,
 };
 
 mod avro;
diff --git a/src/connector/src/source/filesystem/s3/source/reader.rs b/src/connector/src/source/filesystem/s3/source/reader.rs
index f4ef57efdb90..26694ed92990 100644
--- a/src/connector/src/source/filesystem/s3/source/reader.rs
+++ b/src/connector/src/source/filesystem/s3/source/reader.rs
@@ -198,7 +198,10 @@ impl S3FileReader {
         let parser =
             ByteStreamSourceParserImpl::create(self.parser_config.clone(), source_ctx)?;
 
-        let msg_stream = if matches!(parser, ByteStreamSourceParserImpl::Json(_)) {
+        let msg_stream = if matches!(
+            parser,
+            ByteStreamSourceParserImpl::Json(_) | ByteStreamSourceParserImpl::Csv(_)
+        ) {
             NdByteStreamWrapper::new(parser).into_stream(Box::pin(data_stream))
         } else {
             parser.into_stream(Box::pin(data_stream))
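
For reference, a minimal sketch of the user-facing DDL this change enables, adapted from e2e_test/s3/run_csv.py above; the match pattern, region, bucket, and credential values are placeholders rather than part of this diff:

CREATE TABLE s3_test_csv_with_headers (
    a int,
    b int,
    c int
) WITH (
    connector = 's3',
    match_pattern = '*_data_with_headers.csv',
    s3.region_name = '<region>',
    s3.bucket_name = '<bucket>',
    s3.credentials.access = '<access-key>',
    s3.credentials.secret = '<secret-key>'
) ROW FORMAT CSV DELIMITED BY ',';

Appending WITHOUT HEADER (as in the test's s3_test_csv_without_headers table) makes the parser map fields to columns by position instead of by header name.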