Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement DynamoDBRead class. #458

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cliboa/scenario/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
from .extract.aws import S3Delete, S3Download, S3DownloadFileDelete, S3FileExistsCheck
from .extract.aws import S3Delete, S3Download, S3DownloadFileDelete, S3FileExistsCheck, DynamoDBRead
from .extract.azure import AzureBlobDownload
from .extract.ftp import FtpDownload, FtpDownloadFileDelete
from .extract.gcp import (
Expand Down
132 changes: 131 additions & 1 deletion cliboa/scenario/extract/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
import csv
import json
import os
import re
from decimal import Decimal

import boto3

from cliboa.adapter.aws import S3Adapter
from cliboa.scenario.aws import BaseS3
from cliboa.scenario.aws import BaseAws, BaseS3
from cliboa.scenario.validator import EssentialParameters
from cliboa.util.cache import ObjectStore
from cliboa.util.constant import StepStatus
from cliboa.util.exception import InvalidParameter


class S3Download(BaseS3):
Expand Down Expand Up @@ -189,3 +195,127 @@ def execute(self, *args):
# The file does not exist
self._logger.info("File not found in S3. After process will not be processed")
return StepStatus.SUCCESSFUL_TERMINATION


class DynamoDBRead(BaseAws):
    """
    Scan a DynamoDB table and save all items to a CSV or JSONL file.

    The table is read with repeated ``scan`` calls (following
    ``LastEvaluatedKey`` pagination) and streamed to the output file,
    so the full result set is never held in memory at once.
    """

    def __init__(self):
        super().__init__()
        self._table_name = None
        self._dest_dir = "."
        self._file_name = None
        self._file_format = "csv"

    def table_name(self, table_name):
        # Name of the DynamoDB table to scan (required).
        self._table_name = table_name

    def dest_dir(self, dest_dir):
        # Output directory; created automatically if it does not exist.
        self._dest_dir = dest_dir

    def file_name(self, file_name):
        # Output file name (required).
        self._file_name = file_name

    def file_format(self, file_format):
        # Output format: "csv" (default) or "jsonl".
        if file_format not in ("csv", "jsonl"):
            raise InvalidParameter("file_format must be either 'csv' or 'jsonl'")
        self._file_format = file_format

    def execute(self, *args):
        """
        Download all items from DynamoDB and save them to a file in the
        configured format.

        Raises:
            InvalidParameter: if an essential parameter is missing.
        """
        super().execute()

        valid = EssentialParameters(self.__class__.__name__, [self._table_name, self._file_name])
        valid()

        os.makedirs(self._dest_dir, exist_ok=True)

        dynamodb = boto3.resource(
            "dynamodb",
            aws_access_key_id=self._access_key,
            aws_secret_access_key=self._secret_key,
            region_name=self._region,
        )
        table = dynamodb.Table(self._table_name)

        file_path = os.path.join(self._dest_dir, self._file_name)
        if self._file_format == "jsonl":
            self._write_jsonl(self._scan_table(table), file_path)
        else:  # csv
            self._write_csv(self._scan_table(table), file_path)

        self._logger.info(f"Downloaded items from DynamoDB table {self._table_name} to {file_path}")

    def _scan_table(self, table):
        """
        Generator that scans a DynamoDB table and yields every item,
        transparently following pagination via LastEvaluatedKey.

        Args:
            table (boto3.resources.factory.dynamodb.Table): table to scan

        Yields:
            dict: each item of the table
        """
        last_evaluated_key = None
        while True:
            if last_evaluated_key:
                response = table.scan(ExclusiveStartKey=last_evaluated_key)
            else:
                response = table.scan()

            yield from response["Items"]

            last_evaluated_key = response.get("LastEvaluatedKey")
            if not last_evaluated_key:
                break

    def _write_jsonl(self, items, file_path):
        """
        Write items to a file in JSONL format (one JSON object per line).

        Args:
            items (iterator): items to write
            file_path (str): destination file path
        """
        # encoding is pinned to UTF-8 because ensure_ascii=False may emit
        # non-ASCII characters, which would fail on platforms whose default
        # encoding cannot represent them (e.g. cp932 on Windows).
        with open(file_path, "w", encoding="utf-8") as f:
            for item in items:
                json_item = json.dumps(
                    item, default=self._json_serial, sort_keys=False, ensure_ascii=False
                )
                f.write(json_item + "\n")

    def _json_serial(self, obj):
        """
        JSON fallback serializer for DynamoDB values.

        Decimal (DynamoDB's number type) is converted to int when integral,
        otherwise to float; any other non-serializable value becomes str.
        """
        if isinstance(obj, Decimal):
            return int(obj) if obj % 1 == 0 else float(obj)
        return str(obj)

    def _write_csv(self, items, file_path):
        """
        Write items to a file in CSV format.

        The header row is taken from the keys of the first item; nested
        attribute values (dict/list) are serialized as JSON strings so
        they fit into a single CSV cell.

        Args:
            items (iterator): items to write
            file_path (str): destination file path
        """
        # UTF-8 is pinned for the same reason as in _write_jsonl.
        with open(file_path, "w", newline="", encoding="utf-8") as f:
            writer = None
            for item in items:
                if writer is None:
                    writer = csv.DictWriter(f, fieldnames=list(item.keys()))
                    writer.writeheader()

                # Build a new row dict instead of mutating the scanned item.
                row = {
                    key: json.dumps(
                        value, default=self._json_serial, sort_keys=False, ensure_ascii=False
                    )
                    if isinstance(value, (dict, list))
                    else value
                    for key, value in item.items()
                }
                writer.writerow(row)
172 changes: 169 additions & 3 deletions cliboa/test/scenario/extract/test_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
import csv
import json
import os
import tempfile

from mock import patch
from decimal import Decimal
from unittest.mock import patch

from cliboa.adapter.aws import S3Adapter
from cliboa.scenario.extract.aws import S3Delete, S3Download, S3FileExistsCheck
from cliboa.scenario.extract.aws import DynamoDBRead, S3Delete, S3Download, S3FileExistsCheck
from cliboa.test import BaseCliboaTest
from cliboa.util.helper import Helper
from cliboa.util.lisboa_log import LisboaLog
Expand Down Expand Up @@ -84,3 +87,166 @@ def test_execute_file_not_exists(self, m_get_client):
instance.execute()
# 処理の正常終了を確認
assert m_get_object.call_args_list == []


class TestDynamoDBRead(BaseCliboaTest):
    """Unit tests for DynamoDBRead against a mocked boto3 DynamoDB resource."""

    # --- fixture builders -------------------------------------------------

    @staticmethod
    def _scan_response(items):
        """Wrap items in the shape returned by Table.scan (single page)."""
        return {
            "Items": items,
            "Count": 2,
            "ScannedCount": 2,
            "LastEvaluatedKey": None,
        }

    @staticmethod
    def _nested_items():
        """Two items whose 'details' attribute is a nested map."""
        return [
            {
                "id": "1",
                "name": "Item 1",
                "details": {
                    "value": Decimal("100"),
                    "attributes": {"color": "red", "size": "large"},
                },
            },
            {
                "id": "2",
                "name": "Item 2",
                "details": {
                    "value": Decimal("200"),
                    "attributes": {"color": "blue", "size": "medium"},
                },
            },
        ]

    @staticmethod
    def _flat_items():
        """Two items with only scalar attributes."""
        return [
            {"id": "1", "name": "Item 1", "value": Decimal("100")},
            {"id": "2", "name": "Item 2", "value": Decimal("200")},
        ]

    # --- test cases -------------------------------------------------------

    @patch("boto3.resource")
    def test_execute_csv_with_nested_data(self, mock_boto_resource):
        expected_csv = [
            ["id", "name", "details"],
            ["1", "Item 1", '{"value": 100, "attributes": {"color": "red", "size": "large"}}'],
            ["2", "Item 2", '{"value": 200, "attributes": {"color": "blue", "size": "medium"}}'],
        ]
        self._run_test(
            mock_boto_resource, self._scan_response(self._nested_items()), expected_csv, "csv"
        )

    @patch("boto3.resource")
    def test_execute_csv_without_nested_data(self, mock_boto_resource):
        expected_csv = [
            ["id", "name", "value"],
            ["1", "Item 1", "100"],
            ["2", "Item 2", "200"],
        ]
        self._run_test(
            mock_boto_resource, self._scan_response(self._flat_items()), expected_csv, "csv"
        )

    @patch("boto3.resource")
    def test_execute_jsonl_with_nested_data(self, mock_boto_resource):
        expected_jsonl = [
            {
                "id": "1",
                "name": "Item 1",
                "details": {"value": 100, "attributes": {"color": "red", "size": "large"}},
            },
            {
                "id": "2",
                "name": "Item 2",
                "details": {"value": 200, "attributes": {"color": "blue", "size": "medium"}},
            },
        ]
        self._run_test(
            mock_boto_resource, self._scan_response(self._nested_items()), expected_jsonl, "jsonl"
        )

    @patch("boto3.resource")
    def test_execute_jsonl_without_nested_data(self, mock_boto_resource):
        expected_jsonl = [
            {"id": "1", "name": "Item 1", "value": 100},
            {"id": "2", "name": "Item 2", "value": 200},
        ]
        self._run_test(
            mock_boto_resource, self._scan_response(self._flat_items()), expected_jsonl, "jsonl"
        )

    # --- shared driver and verifiers --------------------------------------

    def _run_test(self, mock_boto_resource, test_data, expected_data, file_format):
        """Execute DynamoDBRead against mocked scan output and verify the file."""
        mock_boto_resource.return_value.Table.return_value.scan.return_value = test_data

        with tempfile.TemporaryDirectory() as temp_dir:
            instance = DynamoDBRead()
            properties = {
                "table_name": "test_table",
                "file_name": f"output.{file_format}",
                "dest_dir": temp_dir,
                "file_format": file_format,
                "logger": LisboaLog.get_logger(__name__),
                "region": "us-east-1",
            }
            for name, value in properties.items():
                Helper.set_property(instance, name, value)
            instance.execute()

            output_file_path = os.path.join(temp_dir, instance._file_name)
            assert os.path.exists(output_file_path)

            verify = self._verify_csv if file_format == "csv" else self._verify_jsonl
            verify(output_file_path, expected_data)

    def _verify_csv(self, file_path, expected_data):
        """Compare the written CSV row-by-row and cell-by-cell."""
        with open(file_path, "r", newline="") as csvfile:
            actual_data = list(csv.reader(csvfile))

        assert len(actual_data) == len(
            expected_data
        ), f"期待される行数 {len(expected_data)} に対し、実際の行数は {len(actual_data)} です"

        for expected_row, actual_row in zip(expected_data, actual_data):
            assert len(expected_row) == len(
                actual_row
            ), f"列数が一致しません。期待値: {len(expected_row)}, 実際の値: {len(actual_row)}"
            for expected_value, actual_value in zip(expected_row, actual_row):
                assert str(actual_value) == str(
                    expected_value
                ), f"値が一致しません。期待値: {expected_value}, 実際の値: {actual_value}"

    def _verify_jsonl(self, file_path, expected_data):
        """Compare the written JSONL objects line-by-line."""
        with open(file_path, "r") as jsonl_file:
            actual_data = [json.loads(line) for line in jsonl_file]

        assert len(actual_data) == len(
            expected_data
        ), f"Expected {len(expected_data)} items, got {len(actual_data)}"

        for expected, actual in zip(expected_data, actual_data):
            assert expected == actual, f"Data mismatch: expected {expected}, got {actual}"
33 changes: 33 additions & 0 deletions docs/modules/dynamodb_read.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# DynamoDBRead
Reads data from a DynamoDB table and saves it as a CSV or JSONL file.

# Parameters
|Parameter|Description|Required|Default|Remarks|
|---------|-----------|--------|-------|-------|
|table_name|DynamoDB table name|Yes|None||
|dest_dir|Output directory|No|"." (current directory)|If a non-existent directory path is specified, it will be automatically created.|
|file_name|Output file name|Yes|None||
|file_format|Output file format|No|"csv"|Can be either "csv" or "jsonl".|
|region|AWS region|No|None|If not specified, the default region will be used.|
|access_key|AWS access key|No|None|If not specified, environment variables or IAM role will be used.|
|secret_key|AWS secret key|No|None|If not specified, environment variables or IAM role will be used.|
|profile|AWS profile|No|None|Section name of ~/.aws/config|

# Example
```yaml
scenario:
step:
class: DynamoDBRead
arguments:
table_name: your_dynamodb_table
dest_dir: /path/to/destination
file_name: dynamodb_data.csv
file_format: csv
region: us-west-2
```


# Notes
- In CSV output, nested attribute values (maps and lists) are serialized as JSON strings; other complex DynamoDB types (e.g. sets) are converted with `str()` and may not round-trip cleanly.
- If the output file already exists, it will be overwritten.
- CSV column order follows the attribute order of the first item read; the partition key and sort key are not guaranteed to appear before other attributes.