feat(connector): unified csv parser #8463

Merged (11 commits) on Mar 10, 2023
14 changes: 13 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

9 changes: 7 additions & 2 deletions ci/scripts/s3-source-test.sh
@@ -4,11 +4,14 @@ set -euo pipefail

 source ci/scripts/common.env.sh

-while getopts 'p:' opt; do
+while getopts 'p:s:' opt; do
     case ${opt} in
         p )
             profile=$OPTARG
             ;;
+        s )
+            script=$OPTARG
+            ;;
         \? )
             echo "Invalid Option: -$OPTARG" 1>&2
             exit 1
@@ -20,6 +23,8 @@ while getopts 'p:' opt; do
 done
 shift $((OPTIND -1))

+
+
 echo "--- Download artifacts"
 mkdir -p target/debug
 buildkite-agent artifact download risingwave-"$profile" target/debug/
@@ -44,7 +49,7 @@ cargo make ci-start ci-1cn-1fe

 echo "--- Run test"
 python3 -m pip install minio psycopg2-binary
-python3 e2e_test/s3/run.py
+python3 e2e_test/s3/$script.py

 echo "--- Kill cluster"
 cargo make ci-kill
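
The new `-s` flag lets each CI job choose which e2e entry point to run: `-s run` keeps the existing JSON test (`e2e_test/s3/run.py`), while `-s run_csv` runs the new CSV test, as the workflow below wires up.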
23 changes: 19 additions & 4 deletions ci/workflows/main-cron.yml
@@ -184,8 +184,8 @@ steps:
     timeout_in_minutes: 5
     retry: *auto-retry

-  - label: "S3 source check on AWS"
-    command: "ci/scripts/s3-source-test.sh -p ci-release"
+  - label: "S3 source check on AWS (json parser)"
+    command: "ci/scripts/s3-source-test.sh -p ci-release -s run"
     depends_on: build
     plugins:
       - seek-oss/aws-sm#v2.3.1:
@@ -200,8 +200,8 @@ steps:
     timeout_in_minutes: 20
     retry: *auto-retry

-  - label: "S3 source check on lyvecloud.seagate.com"
-    command: "ci/scripts/s3-source-test.sh -p ci-release"
+  - label: "S3 source check on lyvecloud.seagate.com (json parser)"
+    command: "ci/scripts/s3-source-test.sh -p ci-release -s run"
     depends_on: build
     plugins:
       - seek-oss/aws-sm#v2.3.1:
@@ -215,3 +215,18 @@ steps:
             - S3_SOURCE_TEST_CONF
     timeout_in_minutes: 20
     retry: *auto-retry
+  - label: "S3 source check on AWS (csv parser)"
+    command: "ci/scripts/s3-source-test.sh -p ci-release -s run_csv"
+    depends_on: build
+    plugins:
+      - seek-oss/aws-sm#v2.3.1:
+          env:
+            S3_SOURCE_TEST_CONF: ci_s3_source_test_aws
+      - docker-compose#v4.9.0:
+          run: rw-build-env
+          config: ci/docker-compose.yml
+          mount-buildkite-agent: true
+          environment:
+            - S3_SOURCE_TEST_CONF
+    timeout_in_minutes: 20
+    retry: *auto-retry
155 changes: 155 additions & 0 deletions e2e_test/s3/run_csv.py
@@ -0,0 +1,155 @@
import os
import json
import sys
from time import sleep
from minio import Minio
import psycopg2
import random


def do_test(config, N, n, prefix):
    conn = psycopg2.connect(
        host="localhost",
        port="4566",
        user="root",
        database="dev"
    )

    # Open a cursor to execute SQL statements
    cur = conn.cursor()
    cur.execute(f'''CREATE TABLE s3_test_csv_without_headers(
        a int,
        b int,
        c int,
    ) WITH (
        connector = 's3',
        match_pattern = '{prefix}_data_without_headers.csv',
        s3.region_name = '{config['S3_REGION']}',
        s3.bucket_name = '{config['S3_BUCKET']}',
        s3.credentials.access = '{config['S3_ACCESS_KEY']}',
        s3.credentials.secret = '{config['S3_SECRET_KEY']}',
        s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
    ) ROW FORMAT CSV WITHOUT HEADER DELIMITED BY ',';''')

    cur.execute(f'''CREATE TABLE s3_test_csv_with_headers(
        a int,
        b int,
        c int,
    ) WITH (
        connector = 's3',
        match_pattern = '{prefix}_data_with_headers.csv',
        s3.region_name = '{config['S3_REGION']}',
        s3.bucket_name = '{config['S3_BUCKET']}',
        s3.credentials.access = '{config['S3_ACCESS_KEY']}',
        s3.credentials.secret = '{config['S3_SECRET_KEY']}',
        s3.endpoint_url = 'https://{config['S3_ENDPOINT']}'
    ) ROW FORMAT CSV DELIMITED BY ',';''')

    total_row = int(N * n)
    sleep(60)
    while True:
        sleep(60)
        cur.execute('select count(*) from s3_test_csv_with_headers')
        result_with_headers = cur.fetchone()
        cur.execute('select count(*) from s3_test_csv_without_headers')
        result_without_headers = cur.fetchone()
        if result_with_headers[0] == total_row and result_without_headers[0] == total_row:
            break
        print(
            f"Now got {result_with_headers[0]} rows in table, {total_row} expected, wait 60s")

    cur.execute(
        'select count(*), sum(a), sum(b), sum(c) from s3_test_csv_with_headers')
    result_with_headers = cur.fetchone()

    cur.execute(
        'select count(*), sum(a), sum(b), sum(c) from s3_test_csv_without_headers')
    result_without_headers = cur.fetchone()

    print(result_with_headers, result_without_headers,
          int(((N - 1) * N / 2) * n), int(N * n / 2))

    # Each file holds n copies of the rows j = 0..N-1 with fields
    # (j, j % 2, 1 if j is even else -1), so for the headerless table:
    # count = N * n, sum(a) = n * N * (N - 1) / 2, sum(b) = n * N / 2, sum(c) = 0.
    assert result_without_headers[0] == total_row
    assert result_without_headers[1] == int(((N - 1) * N / 2) * n)
    assert result_without_headers[2] == int(N * n / 2)
    assert result_without_headers[3] == 0

    # The other file starts with the header line "c,b,a", so columns a and c
    # are mapped in reverse and their sums swap.
    assert result_with_headers[0] == total_row
    assert result_with_headers[1] == 0
    assert result_with_headers[2] == int(N * n / 2)
    assert result_with_headers[3] == int(((N - 1) * N / 2) * n)

    cur.execute('drop table s3_test_csv_with_headers')
    cur.execute('drop table s3_test_csv_without_headers')

    cur.close()
    conn.close()


if __name__ == "__main__":
    config = json.loads(os.environ["S3_SOURCE_TEST_CONF"])
    run_id = str(random.randint(1000, 9999))
    N = 10000
    # do_test(config, N, 0, run_id)
    items = [",".join([str(j), str(j % 2), str(-1 if j % 2 else 1)])
             for j in range(N)]

    data = "\n".join(items) + "\n"
    n = 10
    with open("data_without_headers.csv", "w") as f:
        for _ in range(n):
            f.write(data)
        os.fsync(f.fileno())

    with open("data_with_headers.csv", "w") as f:
        f.write("c,b,a\n")
        for _ in range(n):
            f.write(data)
        os.fsync(f.fileno())

    client = Minio(
        config["S3_ENDPOINT"],
        access_key=config["S3_ACCESS_KEY"],
        secret_key=config["S3_SECRET_KEY"],
        secure=True
    )

    try:
        client.fput_object(
            config["S3_BUCKET"],
            f"{run_id}_data_without_headers.csv",
            "data_without_headers.csv"
        )
        client.fput_object(
            config["S3_BUCKET"],
            f"{run_id}_data_with_headers.csv",
            "data_with_headers.csv"
        )
        print(
            f"Uploaded {run_id}_data_with_headers.csv & {run_id}_data_without_headers.csv to S3")
        os.remove("data_with_headers.csv")
        os.remove("data_without_headers.csv")
    except Exception as e:
        print(f"Error uploading test files: {e}")

    return_code = 0
    try:
        do_test(config, N, n, run_id)
    except Exception as e:
        print("Test failed", e)
        return_code = 1

    # Clean up the uploaded objects; retry in case of transient S3 errors.
    for _ in range(20):
        try:
            client.remove_object(
                config["S3_BUCKET"], f"{run_id}_data_with_headers.csv")
            client.remove_object(
                config["S3_BUCKET"], f"{run_id}_data_without_headers.csv")
            break
        except Exception as e:
            print(f"Error removing testing files {e}")

    sys.exit(return_code)
2 changes: 1 addition & 1 deletion src/connector/Cargo.toml
@@ -26,7 +26,7 @@ bincode = "1"
 byteorder = "1"
 bytes = { version = "1", features = ["serde"] }
 chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
-csv-core = "0.1.10"
+csv = "1.2"
 duration-str = "0.5.0"
 enum-as-inner = "0.5"
 futures = { version = "0.3", default-features = false, features = ["alloc"] }
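
Swapping `csv-core` (the low-level, push-based tokenizer) for `csv` gives the connector the crate's high-level record reader, whose `has_headers` and `delimiter` settings presumably back the `WITHOUT HEADER` and `DELIMITED BY` options exercised in the e2e test above. A minimal sketch of reading records this way — the function is illustrative, not the connector's actual code:

```rust
use csv::ReaderBuilder;

// Illustrative sketch only: parse one raw payload into rows of owned fields.
// The real parser maps fields onto the table schema rather than Strings.
fn parse_csv_payload(
    payload: &[u8],
    has_header: bool,
    delimiter: u8,
) -> csv::Result<Vec<Vec<String>>> {
    let mut reader = ReaderBuilder::new()
        .has_headers(has_header)
        .delimiter(delimiter)
        .from_reader(payload);
    reader
        .records()
        .map(|record| record.map(|r| r.iter().map(str::to_owned).collect()))
        .collect()
}
```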
3 changes: 2 additions & 1 deletion src/connector/src/macros.rs
@@ -163,8 +163,9 @@ macro_rules! impl_connector_properties {
 macro_rules! impl_common_parser_logic {
     ($parser_name:ty) => {
         impl $parser_name {
+            #[allow(unused_mut)]
             #[try_stream(boxed, ok = $crate::source::StreamChunkWithState, error = RwError)]
-            async fn into_chunk_stream(self, data_stream: $crate::source::BoxSourceStream) {
+            async fn into_chunk_stream(mut self, data_stream: $crate::source::BoxSourceStream) {
                 #[for_await]
                 for batch in data_stream {
                     let batch = batch?;
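
`into_chunk_stream` now takes `mut self` (with `#[allow(unused_mut)]` for parsers that never use the mutability) because a parser may carry state across payloads; for CSV, the natural example is a file's header row, seen once but applied to every later record. A hypothetical sketch of that shape, reusing the `csv` crate from the dependency change — the struct and its fields are invented for illustration:

```rust
use csv::{ReaderBuilder, StringRecord};

// Hypothetical sketch: the header learned from the first payload is kept in
// `self` and reused for later payloads, which is why parsing needs `&mut self`.
struct CsvParserSketch {
    delimiter: u8,
    headers: Option<Vec<String>>,
}

impl CsvParserSketch {
    fn parse(&mut self, payload: &[u8]) -> csv::Result<Vec<StringRecord>> {
        // Headers are handled manually so they survive payload boundaries.
        let mut reader = ReaderBuilder::new()
            .has_headers(false)
            .delimiter(self.delimiter)
            .from_reader(payload);
        let mut rows = Vec::new();
        for record in reader.records() {
            let record = record?;
            if self.headers.is_none() {
                // First record of the stream: remember it as the header row.
                self.headers = Some(record.iter().map(str::to_owned).collect());
                continue;
            }
            rows.push(record);
        }
        Ok(rows)
    }
}
```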