Merge pull request #157 from blockchain-etl/upgrade-composer-version
Upgrade composer version
charlielewisme committed May 15, 2023
2 parents 11d33e2 + c222580 commit 28c2e53
Showing 28 changed files with 89 additions and 86 deletions.
6 changes: 3 additions & 3 deletions airflow/README.md
@@ -39,7 +39,7 @@ Airflow DAGs for exporting and loading the Polygon blockchain data to Google Big
gcloud composer environments create \
${ENVIRONMENT_NAME} \
--location=us-central1 \
---image-version=composer-2.0.28-airflow-2.2.5 \
+--image-version=composer-2.1.14-airflow-2.5.1 \
--environment-size=small \
--scheduler-cpu=2 \
--scheduler-memory=4 \
@@ -96,13 +96,13 @@ Note that the variable names must be prefixed with `{chain}_`, e.g. `polygon_out
| `output_bucket` | GCS bucket where exported files with blockchain data will be stored |
| `export_start_date` | export start date, default: `2019-04-22` |
| `export_end_date` | export end date, used for integration testing, default: None |
-| `export_schedule_interval` | export cron schedule, default: `0 1 * * *` |
+| `export_schedule` | export cron schedule, default: `0 1 * * *` |
| `provider_uris` | comma-separated list of provider URIs for [polygon-etl](https://polygon-etl.readthedocs.io/en/latest/commands) command |
| `notification_emails` | comma-separated list of emails where notifications on DAG failures, retries and successes will be delivered. This variable must not be prefixed with `{chain}_` |
| `export_max_active_runs` | max active DAG runs for export, default: `3` |
| `export_max_workers` | max workers for [polygon-etl](https://polygon-etl.readthedocs.io/en/latest/commands) command, default: `5` |
| `destination_dataset_project_id` | GCS project id where destination BigQuery dataset is |
-| `load_schedule_interval` | load cron schedule, default: `0 2 * * *` |
+| `load_schedule` | load cron schedule, default: `0 2 * * *` |
| `load_end_date` | load end date, used for integration testing, default: None |

### Creating a Cloud Source Repository for Configuration Files
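Because the variable names changed, any existing Airflow variables named `polygon_export_schedule_interval` or `polygon_load_schedule_interval` are no longer read after this upgrade; the values have to exist under the new `*_schedule` keys. A minimal sketch of seeding the renamed variables with the documented defaults (illustrative only, not part of this commit):

from airflow.models import Variable

# Hypothetical one-off seeding; values are the defaults from the table above.
Variable.set("polygon_export_schedule", "0 1 * * *")
Variable.set("polygon_load_schedule", "0 2 * * *")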
2 changes: 1 addition & 1 deletion airflow/dags/polygon_export_dag.py
@@ -8,7 +8,7 @@
dag_id='polygon_export_dag',
**read_export_dag_vars(
var_prefix='polygon_',
-        export_schedule_interval='0 2 * * *',
+        export_schedule='0 2 * * *',
export_start_date='2020-05-30',
export_max_active_runs=3,
export_max_active_tasks=12,
2 changes: 1 addition & 1 deletion airflow/dags/polygon_load_dag.py
@@ -14,6 +14,6 @@
chain='polygon',
**read_load_dag_vars(
var_prefix='polygon_',
-        load_schedule_interval='0 7 * * *'
+        load_schedule='0 7 * * *'
)
)
2 changes: 1 addition & 1 deletion airflow/dags/polygon_parse_dag.py
@@ -17,7 +17,7 @@

parse_dag_vars = read_parse_dag_vars(
var_prefix=var_prefix,
-    parse_schedule_interval='30 8 * * *'
+    parse_schedule='30 8 * * *'
)

for folder in glob(table_definitions_folder):
2 changes: 1 addition & 1 deletion airflow/dags/polygon_partition_dag.py
@@ -16,6 +16,6 @@
public_dataset_name = 'crypto_polygon',
**read_partition_dag_vars(
var_prefix="polygon_",
partition_schedule_interval="0 8 * * *",
partition_schedule="0 8 * * *",
),
)
36 changes: 18 additions & 18 deletions airflow/dags/polygonetl_airflow/build_export_dag.py
@@ -7,7 +7,7 @@
from tempfile import TemporaryDirectory

from airflow import DAG, configuration
-from airflow.operators.dummy import DummyOperator
+from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

from polygonetl.cli import (
@@ -34,21 +34,21 @@


def build_export_dag(
-    dag_id,
-    provider_uris,
-    provider_uris_archival,
-    output_bucket,
-    export_start_date,
-    export_end_date=None,
-    notification_emails=None,
-    export_schedule_interval='0 0 * * *',
-    export_max_workers=10,
-    export_traces_max_workers=10,
-    export_batch_size=200,
-    export_max_active_runs=None,
-    export_max_active_tasks=None,
-    export_retries=5,
-    **kwargs
+    dag_id,
+    provider_uris,
+    provider_uris_archival,
+    output_bucket,
+    export_start_date,
+    export_end_date=None,
+    notification_emails=None,
+    export_schedule='0 0 * * *',
+    export_max_workers=10,
+    export_traces_max_workers=10,
+    export_batch_size=200,
+    export_max_active_runs=None,
+    export_max_active_tasks=None,
+    export_retries=5,
+    **kwargs
):
default_dag_args = {
"depends_on_past": False,
@@ -82,7 +82,7 @@ def build_export_dag(

dag = DAG(
dag_id,
-        schedule_interval=export_schedule_interval,
+        schedule=export_schedule,
default_args=default_dag_args,
max_active_runs=export_max_active_runs,
max_active_tasks=export_max_active_tasks,
@@ -345,7 +345,7 @@ def add_export_task(
return None

# Operators
-    export_complete = DummyOperator(task_id="export_complete", dag=dag)
+    export_complete = EmptyOperator(task_id="export_complete", dag=dag)

export_blocks_and_transactions_operator = add_export_task(
export_blocks_and_transactions_toggle,
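This file carries two Airflow API migrations at once: `schedule_interval` gives way to the `schedule` argument (the unified parameter introduced in Airflow 2.4, with `schedule_interval` deprecated), and `DummyOperator` gives way to `airflow.operators.empty.EmptyOperator`. A minimal sketch of the resulting pattern, with the DAG id and schedule as placeholder values rather than anything taken from this repo:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="example_export_dag",        # placeholder id
    schedule="0 0 * * *",               # `schedule` replaces `schedule_interval`
    start_date=datetime(2020, 5, 30),
    catchup=False,
    default_args={"retries": 5, "retry_delay": timedelta(minutes=5)},
) as dag:
    # EmptyOperator is the Airflow 2.4+ replacement for DummyOperator
    export_complete = EmptyOperator(task_id="export_complete")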
4 changes: 2 additions & 2 deletions airflow/dags/polygonetl_airflow/build_load_dag.py
@@ -35,7 +35,7 @@ def build_load_dag(
load_start_date=datetime(2018, 7, 1),
load_end_date=None,
load_catchup=False,
-    load_schedule_interval='0 0 * * *',
+    load_schedule='0 0 * * *',
load_all_partitions=True
):
# The following datasets must be created in BigQuery:
@@ -93,7 +93,7 @@ def read_file(filepath):
dag = models.DAG(
dag_id,
catchup=load_catchup,
-        schedule_interval=load_schedule_interval,
+        schedule=load_schedule,
default_args=default_dag_args)

dags_folder = os.environ.get('DAGS_FOLDER', '/home/airflow/gcs/dags')
4 changes: 2 additions & 2 deletions airflow/dags/polygonetl_airflow/build_parse_dag.py
@@ -33,7 +33,7 @@ def build_parse_dag(
internal_project_id,
notification_emails=None,
parse_start_date=datetime(2020, 5, 30),
-    parse_schedule_interval='0 0 * * *',
+    parse_schedule='0 0 * * *',
parse_all_partitions=None,
):

@@ -57,7 +57,7 @@ def build_parse_dag(
dag = models.DAG(
dag_id,
catchup=False,
-        schedule_interval=parse_schedule_interval,
+        schedule=parse_schedule,
default_args=default_dag_args)

def create_parse_task():
4 changes: 2 additions & 2 deletions airflow/dags/polygonetl_airflow/build_partition_dag.py
@@ -23,7 +23,7 @@ def build_partition_dag(
public_dataset_name,
load_dag_id,
partition_start_date=datetime(2015, 7, 30),
-    partition_schedule_interval='0 0 * * *',
+    partition_schedule='0 0 * * *',
notification_emails=None,
):

@@ -44,7 +44,7 @@ def build_partition_dag(
dag = models.DAG(
dag_id,
catchup=False,
-        schedule_interval=partition_schedule_interval,
+        schedule=partition_schedule,
default_args=default_dag_args)

def add_partition_tasks(task, sql_template, dependencies=None):
4 changes: 2 additions & 2 deletions airflow/dags/polygonetl_airflow/build_verify_streaming_dag.py
@@ -19,7 +19,7 @@ def build_verify_streaming_dag(
chain='polygon',
notification_emails=None,
start_date=datetime(2018, 7, 1),
-        schedule_interval='*/10 * * * *',
+        schedule='*/10 * * * *',
max_lag_in_minutes=15):
dataset_name = 'crypto_{}'.format(chain)

@@ -46,7 +46,7 @@ def build_verify_streaming_dag(
dag = DAG(
dag_id,
catchup=False,
-        schedule_interval=schedule_interval,
+        schedule=schedule,
default_args=default_dag_args)

dags_folder = os.environ.get('DAGS_FOLDER', '/home/airflow/gcs/dags')
10 changes: 5 additions & 5 deletions airflow/dags/polygonetl_airflow/variables.py
@@ -27,7 +27,7 @@ def read_export_dag_vars(var_prefix, **kwargs):
'output_bucket': read_var('output_bucket', var_prefix, True, **kwargs),
'export_start_date': export_start_date,
'export_end_date': export_end_date,
-        'export_schedule_interval': read_var('export_schedule_interval', var_prefix, True, **kwargs),
+        'export_schedule': read_var('export_schedule', var_prefix, True, **kwargs),
'provider_uris': provider_uris,
'provider_uris_archival': provider_uris_archival,
'notification_emails': read_var('notification_emails', None, False, **kwargs),
@@ -52,7 +52,7 @@ def read_load_dag_vars(var_prefix, **kwargs):
'destination_dataset_project_id': read_var('destination_dataset_project_id', var_prefix, True, **kwargs),
'notification_emails': read_var('notification_emails', None, False, **kwargs),
# 'success_notification_emails': read_var('success_notification_emails', None, False, **kwargs),
-        'load_schedule_interval': read_var('load_schedule_interval', var_prefix, True, **kwargs),
+        'load_schedule': read_var('load_schedule', var_prefix, True, **kwargs),
'load_all_partitions': parse_bool(read_var('load_all_partitions', var_prefix, False, **kwargs), default=None),
'load_catchup': parse_bool(read_var('load_catchup', var_prefix, False, **kwargs), default=False),
}
@@ -79,8 +79,8 @@ def read_partition_dag_vars(var_prefix, **kwargs):
"partitioned_project_id": read_var(
"partitioned_project_id", var_prefix, True, **kwargs
),
"partition_schedule_interval": read_var(
"partition_schedule_interval", var_prefix, False, **kwargs
"partition_schedule": read_var(
"partition_schedule", var_prefix, False, **kwargs
),
"notification_emails": read_var("notification_emails", None, False, **kwargs),
}
@@ -100,7 +100,7 @@ def read_parse_dag_vars(var_prefix, **kwargs):
# internal_project_id takes its value from partitioned_project_id
'internal_project_id': read_var('partitioned_project_id', var_prefix, True, **kwargs),
'parse_destination_dataset_project_id': read_var('parse_destination_dataset_project_id', var_prefix, True, **kwargs),
-        'parse_schedule_interval': read_var('parse_schedule_interval', var_prefix, True, **kwargs),
+        'parse_schedule': read_var('parse_schedule', var_prefix, True, **kwargs),
'parse_all_partitions': parse_bool(read_var('parse_all_partitions', var_prefix, False), default=None),
'notification_emails': read_var('notification_emails', None, False, **kwargs),
}
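Each `read_*_dag_vars` helper resolves chain-prefixed Airflow variables, so the rename here changes the variable keys operators must set (e.g. `polygon_export_schedule` instead of `polygon_export_schedule_interval`). The `read_var` implementation is not part of this diff; a hedged stand-in that illustrates the prefixing behaviour it appears to provide:

from airflow.models import Variable

def read_var(var_name, var_prefix=None, required=False, **kwargs):
    # Hypothetical stand-in for the repo's helper, which is not shown in this
    # diff: read the prefixed Airflow variable, falling back to any default
    # passed through kwargs (e.g. the schedules set in the DAG files above).
    full_name = f"{var_prefix}{var_name}" if var_prefix else var_name
    value = Variable.get(full_name, default_var=kwargs.get(var_name))
    if required and value is None:
        raise ValueError(f"{full_name} variable is required")
    return value

# After this change the export schedule lives under `polygon_export_schedule`.
schedule = read_var("export_schedule", "polygon_", True)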
8 changes: 4 additions & 4 deletions airflow/requirements_airflow.txt
@@ -2,7 +2,7 @@
# During local dev & testing, you can `pip install -e ../cli` first
# pip will then prioritise the local polygon-etl package over pypi

-discord-webhook==0.14.0
-eth-rlp==0.2.1 # Fixes install conflicts issue in Composer
-eth-utils==1.8.4 # Fixes install conflicts issue in Composer
-polygon-etl==0.3.5
+discord-webhook==1.1.0
+eth-hash==0.3.3 # Fixes install conflicts issue in Composer
+polygon-etl==0.3.6
+web3==5.31.0 # Fixes install conflicts issue in Composer
11 changes: 6 additions & 5 deletions airflow/requirements_local.txt
@@ -1,5 +1,6 @@
-apache-airflow[google]==2.2.5 # similar to `composer-2.0.28-airflow-2.2.5`
-Flask==1.1.2 # matches `composer-2.0.28-airflow-2.2.5`
-google-api-core==2.8.2 # matches `composer-2.0.28-airflow-2.2.5`
-google-cloud-bigquery==2.34.4 # matches `composer-2.0.28-airflow-2.2.5`
-google-cloud-storage==1.44.0 # matches `composer-2.0.28-airflow-2.2.5`
+apache-airflow[gcp]==2.5.1 # similar to `composer-2.1.14-airflow-2.5.1`
+Flask==2.2.2 # matches `composer-2.1.14-airflow-2.5.1`
+google-api-core==2.8.1 # matches `composer-2.1.14-airflow-2.5.1`
+google-cloud-bigquery==2.34.4 # matches `composer-2.1.14-airflow-2.5.1`
+google-cloud-storage==2.7.0 # matches `composer-2.1.14-airflow-2.5.1`
+protobuf==3.20.0 # matches `composer-2.1.14-airflow-2.5.1`
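The local pins now mirror the `composer-2.1.14-airflow-2.5.1` image instead of the old `composer-2.0.28-airflow-2.2.5` one. A small, purely illustrative check that a local virtualenv matches the pins above (package names and versions taken from this file):

from importlib.metadata import version

for pkg, expected in {
    "apache-airflow": "2.5.1",
    "Flask": "2.2.2",
    "google-api-core": "2.8.1",
    "google-cloud-storage": "2.7.0",
}.items():
    installed = version(pkg)
    print(f"{pkg}: installed {installed}, pinned {expected}")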
2 changes: 1 addition & 1 deletion airflow/test_dags/dummy_dag.py
@@ -17,7 +17,7 @@ def do_something():

with models.DAG(
"dummy_dag",
-    schedule_interval=timedelta(days=1),
+    schedule=timedelta(days=1),
start_date=datetime(2021, 11, 1),
catchup=False,
default_args={'on_failure_callback': handle_dag_failure},
2 changes: 1 addition & 1 deletion cli/polygonetl/executors/batch_work_executor.py
@@ -25,7 +25,7 @@
from json.decoder import JSONDecodeError

from requests.exceptions import Timeout as RequestsTimeout, HTTPError, TooManyRedirects, ConnectionError as RequestsConnectionError
-from web3.utils.threads import Timeout as Web3Timeout
+from web3._utils.threads import Timeout as Web3Timeout

from polygonetl.executors.bounded_executor import BoundedExecutor
from polygonetl.executors.fail_safe_executor import FailSafeExecutor
2 changes: 1 addition & 1 deletion cli/polygonetl/jobs/export_token_transfers_job.py
@@ -82,7 +82,7 @@ def _export_batch(self, block_number_batch):
if token_transfer is not None:
self.item_exporter.export_item(self.token_transfer_mapper.token_transfer_to_dict(token_transfer))

-        self.web3.eth.uninstallFilter(event_filter.filter_id)
+        self.web3.eth.uninstall_filter(event_filter.filter_id)

def _end(self):
self.batch_work_executor.shutdown()
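`uninstallFilter` is one of the camelCase methods deprecated in web3.py v5 in favour of snake_case, so the pinned web3==5.31.0 expects `uninstall_filter`. A minimal sketch of the v5 filter lifecycle this job relies on (the endpoint URL is a placeholder, not from the repo config):

from web3 import Web3, HTTPProvider

w3 = Web3(HTTPProvider("http://localhost:8545"))  # placeholder provider URI
event_filter = w3.eth.filter({"fromBlock": 0, "toBlock": "latest"})
entries = event_filter.get_all_entries()           # fetch matching log entries
w3.eth.uninstall_filter(event_filter.filter_id)    # clean up the server-side filter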
2 changes: 1 addition & 1 deletion cli/polygonetl/jobs/export_traces_job.py
@@ -87,7 +87,7 @@ def _export_batch(self, block_number_batch):

# TODO: Change to traceFilter when this issue is fixed
# https://github.com/paritytech/parity-ethereum/issues/9822
-        json_traces = self.web3.parity.traceBlock(block_number)
+        json_traces = self.web3.parity.trace_block(block_number)

if json_traces is None:
raise ValueError(
4 changes: 1 addition & 3 deletions cli/polygonetl/providers/ipc.py
@@ -24,10 +24,8 @@
import json
import socket

+from web3._utils.threads import Timeout
from web3.providers.ipc import IPCProvider
-from web3.utils.threads import (
-    Timeout,
-)

try:
from json import JSONDecodeError
5 changes: 1 addition & 4 deletions cli/polygonetl/providers/request.py
@@ -3,10 +3,7 @@
import lru
import requests
from requests.adapters import HTTPAdapter

-from web3.utils.caching import (
-    generate_cache_key,
-)
+from web3._utils.caching import generate_cache_key


def _remove_session(key, session):
5 changes: 3 additions & 2 deletions cli/polygonetl/service/eth_service.py
@@ -26,9 +26,10 @@
from polygonetl.service.graph_operations import GraphOperations, OutOfBoundsError, Point
from web3.middleware import geth_poa_middleware


class EthService(object):
def __init__(self, web3):
-        web3.middleware_stack.inject(geth_poa_middleware, layer=0)
+        web3.middleware_onion.inject(geth_poa_middleware, layer=0)
graph = BlockTimestampGraph(web3)
self._graph_operations = GraphOperations(graph)

@@ -72,7 +73,7 @@ def __init__(self, web3):

def get_first_point(self):
# Ignore the genesis block as its timestamp is 0
-        return block_to_point(self._web3.eth.getBlock(1))
+        return block_to_point(self._web3.eth.get_block(1))

def get_last_point(self):
return block_to_point(self._web3.eth.getBlock('latest'))
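`middleware_stack` was renamed to `middleware_onion` and `getBlock` to `get_block` in web3.py v5; the geth POA middleware is still needed because Polygon blocks carry POA-style `extraData`. A minimal sketch of the injection pattern used throughout this commit (provider URI is a placeholder):

from web3 import Web3, HTTPProvider
from web3.middleware import geth_poa_middleware

w3 = Web3(HTTPProvider("http://localhost:8545"))   # placeholder provider URI
w3.middleware_onion.inject(geth_poa_middleware, layer=0)
first_point = w3.eth.get_block(1)                  # genesis block is skipped above
print(first_point.number, first_point.timestamp)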
4 changes: 2 additions & 2 deletions cli/polygonetl/service/eth_token_service.py
@@ -21,7 +21,7 @@
# SOFTWARE.
import logging

-from web3.exceptions import BadFunctionCallOutput
+from web3.exceptions import BadFunctionCallOutput, ContractLogicError

from polygonetl.domain.token import EthToken
from polygonetl.erc20_abi import ERC20_ABI
@@ -65,7 +65,7 @@ def _call_contract_function(self, func):
# OverflowError exception happens if the return type of the function doesn't match the expected type
result = call_contract_function(
func=func,
-            ignore_errors=(BadFunctionCallOutput, OverflowError, ValueError),
+            ignore_errors=(BadFunctionCallOutput, ContractLogicError, OverflowError, ValueError),
default_value=None)

if self._function_call_result_transformer is not None:
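Adding `ContractLogicError` (introduced in web3.py v5) to `ignore_errors` lets token metadata calls fail soft when a contract reverts instead of raising. The repo's `call_contract_function` body is not shown in this diff; a hedged sketch of the fail-soft pattern it implements:

from web3.exceptions import BadFunctionCallOutput, ContractLogicError

def call_contract_function(func, ignore_errors, default_value=None):
    # Hypothetical stand-in: swallow the listed exceptions and return a default.
    try:
        return func.call()
    except Exception as ex:
        if isinstance(ex, ignore_errors):
            return default_value
        raise

# e.g. symbol() on a non-standard or reverting token contract:
# symbol = call_contract_function(
#     contract.functions.symbol(),
#     ignore_errors=(BadFunctionCallOutput, ContractLogicError, OverflowError, ValueError),
# )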
4 changes: 2 additions & 2 deletions cli/polygonetl/streaming/eth_streamer_adapter.py
@@ -34,13 +34,13 @@ def __init__(
self.item_id_calculator = EthItemIdCalculator()
self.item_timestamp_calculator = EthItemTimestampCalculator()
self.web3 = Web3(self.batch_web3_provider)
-        self.web3.middleware_stack.inject(geth_poa_middleware, layer=0)
+        self.web3.middleware_onion.inject(geth_poa_middleware, layer=0)

def open(self):
self.item_exporter.open()

def get_current_block_number(self):
-        return int(self.web3.eth.getBlock("latest").number)
+        return int(self.web3.eth.get_block("latest").number)

def export_all(self, start_block, end_block):
# Export blocks and transactions
2 changes: 1 addition & 1 deletion cli/polygonetl/web3_utils.py
@@ -26,5 +26,5 @@

def build_web3(provider):
w3 = Web3(provider)
-    w3.middleware_stack.inject(geth_poa_middleware, layer=0)
+    w3.middleware_onion.inject(geth_poa_middleware, layer=0)
return w3
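`build_web3` above now uses `middleware_onion`, matching the rest of the web3 v5 migration in this commit. A short usage sketch (the RPC endpoint is a placeholder, not taken from the repo config):

from web3 import HTTPProvider
from polygonetl.web3_utils import build_web3

w3 = build_web3(HTTPProvider("https://polygon-rpc.example"))  # placeholder endpoint
print(w3.eth.get_block("latest").number)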