
Agg feedz #414

Merged
merged 49 commits into master from agg_feedz
Jan 13, 2023
Conversation

@goodboy (Contributor) commented Nov 4, 2022

Enhances our `piker.open_feed()` real-time quotes and history management layer (`piker.data.feed`) to accept multi-fqsn inputs and deliver multi-symbol quote streams, and adds a new internal data-streaming abstraction/API, `.data.Flume`, which provides the basis for real-time stream management, access and measurement for data-flow orchestration.
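
Roughly, the end-user shape looks something like the sketch below (hedged: the `fqsns` kwarg and `.flumes` mapping are from the checklist further down; everything else, including the async-with usage, is illustrative only):

```python
import trio
import piker


async def main():
    # Sketch only: the ``fqsns`` kwarg and ``Feed.flumes`` mapping come
    # from the checklist below; treating ``open_feed()`` as an async
    # context manager here is an assumption about the final API shape.
    async with piker.open_feed(
        fqsns=['btcusdt.binance', 'ethusdt.binance'],
    ) as feed:
        # per-fqsn data-flow access/management via the new ``Flume`` type
        for fqsn, flume in feed.flumes.items():
            print(fqsn, type(flume))


trio.run(main)
```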


Synopsis

The final core-UX feature you always wanted as a chart trader is probably something like:

multi-instrument, overlaid real-time and historical data feeds with simultaneous interaction and selectable "current symbol" order-mode control.

Well, this is finally within reach 😎 and this patch adds the "backend" work making it possible 🏄🏼


Notes for manual testing

Ideally reviewers run the new feeds test set with `pytest tests/test_feeds.py`.
Note that you'll need to install the `piker_pin` branch of `tractor` in order for the test set to run green.
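
For a rough idea, a multi-symbol feed test could look something like this sketch (not the actual `tests/test_feeds.py`; the `open_test_pikerd` fixture is mentioned in the test-suite notes below but its exact shape is assumed here):

```python
import pytest
import trio
import piker


@pytest.mark.parametrize(
    'fqsns',
    [
        ['btcusdt.binance', 'ethusdt.binance'],  # binance multi-symbol case
        ['xbtusdt.kraken', 'xmrusdt.kraken'],    # kraken case (symbols illustrative)
    ],
)
def test_multi_fqsn_feed(open_test_pikerd, fqsns):
    '''
    Hypothetical sketch: open one feed over multiple fqsns and verify
    we get a `Flume` per requested symbol.

    '''
    async def main():
        # parenthesized multi-ctx ``async with`` needs py3.10+
        async with (
            open_test_pikerd(),
            piker.open_feed(fqsns=fqsns) as feed,
        ):
            assert set(feed.flumes) == set(fqsns)

    trio.run(main)
```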


to land

  • fill out the commit msg for 7abcb3e, which was the initial (half-working) patch to get basic functionality

  • port all consumer code in the clearing, order mode and charting/graphics layers to expect this adjusted `Feed` API.

  • get basic per-`brokerd` multi-symbol real-time feeds working

    • enables `piker.open_feed(fqsns=['btcusdt.binance', 'ethusdt.binance']) as feed` where the delivered `Feed` now has a `.flumes: dict[str, Flume]` which enables per-fqsn data-flow access, management and measurement (see the historical meaning of "flume" for the idea behind this abstraction's terminology)
    • add a test in test_feeds.py
      • binance multi-symbol case
      • kraken multi-symbol case
      • kraken currently seems to depend on a brokers.toml existing? we should fix this..
  • add cross-brokerd multi-feeds such that `piker.open_feed(fqsns=['btcusdt.binance', 'xbtusdt.kraken'])` will work with an aggregate receive channel delivering quotes from both backends

    • multi-brokerd multi-sym case (`kraken` + `binance`)

Test suite TODO: see two comments below

Base automatically changed from pg_exts_fork to master November 8, 2022 17:47
@goodboy force-pushed the agg_feedz branch 2 times, most recently from 56d312d to 091329d on November 10, 2022
@goodboy added labels on Nov 10, 2022: data-layer (real-time and historical data processing and storage), broker-backend (`brokerd`/`datad` related backend tech), brokers-can-smbz (features that suits should provide)
@goodboy force-pushed the agg_feedz branch 3 times, most recently from 9ca5152 to df6df2e on November 12, 2022
goodboy added a commit that referenced this pull request Nov 12, 2022
This slipped in early from #414 before merge and was likely due to
cherry-picking from #417.
@goodboy mentioned this pull request on Nov 15, 2022
@goodboy force-pushed the agg_feedz branch 3 times, most recently from 701de9f to 4948bae on November 17, 2022
@goodboy changed the base branch from master to ib_contract_updates on November 17, 2022
@goodboy (Contributor, Author) commented Nov 17, 2022

Let's try to land #421 before this.

Base automatically changed from ib_contract_updates to master November 17, 2022 21:38
@goodboy force-pushed the agg_feedz branch 2 times, most recently from 4a35fcb to 13d81eb on December 10, 2022
@goodboy (Contributor, Author) commented Dec 13, 2022

Test suite TODO:

  • we can't run binance tests from CI due to IP blacklisting on their end.. see the CI error
    • instead only run kraken tests in CI (using fixture logic for detection) and expect devs to run the full suite manually on PRs
  • the initial fix for test failures on teardown was from the combo commits in "Don't unset actor global on root teardown" (goodboy/tractor#347), but those fixes completely break tractor 😂, so we need either a better fix or to adjust the feeds tests to expect the streaming teardown gunk.
    Good news: it seems this issue went away with the new `open_test_pikerd()` addition.
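
The "fixture logic for detection" could be as simple as checking the `CI` env var, e.g. (an assumption-level sketch, not the actual conftest code):

```python
import os

import pytest

# backends known to block CI runner IPs (binance per the note above)
CI_BLOCKED_BACKENDS: set[str] = {'binance'}


def maybe_skip_in_ci(fqsns: list[str]) -> list[str]:
    '''
    Hypothetical helper: drop fqsns whose provider can't be reached
    from CI, skipping the test entirely if none remain.

    '''
    if os.environ.get('CI'):
        fqsns = [
            fqsn for fqsn in fqsns
            if fqsn.rsplit('.', 1)[-1] not in CI_BLOCKED_BACKENDS
        ]
        if not fqsns:
            pytest.skip('backend not reachable from CI')
    return fqsns
```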

Set each quote-stream by matching the provider for each `Flume`, which
results in some flumes mapping to the same (multiplexed) stream.
Monkey-patch the equivalent `tractor.MsgStream._ctx: tractor.Context` on
each broadcast-receiver subscription to allow use by feed bus methods as
well as other internals which need to reference IPC channel/portal info.

Start a `_FeedsBus` subscription management API:
- add `.get_subs()` which returns the list of tuples registered for the
  given key (normally the fqsn).
- add `.remove_sub()` which allows removing by key and tuple value and
  provides encapsulation for sampler task(s) which deal with dropped
  connections/subscribers.
Previously we would only detect overruns and drop subscriptions on
non-throttled feed subs; however, you can get the same issue with
a wrapping throttler task:
- the intermediate mem chan can be blocked by the throttler task
  being too slow, in which case we still want to warn about it
- the stream's IPC channel can actually break, in which case we still
  want to drop the connection and subscription so it doesn't become
  a source of stale backpressure.
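
In sketch form the handling amounts to something like the following (the bus/sub attribute names are placeholders; only the two failure modes mirror the commit text above):

```python
import trio


async def publish_quote(bus, key: str, sub, quote: dict) -> None:
    '''
    Hypothetical sketch: fan a quote out to one subscriber, handling
    the two failure modes described above.

    '''
    try:
        # non-blocking put into the (possibly throttler-wrapped) mem chan
        sub.send_chan.send_nowait(quote)

    except trio.WouldBlock:
        # throttler/consumer task is too slow: warn but keep the sub
        print(f'{key}: overrun, consumer too slow?')

    except trio.BrokenResourceError:
        # the channel actually broke: drop the subscription so it
        # doesn't become a source of stale backpressure
        bus.remove_sub(key, sub)
```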
Allows using `set` ops for subscription management and guarantees no
duplicates per `brokerd` actor. New API is simpler for dynamic
pause/resume changes per `Feed`:
- `_FeedsBus.add_subs()`, `.get_subs()`, `.remove_subs()` all accept multi-sub
  `set` inputs.
- `Feed.pause()` / `.resume()` encapsulates management of *only* sending
  a msg on each unique underlying IPC msg stream.

Use the new API in the sampler task.
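
A minimal sketch of what such a set-based subscription API might look like (method names from the commit above; the internal state and the type of each sub entry are assumptions):

```python
from collections import defaultdict


class _FeedsBus:
    '''
    Illustrative sketch only; the real class lives in ``piker.data.feed``
    and carries much more state. Each sub is treated as an opaque hashable.

    '''
    def __init__(self) -> None:
        self._subscribers: defaultdict[str, set] = defaultdict(set)

    def add_subs(self, key: str, subs: set) -> set:
        # register a set of subs for ``key`` (normally the fqsn);
        # ``set`` union guarantees no duplicates per ``brokerd`` actor.
        self._subscribers[key] |= subs
        return self._subscribers[key]

    def get_subs(self, key: str) -> set:
        # the current set of subs registered for ``key``
        return self._subscribers[key]

    def remove_subs(self, key: str, subs: set) -> set:
        # drop a set of subs, e.g. on disconnect or ``Feed.pause()``
        self._subscribers[key] -= subs
        return self._subscribers[key]
```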
Instead of requiring any `-b`, try to import all built-in broker backend
Python modules by default and only load those detected from the input
symbol list's fqsn values. In other words the `piker chart` cmd can now
be run without `-b` and that flag is only required if you want to load
a subset of the built-ins or are trying to load a specific
not-yet-built-in backend.
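
The detection boils down to parsing the provider suffix off each fqsn and importing that built-in module, roughly (hedged sketch; the module-path template and lack of error handling are assumptions, not piker's actual loader):

```python
import importlib
from types import ModuleType


def load_backends_from_fqsns(fqsns: list[str]) -> dict[str, ModuleType]:
    '''
    Hypothetical sketch of fqsn-driven backend loading.

    '''
    backends: dict[str, ModuleType] = {}
    for fqsn in fqsns:
        # an fqsn like 'btcusdt.binance' ends with its provider name
        provider = fqsn.rsplit('.', 1)[-1]
        if provider not in backends:
            backends[provider] = importlib.import_module(
                f'piker.brokers.{provider}'  # assumed module path
            )
    return backends
```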
Seems that by default their history indexing rounds down/back to the
previous time step, so make sure we add a minute inside `Client.bars()`
when `end_dt=None`, indicating "get the latest bar". Add
a breakpoint block that should trigger whenever the latest bar vs. the
latest epoch time is mismatched; we'll remove this after some testing
verifying the history bars issue is resolved.

Further this drops the legacy `backfill_bars()` endpoint which has been
deprecated and unused for a while.
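
Conceptually the `end_dt` fix above is just a forward bump of the query end when no explicit end is passed (toy sketch with stdlib `datetime`; the real `Client.bars()` signature and datetime library are assumptions):

```python
from datetime import datetime, timedelta, timezone


def resolve_end_dt(end_dt: datetime | None = None) -> datetime:
    '''
    Hypothetical helper: when ``end_dt`` is unset ("get the latest bar"),
    push the query end forward a minute since the backend's history
    indexing rounds down/back to the previous time step.

    '''
    if end_dt is None:
        return datetime.now(timezone.utc) + timedelta(minutes=1)
    return end_dt
```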
@goodboy merged commit 8d1eb81 into master on Jan 13, 2023
@goodboy deleted the agg_feedz branch on January 13, 2023