Skip to content

Commit

Permalink
Merge pull request #50 from zellular-xyz/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
mchitgarha78 committed Aug 30, 2024
2 parents fe4f395 + e44bfbb commit 7d620df
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 50 deletions.
115 changes: 72 additions & 43 deletions common/db.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import gzip
import json
import math
import os
import threading
import time
Expand Down Expand Up @@ -55,6 +56,11 @@ def fetch_apps(self) -> None:
}
zconfig.APPS.update(data)
self.apps.update(new_apps)
for app_name in zconfig.APPS:
snapshot_path: str = os.path.join(
zconfig.SNAPSHOT_PATH, zconfig.VERSION, app_name
)
os.makedirs(snapshot_path, exist_ok=True)

def fetch_nodes_and_apps(self) -> None:
"""Periodically fetches apps and nodes data."""
Expand Down Expand Up @@ -102,32 +108,26 @@ def load_keys() -> dict[str, Any]:
@staticmethod
def load_finalized_batches(app_name: str, index: int | None = None) -> dict[str, Any]:
"""Load finalized batches for a given app from the snapshot file."""
if index == 0:
return {}

snapshot_dir: str = os.path.join(
zconfig.SNAPSHOT_PATH, zconfig.VERSION, app_name
)
if index is None:
snapshots: list[str] = []
for file in os.listdir(zconfig.SNAPSHOT_PATH):
if file.endswith(".json.gz") and file != 'keys.json.gz':
try:
file_app_name = '_'.join(file.split('_')[1:]).rsplit('.', 2)[0]
if file_app_name == app_name:
snapshots.append(file)
except IndexError:
continue
if not snapshots:
return {}

