[ch] Dynamo replicator: add try catch, script for local testing (#5715)
* Add a try/catch to catch errors when inserting
* Add a simple script so you can see the insert query (and test it in
the console). Also adds a sample JSON file with a sample record for
workflow_job (only one record, though)

Tested by
* Running the new test_lambda_function script to see what the query
would be, changing the database to fortesting, and running the query in
the console
* Making a new lambda and testing that inserts still work for
workflow_job, and that errors get sent to the gen_errors table in
#5714
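
For reference, a minimal local driver in the spirit of the script described above might look like the sketch below. This is an illustration only, not the PR's actual test_lambda_function script; the sample-event file name is an assumption, while handle_event and its dry_run flag come from the lambda_function.py diff in this commit.

import json

from lambda_function import handle_event

# Assumption: a sample DynamoDB stream event (like the workflow_job record
# in expected_query.sql) saved next to this script.
with open("sample_event.json") as f:
    event = json.load(f)

# dry_run=True makes the handler print the INSERT query instead of running
# it, so the output can be compared against expected_query.sql or pasted
# into the ClickHouse console.
handle_event(event, True)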
clee2000 authored Sep 27, 2024
1 parent 6e1576e commit 84f2e16
Showing 5 changed files with 1,685 additions and 20 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -134,3 +134,6 @@ docs/_build/
 # log classifier data

 aws/lambda/log-classifier/data/dataset/
+
+# Generated file
+/aws/lambda/clickhouse-replicator-dynamo/generated_query.sql
1 change: 1 addition & 0 deletions aws/lambda/clickhouse-replicator-dynamo/expected_query.sql
@@ -0,0 +1 @@
+INSERT INTO default.workflow_job SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT JSONEachRow {"runner_id": 11744598, "dynamoKey": "pytorch/pytorch/30663751078", "head_branch": "use_amazon2_docker", "workflow_name": "pull", "runner_group_name": "Default", "created_at": "2024-09-25T18:36:43Z", "check_run_url": "https://github.com/gitapi/repos/pytorch/pytorch/check-runs/30663751078", "head_sha": "6863c4c6dd6684be24c6be9a27ec129022fa992f", "conclusion": "cancelled", "run_url": "https://github.com/gitapi/repos/pytorch/pytorch/actions/runs/11038643569", "id": 30663751078, "run_id": 11038643569, "runner_name": "i-0b2fd9afbc986e218", "steps": [{"conclusion": "success", "number": 1, "completed_at": "2024-09-25T18:36:52Z", "name": "Set up job", "started_at": "2024-09-25T18:36:47Z", "status": "completed"}, {"conclusion": "success", "number": 2, "completed_at": "2024-09-25T18:36:59Z", "name": "Set up runner", "started_at": "2024-09-25T18:36:53Z", "status": "completed"}, {"conclusion": "success", "number": 3, "completed_at": "2024-09-25T18:37:01Z", "name": "Setup SSH (Click me for login details)", "started_at": "2024-09-25T18:36:59Z", "status": "completed"}, {"conclusion": "success", "number": 4, "completed_at": "2024-09-25T18:40:14Z", "name": "Checkout PyTorch", "started_at": "2024-09-25T18:37:01Z", "status": "completed"}, {"conclusion": "success", "number": 5, "completed_at": "2024-09-25T18:40:17Z", "name": "Setup Linux", "started_at": "2024-09-25T18:40:14Z", "status": "completed"}, {"conclusion": "skipped", "number": 6, "completed_at": "2024-09-25T18:40:17Z", "name": "configure aws credentials", "started_at": "2024-09-25T18:40:17Z", "status": "completed"}, {"conclusion": "success", "number": 7, "completed_at": "2024-09-25T18:40:18Z", "name": "Calculate docker image", "started_at": "2024-09-25T18:40:17Z", "status": "completed"}, {"conclusion": "success", "number": 8, "completed_at": "2024-09-25T18:40:18Z", "name": "Use following to pull public copy of the image", "started_at": "2024-09-25T18:40:18Z", "status": "completed"}, {"conclusion": "success", "number": 9, "completed_at": "2024-09-25T18:42:45Z", "name": "Pull docker image", "started_at": "2024-09-25T18:40:18Z", "status": "completed"}, {"conclusion": "success", "number": 10, "completed_at": "2024-09-25T18:42:45Z", "name": "Check if in a ARC runner", "started_at": "2024-09-25T18:42:45Z", "status": "completed"}, {"conclusion": "skipped", "number": 11, "completed_at": "2024-09-25T18:42:46Z", "name": "Install nvidia driver, nvidia-docker runtime, set GPU_FLAG", "started_at": "2024-09-25T18:42:46Z", "status": "completed"}, {"conclusion": "skipped", "number": 12, "completed_at": "2024-09-25T18:42:46Z", "name": "Lock NVIDIA A100 40GB Frequency", "started_at": "2024-09-25T18:42:46Z", "status": "completed"}, {"conclusion": "success", "number": 13, "completed_at": "2024-09-25T18:42:48Z", "name": "Start monitoring script", "started_at": "2024-09-25T18:42:46Z", "status": "completed"}, {"conclusion": "success", "number": 14, "completed_at": "2024-09-25T18:43:19Z", "name": "Download build artifacts", "started_at": "2024-09-25T18:42:48Z", "status": "completed"}, {"conclusion": "success", "number": 15, "completed_at": "2024-09-25T18:43:20Z", "name": "Download TD artifacts", "started_at": "2024-09-25T18:43:19Z", "status": "completed"}, {"conclusion": "success", "number": 16, "completed_at": "2024-09-25T18:43:20Z", "name": "Parse ref", "started_at": "2024-09-25T18:43:20Z", "status": "completed"}, {"conclusion": "success", "number": 17, "completed_at": "2024-09-25T18:43:21Z", "name": "Get workflow job id", "started_at": "2024-09-25T18:43:20Z", "status": "completed"}, {"conclusion": "success", "number": 18, "completed_at": "2024-09-25T18:43:24Z", "name": "Check for keep-going label and re-enabled test issues", "started_at": "2024-09-25T18:43:21Z", "status": "completed"}, {"conclusion": "success", "number": 19, "completed_at": "2024-09-25T18:43:24Z", "name": "Set Test step time", "started_at": "2024-09-25T18:43:24Z", "status": "completed"}, {"conclusion": "cancelled", "number": 20, "completed_at": "2024-09-25T18:45:18Z", "name": "Test", "started_at": "2024-09-25T18:43:24Z", "status": "completed"}, {"conclusion": "skipped", "number": 21, "completed_at": "2024-09-25T18:45:19Z", "name": "Upload pytest cache if tests failed", "started_at": "2024-09-25T18:45:19Z", "status": "completed"}, {"conclusion": "success", "number": 22, "completed_at": "2024-09-25T18:45:19Z", "name": "Print remaining test logs", "started_at": "2024-09-25T18:45:19Z", "status": "completed"}, {"conclusion": "success", "number": 23, "completed_at": "2024-09-25T18:45:19Z", "name": "Stop monitoring script", "started_at": "2024-09-25T18:45:19Z", "status": "completed"}, {"conclusion": "success", "number": 24, "completed_at": "2024-09-25T18:45:21Z", "name": "Upload test artifacts", "started_at": "2024-09-25T18:45:19Z", "status": "completed"}, {"conclusion": "success", "number": 25, "completed_at": "2024-09-25T18:45:21Z", "name": "Collect backtraces from coredumps (if any)", "started_at": "2024-09-25T18:45:21Z", "status": "completed"}, {"conclusion": "skipped", "number": 26, "completed_at": "2024-09-25T18:45:21Z", "name": "Store Core dumps on S3", "started_at": "2024-09-25T18:45:21Z", "status": "completed"}, {"conclusion": "success", "number": 27, "completed_at": "2024-09-25T18:45:24Z", "name": "Teardown Linux", "started_at": "2024-09-25T18:45:21Z", "status": "completed"}, {"conclusion": "skipped", "number": 28, "completed_at": "2024-09-25T18:45:25Z", "name": "Check NVIDIA driver installation step", "started_at": "2024-09-25T18:45:25Z", "status": "completed"}, {"conclusion": "success", "number": 55, "completed_at": "2024-09-25T18:45:25Z", "name": "Post Checkout PyTorch", "started_at": "2024-09-25T18:45:25Z", "status": "completed"}, {"conclusion": "success", "number": 56, "completed_at": "2024-09-25T18:45:32Z", "name": "Complete runner", "started_at": "2024-09-25T18:45:26Z", "status": "completed"}, {"conclusion": "success", "number": 57, "completed_at": "2024-09-25T18:45:32Z", "name": "Complete job", "started_at": "2024-09-25T18:45:32Z", "status": "completed"}], "url": "https://github.com/gitapi/repos/pytorch/pytorch/actions/jobs/30663751078", "labels": ["linux.2xlarge"], "completed_at": "2024-09-25T18:45:33Z", "html_url": "https://github.com/pytorch/pytorch/actions/runs/11038643569/job/30663751078", "name": "linux-focal-py3.12-clang10-experimental-split-build / test (dynamo, 1, 3, linux.2xlarge)", "run_attempt": 1, "started_at": "2024-09-25T18:36:47Z", "runner_group_id": 1, "torchci_classification": {"captures": ["##[error]The operation was canceled."], "line_num": 14784, "line": "##[error]The operation was canceled.", "context": ["+ python test/run_test.py --dynamo --exclude-inductor-tests --exclude-jit-executor --exclude-distributed-tests --exclude-torch-export-tests --shard 1 3 --verbose", "+ python tools/dynamo/verify_dynamo.py", "+ [[ -z 3 ]]", "+ test_dynamo_shard 1", "+ '[' -n '' ']'", "+ pip install --progress-bar off --no-use-pep517 --user git+https://github.com/pytorch/vision.git@d23a6e1664d20707c11781299611436e1f0c104f", "+ pip_install --no-use-pep517 --user git+https://github.com/pytorch/vision.git@d23a6e1664d20707c11781299611436e1f0c104f", "+ '[' -n '' ']'", "+ orig_preload=", "+ commit=d23a6e1664d20707c11781299611436e1f0c104f", "++ cat .github/ci_commit_pins/vision.txt", "++ get_pinned_commit vision"], "rule": "GHA job was cancelled"}, "node_id": "CR_kwDOA-j9z88AAAAHI7O1pg", "status": "completed"}
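
This golden file pins down the exact query the replicator should build for the sample record: one INSERT INTO default.workflow_job statement with async_insert enabled and the row inlined in JSONEachRow format. A file like this lends itself to a simple comparison against the dry-run output; a sketch, assuming the dry-run query is written to the generated_query.sql path that the .gitignore entry above anticipates:

# Hypothetical golden-file check; the file names come from this commit, but
# the comparison itself is an assumed workflow, not code from the PR.
with open("expected_query.sql") as f:
    expected = f.read().strip()
with open("generated_query.sql") as f:
    generated = f.read().strip()
assert generated == expected, "generated query drifted from expected_query.sql"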
66 changes: 46 additions & 20 deletions aws/lambda/clickhouse-replicator-dynamo/lambda_function.py
@@ -48,6 +48,10 @@ class EventType(Enum):

 def lambda_handler(event: Any, context: Any) -> None:
     # https://clickhouse.com/docs/en/integrations/python
+    handle_event(event, False)
+
+
+def handle_event(event: Any, dry_run: bool) -> None:
     counts = defaultdict(int)
     docs_to_upsert = defaultdict(list)
     for record in event["Records"]:
@@ -57,19 +61,19 @@ def lambda_handler(event: Any, context: Any) -> None:
                 event_name == EventType.INSERT.value
                 or event_name == EventType.MODIFY.value
             ):
-                table, doc = get_doc_for_upsert(record)
-                docs_to_upsert[table].append(doc)
+                table, id, doc = get_doc_for_upsert(record)
+                docs_to_upsert[table].append((id, doc))
             elif event_name == EventType.REMOVE.value:
-                remove_document(record)
+                remove_document(record, dry_run)
             else:
                 warn(f"Unrecognized event type {event_name} in {json.dumps(record)}")

             counts[event_name] += 1
         except Exception as error:
             warn(f"Failed to process {json.dumps(record)}: {error}")

-    for table, docs in docs_to_upsert.items():
-        upsert_documents(table, docs)
+    for table, ids_and_docs in docs_to_upsert.items():
+        upsert_documents(table, ids_and_docs, dry_run)

     print(f"Finish processing {json.dumps(counts)}")

@@ -155,7 +159,7 @@ def handle_workflow_job(record: Any) -> Any:
     return record


-def get_doc_for_upsert(record: Any) -> Optional[Tuple[str, Any]]:
+def get_doc_for_upsert(record: Any) -> Optional[Tuple[str, str, Any]]:
     table = extract_dynamodb_table(record)
     if not table:
         return
@@ -170,26 +174,45 @@ def get_doc_for_upsert(record: Any) -> Optional[Tuple[str, Any]]:
     print(f"Parsing {id}: {json.dumps(body)}")
     if not id:
         return
-    return table, body
+    return table, id, body


-def upsert_documents(table: str, documents: List[Any]) -> None:
+def upsert_documents(table: str, documents: List[Tuple[str, Any]], dry_run: bool) -> None:
     """
     Insert a new doc or modify an existing document. Note that ClickHouse doesn't really
     update the document in place, but rather adding a new record for the update
     """
-    body = ""
-    for document in documents:
-        body += json.dumps(document) + "\n"
-
     print(f"UPSERTING {len(documents)} INTO {table}")
-    res = get_clickhouse_client().query(
-        f"INSERT INTO {table} SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT JSONEachRow {body}"
-    )
-    print(res)
+
+    body = ""
+    for _, document in documents:
+        body += json.dumps(document) + "\n"
+
+    query = f"INSERT INTO {table} SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT JSONEachRow {body}"
+    if dry_run:
+        print(query)
+        return
+    try:
+        res = get_clickhouse_client().query(query)
+        print(res)
+    except Exception as error:
+        warn(f"Failed to upsert {len(documents)} into {table}: {error}")
+        id_bodies = ""
+        for id, _ in documents:
+            id_doc = {
+                "table": table,
+                "bucket": "dynamo",
+                "key": id,
+                "reason": str(error),
+            }
+            id_bodies += json.dumps(id_doc) + "\n"
+        get_clickhouse_client().query(
+            f"INSERT INTO errors.gen_errors SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT JSONEachRow {id_bodies}"
+        )
+        raise error


-def remove_document(record: Any) -> None:
+def remove_document(record: Any, dry_run: bool) -> None:
     """
     Remove a document. This is here for completeness as we don't remove records like ever
     """
@@ -204,6 +227,9 @@ def remove_document(record: Any) -> None:
     print(f"DELETING {id} FROM {table}")

     parameters = {"id": id}
-    get_clickhouse_client().query(
-        f"DELETE FROM {table} WHERE dynamoKey = %(id)s", parameters=parameters
-    )
+
+    query = f"DELETE FROM {table} WHERE dynamoKey = %(id)s"
+    if dry_run:
+        print(query)
+        return
+    get_clickhouse_client().query(query, parameters=parameters)
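
The notable new behavior in upsert_documents is the failure path: if the batch insert throws, one row per document is written to errors.gen_errors (keyed by the DynamoDB key, with the error message as the reason) and the exception is re-raised so the invocation still fails visibly. Here is the same shape in isolation as a sketch; insert_with_error_log is an illustrative name, and client is assumed to be the clickhouse_connect client that get_clickhouse_client() returns:

import json
from typing import Any, List, Tuple


def insert_with_error_log(client: Any, table: str, rows: List[Tuple[str, Any]]) -> None:
    # One JSONEachRow payload for the whole batch.
    payload = "".join(json.dumps(doc) + "\n" for _, doc in rows)
    try:
        client.query(f"INSERT INTO {table} FORMAT JSONEachRow {payload}")
    except Exception as error:
        # Record each failed document in the error table, then re-raise so
        # the failure still surfaces to the caller (and the lambda's logs).
        failures = "".join(
            json.dumps(
                {"table": table, "bucket": "dynamo", "key": key, "reason": str(error)}
            )
            + "\n"
            for key, _ in rows
        )
        client.query(f"INSERT INTO errors.gen_errors FORMAT JSONEachRow {failures}")
        raise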