[WIP] TimeSeriesWorkload enhancements + New TS Workloads + Cassandra Adapter for TimeSeriesWorkload #1407

Draft · wants to merge 27 commits into base: master
Conversation

@smartygus smartygus commented Feb 19, 2020

This is still a work in progress, with work remaining to bring it to a mergeable standard, but I am opening a pull request in its current state to get some feedback from the maintainers on the direction and on whether anything major would need to be changed to get it merged.

The three main things this Pull Request includes are as follows:

  • Updated version of TimeSeriesWorkload to broaden the range of workloads that are possible
  • A new set of time series workloads based on the capabilities of the updated TimeSeriesWorkload class
  • A Cassandra client adapter specifically for running TimeSeriesWorkload workloads.

Here's a more detailed explanation of these three things:

This pull request makes quite a few changes/updates to the TimeSeriesWorkload class, to provide support for richer and more flexible time series workloads. The full scope of the changes is probably best seen in the comments of the updated tsworkload_template, where all the new configuration properties are documented. But in essence it adds/changes the following:

  • Per-transaction type configuration of most parameters like starting/ending timestamp, group by parameters and so-on
  • The possibility to specify multiple, indexed, SCAN queries in one workload, each with their own set of parameters, while still retaining the ability to specify a single SCAN query without any indexing
  • Additional option for dividing up the write workload over the available threads: this can now be done in groups of individual time series (a time series = metric name + set of tag key-value pairs), so that even workloads designed around a single metric can be parallelised for maximum loading potential from a single YCSB client
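To make this concrete, a workload file might combine per-query and per-thread options roughly like this. The property names below are purely illustrative (my own invention for the sketch); the actual names and semantics are documented in the updated tsworkload_template.

```properties
# Hypothetical sketch of per-query SCAN configuration -- consult the
# updated tsworkload_template for the real property names and semantics.
scan0proportion=0.3
scan0querytimespan=3600
scan0groupbyfunction=AVERAGE

scan1proportion=0.1
scan1querytimespan=86400
scan1downsamplingfunction=MAX
scan1downsamplinginterval=3600
```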

I also made some tweaks to TimeseriesDB ( introduced in #1095 ) along the way, to fix some bugs I found and support the updated TimeSeriesWorkload class.

My hope would be that this could take the TimeSeriesWorkload from experimental status to being genuinely worthy of use for benchmarking time series data stores.

In addition to these changes, I've developed a set of time series workloads based on these richer capabilities and included them in this pull request. These were developed as part of a deep-dive into time series data, its analysis, and workloads that I did over the past few months as part of my bachelor thesis. The workloads are divided up into two groups based on scenario: Smart Metering, and DevOps Monitoring. Perhaps these could be included as the new standard workloads for TimeSeriesWorkload?

Lastly, I've created a new client adapter for Cassandra (and by extension ScyllaDB) specifically designed to run the workloads from TimeSeriesWorkload. I've used this to carry out numerous experiments and compare the performance of Cassandra and ScyllaDB, and it works reasonably well, but I would characterise it as an "initial implementation", not least because it has been designed with a particular schema, and Cassandra doesn't have a standardised schema for time series data. Depending on one's requirements, one type of schema might perform better than another. I chose a flexible schema that would support all of the various options available in TimeSeriesWorkload, but at the moment the tradeoff is that all the aggregation is done client-side and some queries require the use of ALLOW FILTERING.

I created this primarily because Cassandra (and ScyllaDB) are quite often promoted as being a good fit for time series data, but the only benchmarks out there that I could find were from vendors of other time series databases looking to compare their own product to Cassandra (among others). I'm automatically somewhat skeptical of vendor-published benchmarks, and thought it would be great to have a way to benchmark Cassandra/ScyllaDB versus other time series databases that's independent of any vendor. The initial work done on TimeSeriesWorkload and TimeseriesDB struck me as a good foundation for building out support for rich and flexible time series workloads in YCSB, and building a client adapter for time series, so I took these and ran with them.

That said, at the moment there are still no other adapters in YCSB for time series databases - many were planned to be brought over from YCSB-TS in #1068, but work there has stalled a bit. Nevertheless, there was no adapter for Cassandra in YCSB-TS, and I think it's a very interesting use case for a lot of people, because Cassandra (and by extension ScyllaDB) is not purpose-built for time series data but, as I said, is often used for this purpose.

It'd be great to have some feedback!

There's still some stuff to do. Here's a list of the things I can think of at the moment:

  • Fix tests for TimeSeriesWorkload
  • Update class documentation for TimeSeriesWorkload
  • Write some tests for CassandraCQLClientTS
  • Return query results from CassandraCQLClientTS

- initial commit of WIP on cassandra-ts binding (a binding for Cassandra
  that runs with the TimeSeriesWorkload workloads) - see
  CassandraCQLClientTS.java
- includes a Docker Compose-based development environment
  - includes a startup script for Cassandra that automatically
    creates the keyspace and table for cassandra-ts (time will tell if
    this makes sense or whether the binding should create the table
    itself if it doesn't exist)
  - The container that's used to run YCSB has a simple keep-alive java
    loop that's started when doing a docker-compose up. That way it's
    possible to start the java environment and cassandra and then just
    do YCSB test runs in the running container via docker-compose exec
- new binding is under cassandra-ts/ directory
- implemented so far: insert(), and read()
- noted issues: some read queries issued by the time series workload
  are querying for entries (metric-tags-timestamp combos) that haven't
  actually been inserted yet - haven't had time yet to look into the
  cause
- adds logging via logback
- logback.xml @ cassandra-ts/src/main/resources/logback.xml
- if the cassandra.trace property is set, then the QueryLogger that is
  part of the Datastax driver is setup and registered with the cluster
  and all normal bound queries (including bound parameters) are logged
  to /usr/ycsb/cassandra-ts/log in the docker container which is bound
  via a volume to the host directory, so that queries can be inspected
  during development
- made some changes to TimeseriesDB.java along the way to fix what I
  believe are errors:
  - when calling the read() and insert() abstract methods (which should
    be implemented by the Timeseries Database binding), the table was
    passed as the first parameter, instead of what I believe should be
    the key, which is equivalent to the metric. This has been fixed for
    read() and insert().
  - updated the timestamp parsing code in the read() method so it
    doesn't always bail out with a BAD_REQUEST when there is more than
    one timestamp value, because the TimeSeriesWorkload class sends
    even 'single' timestamp requests as a range, where the start and
    finish timestamps are identical. The code now checks for this: if
    they are the same, it sends a normal read request; if not, it bails
    out with a BAD_REQUEST
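The degenerate-range check described above amounts to something like the following minimal sketch. The class and method names here are my own; the real TimeseriesDB.read() works with YCSB's ByteIterator values and Status codes.

```java
// Minimal sketch of the degenerate-range check described above; the real
// TimeseriesDB.read() works with YCSB's ByteIterator values and Status codes.
public class RangeCheckSketch {

  /**
   * TimeSeriesWorkload may send a "single" timestamp request as a range
   * whose endpoints coincide; only a true range is invalid for read().
   */
  static String classify(long startTs, long endTs) {
    if (startTs == endTs) {
      return "READ";        // same endpoints: treat as a normal point read
    }
    return "BAD_REQUEST";   // a genuine range is not valid for read()
  }

  public static void main(String[] args) {
    System.out.println(classify(1000L, 1000L)); // READ
    System.out.println(classify(1000L, 2000L)); // BAD_REQUEST
  }
}
```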
- basic scan() support
- update to the cassandra schema to put tags before the timestamp, so
  that it's possible to use a range query on the timestamp (>= & <=)
  while specifying the tags (not possible to do it if timestamp comes
  before tags due to CQL limitations)
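As a sketch, the clustering-key ordering being described looks roughly like this. The column names are illustrative only, not the PR's actual DDL; the real schema lives in the cassandra-ts setup script.

```sql
-- Illustrative sketch only; see the cassandra-ts startup script for the
-- actual schema used by the binding.
CREATE TABLE usertable (
  metric    text,
  tags      text,       -- serialised tag key-value pairs
  valuetime timestamp,  -- the data point's timestamp
  value     double,
  PRIMARY KEY ((metric), tags, valuetime)
);

-- With tags ahead of valuetime in the clustering key, CQL permits an
-- equality restriction on tags combined with a range on the timestamp:
--   SELECT value FROM usertable
--   WHERE metric = 'cpu' AND tags = 'host=a'
--     AND valuetime >= X AND valuetime <= Y;
```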
- does not support aggregation or downsampling functions
- does not support tags being empty, due to tags coming before the
  timestamp in the clustering keys. In order to support selecting 'all'
  tags for a given metric, we'd need to use tags IN (...) with a list
  of all the tag combinations. This would require storing the tag
  combos for each metric in a separate table or something like that.
- adds simple single tag workload file with scans and inserts
- support for GroupBy Function with scan queries
- includes support for selecting/filtering by tags (when, for example,
  the model has more than one tag and the group by is on one tag, the
  rest are given equality restrictions), which is done client-side
- does rely on the use of the "ALLOW FILTERING" CQL clause, because
  sometimes there are no tags specified, but we are querying for a
  range of timestamps, and in the current model tags comes before
  valuetime in the clustering keys. Normally you can't place query
  predicates on latter clustering keys without also placing them
  on all preceding clustering keys.
- supports following GroupBy functions: SUM, MAX, MIN, AVERAGE, COUNT
- Grouping and aggregation is all done client-side
- includes *lots* of (optional) debug output for verifying correctness
  by showing intermediate process of filtering, grouping and aggregation
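To illustrate the general shape of client-side aggregation over the supported functions, here is a minimal, self-contained sketch using a per-function java.util.stream Collector. The class, enum, and method names are my own, not the PR's actual code.

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Sketch of client-side aggregation via a per-function Collector
// (illustrative only; not the PR's actual implementation).
public class AggregationCollectors {

  enum AggFn { SUM, MAX, MIN, AVERAGE, COUNT }

  /** Returns a collector that reduces a group's values with the given function. */
  static Collector<Double, ?, Double> collectorFor(AggFn fn) {
    switch (fn) {
      case SUM:     return Collectors.summingDouble(Double::doubleValue);
      case AVERAGE: return Collectors.averagingDouble(Double::doubleValue);
      case MAX:     return Collectors.collectingAndThen(
                        Collectors.maxBy(Double::compare), o -> o.orElse(Double.NaN));
      case MIN:     return Collectors.collectingAndThen(
                        Collectors.minBy(Double::compare), o -> o.orElse(Double.NaN));
      case COUNT:   return Collectors.collectingAndThen(
                        Collectors.counting(), Long::doubleValue);
      default:      throw new IllegalArgumentException("unknown function: " + fn);
    }
  }

  public static void main(String[] args) {
    List<Double> values = List.of(2.0, 4.0, 6.0);
    System.out.println(values.stream().collect(collectorFor(AggFn.SUM)));     // 12.0
    System.out.println(values.stream().collect(collectorFor(AggFn.AVERAGE))); // 4.0
    System.out.println(values.stream().collect(collectorFor(AggFn.COUNT)));   // 3.0
  }
}
```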
- reads queries from a trace text file with one query per line
- with support for insert, read, and scan at the moment
- not tested with multiple threads, would probably break in weird
  ways.
- was ending tag iteration too early, causing the key to be incremented
  before all tags had been covered, which in turn caused data points
  that should have been inserted to be skipped
- renames rollover variable to timestampRollover, to make it clearer
  which rollover is being referred to
New configuration properties:

- inserttransactionstart: provides a way to supply a different starting
  timestamp for inserts that occur as part of a transaction run (as
  opposed to a load run)
- readstart: provides a way to explicitly specify the starting timestamp
  that should be used for read/scan/update queries (normally takes the
  value from insertstart, if present).
- new properties added to tsworkload_template with explanations
- improved explanations for some existing properties

Changes to TimeSeriesWorkload.java:

- reads the "dotransactions" property to tell if it's a load run or a
  transaction run, so that it can set the starting timestamps for
  inserts accordingly
- updates delayedSeries > 0 check to use a threshold instead of directly
  comparing with zero

Misc:

- updates example workloads to fix typos in the delayedseries &
  delayedintervals property key
- tagPairDelimiter was being added between the downsampleFunction and
  the downsampleInterval, but this is not how it is described in the
  docs, so this removes the extra delimiter between the two (so that
  the value set for the downsampleKey is just a concatenation of the
  function and interval)
- adds support for client-side downsampling with scan queries
- supports the same functions as GROUP BY
- supports time buckets given in units from MILLISECONDS through to DAYS
- right now just calculates the results and outputs them if debug is on
- combining GROUP BY and DOWNSAMPLING is NOT YET supported
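The time-bucket assignment for client-side downsampling can be pictured as flooring each timestamp to its window boundary. A minimal sketch, with names of my own choosing (not the PR's actual code):

```java
import java.util.concurrent.TimeUnit;

// Sketch of fixed-window downsampling bucket assignment (illustrative only).
public class BucketSketch {

  /** Maps a millisecond timestamp to the start of its bucket for a
   *  window given in any unit from MILLISECONDS through DAYS. */
  static long bucketStart(long timestampMs, long windowLength, TimeUnit windowUnit) {
    long windowMs = windowUnit.toMillis(windowLength);
    return (timestampMs / windowMs) * windowMs; // floor to the window boundary
  }

  public static void main(String[] args) {
    // 1-hour buckets: 90 minutes past the epoch falls in the bucket
    // that starts at the 60-minute mark.
    long ts = TimeUnit.MINUTES.toMillis(90);
    System.out.println(bucketStart(ts, 1, TimeUnit.HOURS) == TimeUnit.HOURS.toMillis(1)); // true
  }
}
```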

TODO: return results
  - previous implementation of groupBy was incorrect, because even
    though it was a SCAN query, which should return a result for each
    timestamp in the range, it was aggregating all the values for each
    group into a single value.
  - This is ongoing work still, but basically aiming towards properly
    grouping matching timestamp values across different time series
    together into single time series per group, using the groupBy
    function supplied.
  - Also working on Downsampling, and combination of GroupBy and
    Downsampling.
  - Current Implementation of case where both GroupBy and Downsampling
    are required is the most elegant and should be propagated to the
    cases for GroupBy (no Downsampling) and Downsampling (no GroupBy).
  - Also need to check that when a SCAN query arrives with no GroupBy or
    Downsampling but with a non-fully-specified set of tags, it filters
    appropriately and then "returns" multiple raw time series. Right
    now, if there's no GroupBy or Downsampling function, it's assumed
    that the tags were fully specified and that a single time series
    range is being returned.
- pretty extensive update to the TimeSeriesWorkload class
- also includes changes to TimeseriesDB class that were necessary
- The biggest change is that it is now possible to specify multiple
  SCAN queries in a single workload, each with its own query proportion
  and parameters, like start timestamp and range, tag query set, group
  by function and tags to group by, downsampling function and interval.
- A dynamic number of specified SCAN queries is possible.
- tsworkload_template has been expanded to document the new properties
  and how they should be used (probably could still use some
  improvements, but it's a start)
- also separated out some properties that were previously shared amongst
  various workloads so that they can now all be set individually,
  like for READ and DELETE queries
- removed the ability to have a timestamp range on a READ query,
  because in my opinion this does not make sense: a READ query is for
  retrieving a single point in time (either from a single time series,
  or from more than one if the tags are not fully specified). For a
  query over a range of time, a SCAN query should be used.
- GroupBy is still possible with a READ query (assuming multiple time
  series were queried), and the parameters for this can now be
  specified independently of other queries.
- Previously, the tag or tags to group by in a groupBy query were
  stored as a regular field along with the other tags, just with no
  value specified: tag+value meant we were querying on that tag, and a
  tag by itself meant we wanted to group by it. But for GroupBy to make
  sense, you need more than one time series to group over, which means
  at least one OTHER tag, besides the tag being grouped, should
  normally be left unspecified, so that multiple time series are
  returned and can be aggregated under the tag or tags specified for
  the GroupBy. This was simply not possible with the previous
  implementation. With the addition of a property that lets you
  specify, for each tag key, whether it should be left unspecified, set
  to a specific (fixed) value, or set to a specific (random) value, it
  is now possible to specify "proper" GroupBy queries
- When performing a workload with both an INSERT query and one or more
  SCAN queries, and the start timestamp for the SCAN query is specified
  to be chosen at random, the workload will now choose from the
  dynamically growing time span covering all inserted records: starting
  at the timestamp specified in the workload for where the INSERTs
  should begin during the LOAD phase (this requires that such a
  timestamp has been specified), right through to the most recently
  written timestamp of the INSERTs happening during the RUN phase.
- TimeseriesDB has been updated to support executing SCAN queries with
  arbitrary GroupBy and Downsampling Functions. Previously it assumed
  that the Downsampling function that was specified in the properties
  was the only one, not allowing for any variation between different
  queries. This has been changed to just parse the necessary information
  directly from the parameters passed to the function and not look
  at the properties for this information at all.
- TimeseriesDB has also been updated to support the new special key
  where the GroupBy tags are stored, seeing as they were separated out
  from the regular fields. It now properly handles tags that are
  unspecified as well as groupby tags.
- switched to a more elegant solution with a single collect run using
  an aggregation-function-specific final collector supplied by a method
- moved filtering of the resultStream, including debug output,
  completely into a method
- still got bunches of commented out stuff to cleanup, will do that
  later once all client-side functionality is working as desired.
- adds checking of properties for the tag count, so that the tagsMap
  passed with the query can be checked to see if it is fully specified
  (ie. if it has the same number of key/value pairs as the specified
  tagCount), which determines whether or not ALLOW FILTERING is needed
  for the CQL query, and whether client-side filtering is necessary
- Updated the existing Client-Side Downsampling code to group first
  by the tags for each row, because we should only be downsampling
  *within* a time series, and when multiple time series are included
  in a ResultSet then we need to handle this
- This is just a change to the existing code structure, before
  re-working the downsampling code along the lines of the GroupBy, and
  GroupBy+Downsampling code
- Re-working of Map-Logging function to work with an arbitrary number of
  levels by utilising recursion
- Changed a few function names for accuracy's sake
- converts Downsampling to use single collect run, with custom
  Collector-returning function based on downsampling function
- Improves debug output for simple SCAN queries with no Aggregation
- previously in TimeseriesDB.java, the read() and scan() abstract
  methods that are supposed to be overwritten by the TSDB Clients
  did not have the variables used to return the query results to the
  Workload in them
- this updates the read() and scan() method signatures in both
  TimeseriesDB and CassandraCQLClientTS to include these variables
- note: the CassandraCQLClientTS doesn't yet actually put anything
  in these data structures, but this paves the way for that
- previously spreading of writes across threads was divided up by the
  keys (aka metrics or fields), but this had a large limitation, in that
  you could only run as many threads as you had metrics. If you wanted
  to benchmark a workload based around a single metric for example, your
  write performance would suffer because you could not run it with more
  than one thread.
- this adds an additional option for spreading writes across multiple
  threads, and it's done now by the individual time series (ie. the
  key/metric + tags combination that represents each time series).
- this is now much more flexible for workload design while still
  allowing maximum levels of multi-threadedness.
- at the moment it's an either-or, I've left the original implementation
  in there for now with a config option (it still defaults to spreading
  by key), and a bunch of conditional logic. If the new way proves to
  be reliable and problem-free, I can't see any reason why it shouldn't
  be the default (and only) way for spreading writes over threads.
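The per-series spreading described above can be pictured as a simple round-robin assignment of time series to threads. A minimal sketch with hypothetical names (the actual partitioning logic lives in TimeSeriesWorkload):

```java
// Illustrative sketch: spreading writes over threads by individual time
// series rather than by key/metric (names here are hypothetical).
public class SeriesPartitionSketch {

  /** With one metric and e.g. 8 tag combinations, per-key spreading caps
   *  useful threads at 1; per-series spreading lets every thread work. */
  static int threadFor(int seriesIndex, int threadCount) {
    return seriesIndex % threadCount; // simple round-robin assignment
  }

  public static void main(String[] args) {
    // 8 time series (1 metric x 8 tag combinations) over 4 threads:
    for (int series = 0; series < 8; series++) {
      System.out.println("series " + series + " -> thread " + threadFor(series, 4));
    }
  }
}
```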
- added individual debug properties for CassandraCQLClientTS,
  TimeseriesDB, and TimeSeriesWorkload classes, so that one has a bit
  more fine-grained control over the debug output (allows enabling debug
  output on just one or two of the classes). They still all respect the
  global "debug" property though, and you can get the full debug
  "firehose" :)
- moves the tags column into the partition key, to ensure that
  for workloads that only use a single metric, the data will still
  be partitioned across nodes in the cluster.
- this will however likely have negative performance effects, because
  Cassandra will need to search all nodes on the cluster for the right
  partition when doing a non-fully-qualified query with a partial set
  of tags, and ALLOW FILTERING enabled.
- It might be worth investigating the option of supporting a table
  schema with one column per tag and secondary indexes
- adds support for a downsampling window length of zero, which signifies
  that all returned results for a time series should be downsampled
  to a single value
- also adds support for converting workload timestamps as necessary:
  Cassandra natively uses milliseconds for timestamps, so if the unit
  specified in the workload properties is MILLISECONDS then the
  timestamps are left alone; otherwise they are converted into millis.
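That conversion maps directly onto java.util.concurrent.TimeUnit. A minimal sketch (the method name is my own, not the binding's):

```java
import java.util.concurrent.TimeUnit;

// Sketch of the workload-unit -> milliseconds conversion described above.
public class UnitConversionSketch {

  static long toCassandraMillis(long timestamp, TimeUnit workloadUnit) {
    if (workloadUnit == TimeUnit.MILLISECONDS) {
      return timestamp;                        // already Cassandra's native unit
    }
    return workloadUnit.toMillis(timestamp);   // e.g. SECONDS -> milliseconds
  }

  public static void main(String[] args) {
    System.out.println(toCassandraMillis(1582000000L, TimeUnit.SECONDS)); // 1582000000000
  }
}
```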
- was only being initialised in the thread that was initialising the
  cluster and was causing queries on the other threads to error out
- Initial version of 5 Smart Metering Time Series workloads
- extra whitespace after a property was causing an exception on load
- same problem fixed in previous commit, this time for all the
  workloads ;)
- remove commented out stuff and things that were no longer used
- changed names
- removed workloads that were created during development and testing
@busbey busbey self-requested a review February 19, 2020 17:52
@busbey (Collaborator) commented Feb 19, 2020

this is an excellent development on the TS benchmarking stuff. I'll need to block out some time to dig in, but thanks for taking this first step!

@smartygus (Author)

Yeah no worries, there’s quite a bit to digest there, so take your time :)
Thanks for reviewing!
