diff --git a/cli/polygonetl/cli/stream.py b/cli/polygonetl/cli/stream.py index 189284de..d97288dd 100644 --- a/cli/polygonetl/cli/stream.py +++ b/cli/polygonetl/cli/stream.py @@ -47,11 +47,12 @@ @click.option('--period-seconds', default=10, show_default=True, type=int, help='How many seconds to sleep between syncs') @click.option('-b', '--batch-size', default=10, show_default=True, type=int, help='How many blocks to batch in single request') @click.option('-B', '--block-batch-size', default=1, show_default=True, type=int, help='How many blocks to batch in single sync round') +@click.option('-r', '--ramp-up-blocks', default=0, show_default=True, type=int, help='Specifies the count of initial blocks to be processed one by one before switching to batch processing. This approach is particularly beneficial for mitigating issues like Out-of-Memory (OOM) errors when dealing with large batches') @click.option('-w', '--max-workers', default=5, show_default=True, type=int, help='The number of workers') @click.option('--log-file', default=None, show_default=True, type=str, help='Log file') @click.option('--pid-file', default=None, show_default=True, type=str, help='pid file') def stream(last_synced_block_file, lag, provider_uri, output, start_block, entity_types, - period_seconds=10, batch_size=2, block_batch_size=10, max_workers=5, log_file=None, pid_file=None): + period_seconds=10, batch_size=2, block_batch_size=10, ramp_up_blocks=0, max_workers=5, log_file=None, pid_file=None): """Streams all data types to console or Google Pub/Sub.""" configure_logging(log_file) configure_signals() @@ -80,6 +81,7 @@ def stream(last_synced_block_file, lag, provider_uri, output, start_block, entit start_block=start_block, period_seconds=period_seconds, block_batch_size=block_batch_size, + ramp_up_blocks=ramp_up_blocks, pid_file=pid_file ) streamer.stream() diff --git a/cli/setup.py b/cli/setup.py index 155aaccc..e578945a 100644 --- a/cli/setup.py +++ b/cli/setup.py @@ -32,7 +32,7 @@ def read(fname): python_requires=">=3.7,<3.10", install_requires=[ "base58==2.1.1", - "blockchain-etl-common==1.6.1", + "blockchain-etl-common==1.7.1", "click>=8.0,<9", "eth-abi==2.2.0", # web3 5.28.0 depends on eth-abi<3.0.0 "eth-utils==1.10", # eth-abi 2.2.0 depends on eth-utils<2.0.0 diff --git a/streaming/charts/polygon-etl-streaming/templates/deployment.yaml b/streaming/charts/polygon-etl-streaming/templates/deployment.yaml index 864cbe6d..2219e6f5 100644 --- a/streaming/charts/polygon-etl-streaming/templates/deployment.yaml +++ b/streaming/charts/polygon-etl-streaming/templates/deployment.yaml @@ -86,6 +86,7 @@ spec: - "--max-workers={{ .Values.config.MAX_WORKERS }}" - "--batch-size={{ .Values.config.BATCH_SIZE }}" - "--block-batch-size={{ .Values.config.BLOCK_BATCH_SIZE }}" + - "--ramp-up-blocks={{ .Values.config.BLOCK_BATCH_SIZE }}" - "--pid-file={{ .Values.lsb_path }}/{{ .Values.pid_file }}" resources: {{- toYaml .Values.stream.resources | nindent 12 }}