index = max(
(int(x.split("_")[0]) for x in snapshots if x.split("_")[0].isdigit()),
default=0,
index = 0
snapshots = sorted(
file for file in os.listdir(snapshot_dir)
if file.endswith(".json.gz")
)
if snapshots:
index = int(snapshots[-1].split('.')[0])
else:
index = math.ceil(index / zconfig.SNAPSHOT_CHUNK) * zconfig.SNAPSHOT_CHUNK

if index <= 0:
return {}

snapshot_path: str = os.path.join(
zconfig.SNAPSHOT_PATH, f"{index}_{app_name}.json.gz"
)
try:
with gzip.open(snapshot_path, "rt", encoding="UTF-8") as file:
with gzip.open(snapshot_dir + f"/{str(index).zfill(7)}.json.gz"
,"rt", encoding="UTF-8") as file:
return json.load(file)
except (OSError, IOError, json.JSONDecodeError, FileNotFoundError) as error:
zlogger.exception(
Expand All @@ -143,13 +143,12 @@ def save_snapshot(self, app_name: str, index: int) -> None:
remove_border: int = max(
index - zconfig.SNAPSHOT_CHUNK * zconfig.REMOVE_CHUNK_BORDER, 0
)

snapshot_path: str = os.path.join(
zconfig.SNAPSHOT_PATH, f"{index}_{app_name}.json.gz"
)
try:
snapshot_dir: str = os.path.join(
zconfig.SNAPSHOT_PATH, zconfig.VERSION, app_name
)
self.save_batches_to_file(
app_name, index, snapshot_border, snapshot_path
app_name, index, snapshot_border, snapshot_dir
)
self.prune_old_batches(app_name, remove_border)
except Exception as error:
Expand All @@ -161,10 +160,11 @@ def save_snapshot(self, app_name: str, index: int) -> None:
)

def save_batches_to_file(
self, app_name: str, index: int, snapshot_border: int, snapshot_path: str
self, app_name: str, index: int, snapshot_border: int, snapshot_dir
) -> None:
"""Helper function to save batches to a snapshot file."""
with gzip.open(snapshot_path, "wt", encoding="UTF-8") as file:
with gzip.open(snapshot_dir + f"/{str(index).zfill(7)}.json.gz",
"wt", encoding="UTF-8") as file:
json.dump(
{
batch["hash"]: batch
Expand All @@ -177,24 +177,44 @@ def save_batches_to_file(

def prune_old_batches(self, app_name: str, remove_border: int) -> None:
"""Helper function to prune old batches from memory."""
batches = self.apps[app_name]["batches"]
for key, batch in list(batches.items()):
if batch["state"] == "finalized" and batch["index"] <= remove_border:
del batches[key]
self.apps[app_name]["batches"] = {
batch["hash"]: batch
for batch in self.apps[app_name]["batches"].values()
if batch["state"] != "finalized" or batch["index"] > remove_border
}


def get_batches(
self, app_name: str, states: set[str], after: float = -1
) -> dict[str, Any]:
"""Get batches filtered by state and optionally by index."""
batches: dict[str, Any] = {}
i = 0
for batch_hash, batch in list(self.apps[app_name]["batches"].items()):
def __process_batches(
self, loaded_batches: dict[str, Any], states: set[str], after: float, batches: dict[str, Any]) -> int:
"""Filter and add batches to the result based on state and index."""
for batch_hash, batch in list(loaded_batches.items()):
if batch["state"] in states and batch.get("index", 0) > after:
batches[batch_hash] = batch
i += 1
if i >= zconfig.API_BATCHES_LIMIT:
break
if len(batches) >= zconfig.API_BATCHES_LIMIT:
return
return

def get_batches(self, app_name: str, states: set[str], after: float = -1) -> dict[str, Any]:
"""Get batches filtered by state and optionally by index."""
batches: dict[str, Any] = {}
last_finalized_index = self.apps[app_name]["last_finalized_batch"].get("index", 0)

if last_finalized_index - after >= zconfig.SNAPSHOT_CHUNK:
loaded_batches = self.load_finalized_batches(app_name, after + 1)
self.__process_batches(loaded_batches, states, after, batches)
if len(batches) >= zconfig.API_BATCHES_LIMIT:
return batches

current_chunk = after // zconfig.SNAPSHOT_CHUNK
next_chunk = (after + 1 + len(batches)) // zconfig.SNAPSHOT_CHUNK
finalized_chunk = last_finalized_index // zconfig.SNAPSHOT_CHUNK

if next_chunk not in [current_chunk, finalized_chunk]:
loaded_batches = self.load_finalized_batches(app_name, after + 1 + len(batches))
self.__process_batches(loaded_batches, states, after, batches)
if len(batches) >= zconfig.API_BATCHES_LIMIT:
return batches
self.__process_batches(self.apps[app_name]["batches"], states, after, batches)
return batches

def get_batch(self, app_name: str, batch_hash: str) -> dict[str, Any]:
Expand Down Expand Up @@ -300,6 +320,8 @@ def update_locked_batches(self, app_name: str, sig_data: dict[str, Any]) -> None
for batch in list(batches.values()):
if batch["state"] == "sequenced" and batch["index"] <= sig_data["index"]:
batch["state"] = "locked"
if not batches.get(sig_data["hash"]):
return
target_batch: dict[str, Any] = batches[sig_data["hash"]]
target_batch["lock_signature"] = sig_data["signature"]
target_batch["nonsigners"] = sig_data["nonsigners"]
Expand All @@ -324,6 +346,8 @@ def update_finalized_batches(self, app_name: str, sig_data: dict[str, Any]) -> N
if batch["index"] % zconfig.SNAPSHOT_CHUNK == 0:
snapshot_indexes.append(batch["index"])

if not batches.get(sig_data["hash"]):
return
target_batch: dict[str, Any] = batches[sig_data["hash"]]
target_batch["finalization_signature"] = sig_data["signature"]
target_batch["nonsigners"] = sig_data["nonsigners"]
Expand Down Expand Up @@ -456,6 +480,11 @@ def resequence_batches(
)
self.apps[app_name]["batches"][filtered_batch["hash"]] = filtered_batch
self.apps[app_name]["last_sequenced_batch"] = filtered_batch

def reset_timestamps(self, app_name: str) -> None:
for batch in list(self.apps[app_name]["batches"].values()):
if batch["state"] != "finalized":
batch["timestamp"] = int(time.time())

def reinitialize_batches(
self, app_name: str, all_nodes_last_finalized_batch: dict[str, Any]
Expand Down
2 changes: 2 additions & 0 deletions common/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class ErrorCodes:
"""Application-specific error codes as strings."""

INVALID_REQUEST: str = "invalid_request"
INVALID_NODE_VERSION: str = "invalid_node_version"
IS_SEQUENCER: str = "is_sequencer"
IS_NOT_SEQUENCER: str = "is_not_sequencer"
INVALID_SEQUENCER: str = "invalid_sequencer"
Expand All @@ -33,6 +34,7 @@ class ErrorMessages:
"""Human-readable error messages corresponding to error codes."""

INVALID_REQUEST: str = "The request is invalid."
INVALID_NODE_VERSION: str = "Invalid node version. Please get the latest version of node."
IS_SEQUENCER: str = "This node is the sequencer."
IS_NOT_SEQUENCER: str = "This node is not the sequencer."
INVALID_SEQUENCER: str = "The sequencer ID is invalid."
Expand Down
8 changes: 7 additions & 1 deletion config.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def load_environment_variables(self):
load_dotenv(dotenv_path=".env", override=False)
self.validate_env_variables()

self.RELEASE_VERSION = 'v0.0.6'
self.VERSION = 'v0.0.7'
self.HEADERS: dict[str, Any] = {"Content-Type": "application/json"}
self.NODES_FILE: str = os.getenv("ZSEQUENCER_NODES_FILE", "./nodes.json")
self.APPS_FILE: str = os.getenv("ZSEQUENCER_APPS_FILE", "./apps.json")
Expand Down Expand Up @@ -333,6 +333,12 @@ def load_environment_variables(self):

self.APPS: dict[str, dict[str, Any]] = Config.get_file_content(self.APPS_FILE)

for app_name in self.APPS:
snapshot_path: str = os.path.join(
self.SNAPSHOT_PATH, self.VERSION, app_name
)
os.makedirs(snapshot_path, exist_ok=True)

def update_sequencer(self, sequencer_id: str | None) -> None:
"""Update the sequencer configuration."""
if sequencer_id:
Expand Down
2 changes: 1 addition & 1 deletion node/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def get_state() -> Response:
"""Get the state of the node and its apps."""
data: dict[str, Any] = {
"sequencer": zconfig.NODE["id"] == zconfig.SEQUENCER["id"],
"release_version": zconfig.RELEASE_VERSION,
"version": zconfig.VERSION,
"sequencer_id": zconfig.SEQUENCER["id"],
"node_id": zconfig.NODE["id"],
"public_key_g2": zconfig.NODE["public_key_g2"].getStr(10).decode('utf-8'),
Expand Down
10 changes: 7 additions & 3 deletions node/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from common import bls, utils
from common.db import zdb
from common.logger import zlogger
from common.errors import ErrorCodes
from config import zconfig

switch_lock: threading.Lock = threading.Lock()
Expand Down Expand Up @@ -62,6 +63,7 @@ def send_app_batches(app_name: str) -> None:
"locked_hash": last_locked_batch.get("hash", ""),
"locked_chaining_hash": last_locked_batch.get("chaining_hash", ""),
"timestamp": int(time.time()),
"version": zconfig.VERSION
}
)

Expand All @@ -71,6 +73,9 @@ def send_app_batches(app_name: str) -> None:
url=url, data=data, headers=zconfig.HEADERS
).json()
if response["status"] == "error":
if response["error"]["code"] == ErrorCodes.INVALID_NODE_VERSION:
zlogger.warning(response["error"]["message"])
return
zdb.add_missed_batches(app_name=app_name, batches_data=initialized_batches)
return

Expand Down Expand Up @@ -343,11 +348,10 @@ def switch_sequencer(old_sequencer_id: str, new_sequencer_id: str) -> bool:
zdb.reinitialize_db(
app_name, new_sequencer_id, all_nodes_last_finalized_batch
)
if zconfig.NODE['id'] == zconfig.SEQUENCER['id']:
time.sleep(3)
else:
if zconfig.NODE['id'] != zconfig.SEQUENCER['id']:
time.sleep(10)

zdb.reset_timestamps(app_name)
zdb.pause_node.clear()
return True

Expand Down
8 changes: 6 additions & 2 deletions sequencer/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from flask import Blueprint, Response, request
import os
from common import utils
from common.db import zdb
from common.errors import ErrorCodes
from common.db import zdb, zlogger
from common.errors import ErrorCodes, ErrorMessages
from common.response_utils import error_response, success_response
from config import zconfig

Expand Down Expand Up @@ -38,6 +38,10 @@ def put_batches() -> Response:
if error_message:
return error_response(ErrorCodes.INVALID_REQUEST, error_message)

if req_data.get("version", "") != zconfig.VERSION:
zlogger.warning(f'Invalid node version. expected {zconfig.VERSION} got {req_data["version"]}')
return error_response(ErrorCodes.INVALID_NODE_VERSION, ErrorMessages.INVALID_NODE_VERSION)

concat_hash: str = "".join(batch["hash"] for batch in req_data["batches"])
is_eth_sig_verified: bool = utils.is_eth_sig_verified(
signature=req_data["signature"],
Expand Down

0 comments on commit 7d620df

Please sign in to comment.