Skip to content

Commit

Permalink
Fix "kraken.spot.OrderbookClient is not able to resubscribe to book…
Browse files Browse the repository at this point in the history
… feeds after connection lost" (#149)
  • Loading branch information
btschwertfeger committed Aug 31, 2023
1 parent 99f6092 commit a8d7e35
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/_codecov.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
FUTURES_SECRET_KEY: ${{ secrets.FUTURES_SECRET_KEY }}
FUTURES_SANDBOX_KEY: ${{ secrets.FUTURES_SANDBOX_KEY }}
FUTURES_SANDBOX_SECRET: ${{ secrets.FUTURES_SANDBOX_SECRET }}
run: pytest -vv -s --cov --cov-report=xml:coverage.xml tests
run: pytest -vv --cov --cov-report=xml:coverage.xml tests

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
Expand Down
18 changes: 2 additions & 16 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ repos:
hooks:
- id: python-use-type-annotations
- id: rst-backticks
# - id: rst-inline-touching-normal
- id: rst-directive-colons
- id: text-unicode-replacement-char
- repo: https://github.com/psf/black
Expand All @@ -40,7 +39,7 @@ repos:
hooks:
- id: isort
args:
- --profile=black # solves conflicts between black and isort
- --profile=black
- repo: https://github.com/pycqa/flake8
rev: 6.1.0
hooks:
Expand All @@ -60,7 +59,7 @@ repos:
- --rcfile=pyproject.toml
- -d=R0801 # ignore duplicate code
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.0.285
rev: v0.0.286
hooks:
- id: ruff
args:
Expand All @@ -75,21 +74,8 @@ repos:
pass_filenames: false
args:
- --config-file=pyproject.toml
# - --install-types # do not use within pre-commit
# - --non-interactive # same here
- kraken
- repo: https://github.com/pre-commit/mirrors-prettier
rev: v3.0.2
hooks:
- id: prettier
ci:
autofix_commit_msg: |
[pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
autofix_prs: false
autoupdate_branch: ""
autoupdate_commit_msg: "[pre-commit.ci] pre-commit autoupdate"
autoupdate_schedule: weekly
skip: []
submodules: false
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## [Unreleased](https://github.com/btschwertfeger/python-kraken-sdk/tree/HEAD)

[Full Changelog](https://github.com/btschwertfeger/python-kraken-sdk/compare/v1.6.1...HEAD)

Uncategorized merged pull requests:

- Bump Pre-Commit hook versions and adjust typing [\#146](https://github.com/btschwertfeger/python-kraken-sdk/pull/146) ([btschwertfeger](https://github.com/btschwertfeger))

## [v1.6.1](https://github.com/btschwertfeger/python-kraken-sdk/tree/v1.6.1) (2023-08-07)

[Full Changelog](https://github.com/btschwertfeger/python-kraken-sdk/compare/v1.6.0...v1.6.1)
Expand Down
1 change: 0 additions & 1 deletion examples/futures_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
datefmt="%Y/%m/%d %H:%M:%S",
level=logging.INFO,
)
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

Expand Down
1 change: 0 additions & 1 deletion examples/futures_trading_bot_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
datefmt="%Y/%m/%d %H:%M:%S",
level=logging.INFO,
)
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

Expand Down
1 change: 0 additions & 1 deletion examples/futures_ws_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
datefmt="%Y/%m/%d %H:%M:%S",
level=logging.INFO,
)
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

Expand Down
1 change: 0 additions & 1 deletion examples/spot_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
datefmt="%Y/%m/%d %H:%M:%S",
level=logging.INFO,
)
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

Expand Down
9 changes: 9 additions & 0 deletions examples/spot_orderbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,19 @@
from __future__ import annotations

import asyncio
import logging
from typing import Any, Dict, List, Tuple

from kraken.spot import OrderbookClient

logging.basicConfig(
format="%(asctime)s %(module)s,line: %(lineno)d %(levelname)8s | %(message)s",
datefmt="%Y/%m/%d %H:%M:%S",
level=logging.INFO,
)
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("requests").setLevel(logging.WARNING)


class Orderbook(OrderbookClient):
"""
Expand Down
1 change: 0 additions & 1 deletion examples/spot_trading_bot_template_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
datefmt="%Y/%m/%d %H:%M:%S",
level=logging.INFO,
)
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

Expand Down
1 change: 0 additions & 1 deletion examples/spot_trading_bot_template_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
datefmt="%Y/%m/%d %H:%M:%S",
level=logging.INFO,
)
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

Expand Down
1 change: 0 additions & 1 deletion examples/spot_ws_examples_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
datefmt="%Y/%m/%d %H:%M:%S",
level=logging.INFO,
)
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

Expand Down
3 changes: 1 addition & 2 deletions examples/spot_ws_examples_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
datefmt="%Y/%m/%d %H:%M:%S",
level=logging.INFO,
)
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

Expand Down Expand Up @@ -71,7 +70,7 @@ async def on_message(self: Client, message: dict) -> None:
await client.subscribe(
params={"channel": "book", "depth": 25, "symbol": ["BTC/USD"]},
)
await client.subscribe(params={"channel": "ohlc", "symbol": ["BTC/USD"]})
# await client.subscribe(params={"channel": "ohlc", "symbol": ["BTC/USD"]})
await client.subscribe(
params={
"channel": "ohlc",
Expand Down
11 changes: 5 additions & 6 deletions kraken/spot/orderbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,18 @@ class and used as callback to receive all messages sent by the
*This function must not be overloaded - it would break this client!*
"""

if not isinstance(message, dict):
return

if (
message.get("method") == "unsubscribe"
message.get("method") in ("subscribe", "unsubscribe")
and message.get("result")
and message["result"].get("channel") == "book"
and message.get("success")
and message["result"]["symbol"] in self.__book
):
del self.__book[message["result"]["symbol"]]
self.LOG.debug("Removed book for %s", message["result"]["symbol"])
return

if ( # pylint: disable=too-many-boolean-expressions)
Expand All @@ -162,6 +162,7 @@ class and used as callback to receive all messages sent by the
# ----------------------------------------------------------------------
pair: str = message["data"][0]["symbol"]
if pair not in self.__book:
self.LOG.debug("Add book for %s", pair)
# retrieve the decimal places required for checksum calculation
sym_info: dict = self.__market.get_asset_pairs(pair=pair)

Expand Down Expand Up @@ -291,8 +292,6 @@ def get(self: OrderbookClient, pair: str) -> Optional[dict]:
:return: The orderbook of that ``pair``.
:rtype: dict
todo: fix documentation
.. code-block::python
:linenos:
:caption: Orderbook: Get ask and bid
Expand Down Expand Up @@ -335,11 +334,11 @@ def __update_book(
:type timestamp: str, optional
"""
for order in orders:
volume = "{:.{}f}".format( # pylint: disable=consider-using-f-string # noqa: PLE1300
volume = "{:.{}f}".format( # pylint: disable=consider-using-f-string
order["qty"],
self.__book[symbol]["qty_decimals"],
)
price = "{:.{}f}".format( # pylint: disable=consider-using-f-string # noqa: PLE1300
price = "{:.{}f}".format( # pylint: disable=consider-using-f-string
order["price"],
self.__book[symbol]["price_decimals"],
)
Expand Down
42 changes: 42 additions & 0 deletions kraken/spot/websocket/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ async def __run(self: ConnectSpotWebsocketBase, event: asyncio.Event) -> None:

async with websockets.connect( # pylint: disable=no-member
f"wss://{self.__ws_endpoint}",
extra_headers={"User-Agent": "python-kraken-sdk"},
ping_interval=30,
) as socket:
self.LOG.info("Websocket connected!")
Expand Down Expand Up @@ -141,6 +142,7 @@ async def __run(self: ConnectSpotWebsocketBase, event: asyncio.Event) -> None:
except ValueError:
self.LOG.warning(_msg)
else:
self.LOG.debug(message)
self._manage_subscriptions(message=message)
await self.__callback(message)

Expand Down Expand Up @@ -515,12 +517,14 @@ def _manage_subscriptions(self: ConnectSpotWebsocketV2, message: dict) -> None:
"""
if message.get("method") == "subscribe":
if message.get("success") and message.get("result"):
message = self.__transform_subscription(subscription=message)
self.__append_subscription(subscription=message["result"])
else:
self.LOG.warning(message)

elif message.get("method") == "unsubscribe":
if message.get("success") and message.get("result"):
message = self.__transform_subscription(subscription=message)
self.__remove_subscription(subscription=message["result"])
else:
self.LOG.warning(message)
Expand Down Expand Up @@ -550,6 +554,44 @@ def __remove_subscription(self: ConnectSpotWebsocketV2, subscription: dict) -> N
del self._subscriptions[position]
return

def __transform_subscription(
self: ConnectSpotWebsocketV2,
subscription: dict,
) -> dict:
"""
Returns a dictionary that can be used to subscribe to a websocket feed.
This function is most likely used to parse incoming un-/subscription
messages.
:param subscription: The raw un-/subscription confirmation
:type subscription: dict
:return: The "corrected" subscription
:rtype: dict
"""
# Without deepcopy, the passed message will be modified, which is *not*
# intended.
subscription_copy: dict = deepcopy(subscription)

# Subscriptions for specific symbols must contain the 'symbols' key with
# a value of type list[str]. The python-kraken-sdk is caching active
# subscriptions from that moment, the successful response arrives. These
# responses must be parsed to use them to resubscribe on connection
# losses.
if subscription["result"].get("channel", "") in (
"book",
"ticker",
"ohlc",
"trade",
) and not isinstance(
subscription["result"].get("symbol"),
list,
):
subscription_copy["result"]["symbol"] = [
subscription_copy["result"]["symbol"],
]

return subscription_copy


__all__ = [
"ConnectSpotWebsocketBase",
Expand Down
79 changes: 79 additions & 0 deletions tests/spot/test_spot_websocket_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
from __future__ import annotations

from asyncio import run as asyncio_run
from copy import deepcopy
from typing import Any

import pytest

from kraken.exceptions import KrakenException
from kraken.spot.websocket.connectors import ConnectSpotWebsocketV2

from .helper import SpotWebsocketClientV2TestWrapper, async_wait

Expand Down Expand Up @@ -381,3 +383,80 @@ async def check_unsubscribe() -> None:
'{"method": "unsubscribe", "req_id": 987654321, "result": {"channel": "executions"}, "success": true, "time_in": ',
):
assert expected in caplog.text


@pytest.mark.wip()
@pytest.mark.spot()
@pytest.mark.spot_websocket()
@pytest.mark.spot_websocket_v2()
def test___transform_subscription() -> None:
"""
Checks if the subscription transformation works properly by checking
the condition for multiple channels. This test may be trivial but in case
Kraken changes anything on that implementation, this will break and makes it
easier to track down the change.
"""

incoming_subscription: dict
target_subscription: dict
for channel in ("book", "ticker", "ohlc", "trade"):
incoming_subscription = {
"method": "subscribe",
"result": {
"channel": channel,
"depth": 10,
"snapshot": True,
"symbol": "BTC/USD",
},
"success": True,
"time_in": "2023-08-30T04:59:14.052226Z",
"time_out": "2023-08-30T04:59:14.052263Z",
}

target_subscription = deepcopy(incoming_subscription)
target_subscription["result"]["symbol"] = ["BTC/USD"]

assert (
ConnectSpotWebsocketV2._ConnectSpotWebsocketV2__transform_subscription(
ConnectSpotWebsocketV2,
subscription=incoming_subscription,
)
== target_subscription
)


@pytest.mark.wip()
@pytest.mark.spot()
@pytest.mark.spot_websocket()
@pytest.mark.spot_websocket_v2()
def test___transform_subscription_no_change() -> None:
"""
Similar to the test above -- but verifying that messages that don't need an
adjustment remain unchanged.
This test must be extended in case Kraken decides to changes more
parameters.
"""

incoming_subscription: dict
for channel in ("book", "ticker", "ohlc", "trade"):
incoming_subscription = {
"method": "subscribe",
"result": {
"channel": channel,
"depth": 10,
"snapshot": True,
"symbol": ["BTC/USD"],
},
"success": True,
"time_in": "2023-08-30T04:59:14.052226Z",
"time_out": "2023-08-30T04:59:14.052263Z",
}

assert (
ConnectSpotWebsocketV2._ConnectSpotWebsocketV2__transform_subscription(
ConnectSpotWebsocketV2,
subscription=incoming_subscription,
)
== incoming_subscription
)

0 comments on commit a8d7e35

Please sign in to comment.