Skip to content

Commit

Permalink
Aggregate query results (re getredash#35) (getredash#339)
Browse files Browse the repository at this point in the history
* Aggregate query results (re getredash#35)
  • Loading branch information
Allen Short authored Mar 27, 2018
1 parent f2c80df commit 4c17b26
Show file tree
Hide file tree
Showing 15 changed files with 269 additions and 23 deletions.
3 changes: 3 additions & 0 deletions client/app/components/queries/schedule-dialog.html
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ <h4 class="modal-title">Refresh Schedule</h4>
Stop scheduling at date/time (format yyyy-MM-ddTHH:mm:ss, like 2016-12-28T14:57:00):
<schedule-until query="$ctrl.query" save-query="$ctrl.saveQuery"></schedule-until>
</label>
<label>
Number of query results to keep <schedule-keep-results query="$ctrl.query" save-query="$ctrl.saveQuery"></schedule-keep-results>
</label>
</div>
13 changes: 12 additions & 1 deletion client/app/components/queries/schedule-dialog.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,21 @@ function scheduleUntil() {
};
}

// Element directive rendering the "number of query results to keep" input.
// The field is bound to query.schedule_resultset_size, saves on change, and
// is disabled whenever the query has no schedule.
function scheduleKeepResults() {
  const template =
    '<input type="number" class="form-control" ng-model="query.schedule_resultset_size" ng-change="saveQuery()" ng-disabled="!query.schedule">';
  return {
    restrict: 'E',
    scope: { query: '=', saveQuery: '=' },
    template,
  };
}

const ScheduleForm = {
controller() {
this.query = this.resolve.query;
this.saveQuery = this.resolve.saveQuery;

if (this.query.hasDailySchedule()) {
this.refreshType = 'daily';
} else {
Expand All @@ -124,5 +134,6 @@ export default function init(ngModule) {
ngModule.directive('queryTimePicker', queryTimePicker);
ngModule.directive('queryRefreshSelect', queryRefreshSelect);
ngModule.directive('scheduleUntil', scheduleUntil);
ngModule.directive('scheduleKeepResults', scheduleKeepResults);
ngModule.component('scheduleDialog', ScheduleForm);
}
1 change: 0 additions & 1 deletion client/app/pages/alerts-list/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const stateClass = {

class AlertsListCtrl {
constructor(Alert) {

this.showEmptyState = false;
this.showList = false;

Expand Down
1 change: 1 addition & 0 deletions client/app/pages/queries/view.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ function QueryViewCtrl(
} else {
request = pick($scope.query, [
'schedule',
'schedule_resultset_size',
'query',
'id',
'description',
Expand Down
10 changes: 10 additions & 0 deletions client/app/services/query-result.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ function addPointToSeries(point, seriesCollection, seriesName) {

function QueryResultService($resource, $timeout, $q) {
const QueryResultResource = $resource('api/query_results/:id', { id: '@id' }, { post: { method: 'POST' } });
const QueryResultSetResource = $resource('api/queries/:id/resultset', { id: '@id' });
const Job = $resource('api/jobs/:id', { id: '@id' });
const statuses = {
1: 'waiting',
Expand Down Expand Up @@ -421,6 +422,15 @@ function QueryResultService($resource, $timeout, $q) {
return queryResult;
}

static getResultSet(queryId) {
const queryResult = new QueryResult();

QueryResultSetResource.get({ id: queryId }, (response) => {
queryResult.update(response);
});

return queryResult;
}
loadResult(tryCount) {
QueryResultResource.get(
{ id: this.job.query_result_id },
Expand Down
6 changes: 5 additions & 1 deletion client/app/services/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,11 @@ function QueryResource($resource, $http, $q, $location, currentUser, QueryResult
this.latest_query_data_id = null;
}

if (this.latest_query_data && maxAge !== 0) {
if (this.schedule_resultset_size) {
if (!this.queryResult) {
this.queryResult = QueryResult.getResultSet(this.id);
}
} else if (this.latest_query_data && maxAge !== 0) {
if (!this.queryResult) {
this.queryResult = new QueryResult({
query_result: this.latest_query_data,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
"""incremental query results aggregation
"""Incremental query results aggregation
Revision ID: 2a2b3b58464e
Revision ID: 9d7678c47452
Revises: 15041b7085fe
Create Date: 2018-02-16 19:28:38.931253
Create Date: 2018-03-08 04:36:12.802199
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '2a2b3b58464e'
revision = '9d7678c47452'
down_revision = '15041b7085fe'
branch_labels = None
depends_on = None
Expand All @@ -24,9 +24,11 @@ def upgrade():
sa.ForeignKeyConstraint(['result_id'], ['query_results.id'], ),
sa.PrimaryKeyConstraint('query_id', 'result_id')
)
op.add_column(u'queries', sa.Column('schedule_keep_results', sa.Integer(), nullable=True))

op.add_column(u'queries', sa.Column('schedule_resultset_size', sa.Integer(), nullable=True))
1

def downgrade():
op.drop_column(u'queries', 'schedule_keep_results')
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column(u'queries', 'schedule_resultset_size')
op.drop_table('query_resultsets')
# ### end Alembic commands ###
3 changes: 2 additions & 1 deletion redash/handlers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from redash.handlers.data_sources import DataSourceTypeListResource, DataSourceListResource, DataSourceSchemaResource, DataSourceResource, DataSourcePauseResource, DataSourceTestResource, DataSourceVersionResource
from redash.handlers.events import EventsResource
from redash.handlers.queries import QueryForkResource, QueryRefreshResource, QueryListResource, QueryRecentResource, QuerySearchResource, QueryResource, MyQueriesResource, QueryVersionListResource, ChangeResource
from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource
from redash.handlers.query_results import QueryResultListResource, QueryResultResource, JobResource, QueryResultSetResource
from redash.handlers.users import UserResource, UserListResource, UserInviteResource, UserResetPasswordResource
from redash.handlers.visualizations import VisualizationListResource
from redash.handlers.visualizations import VisualizationResource
Expand Down Expand Up @@ -76,6 +76,7 @@ def json_representation(data, code, headers=None):
api.add_org_resource(QueryRefreshResource, '/api/queries/<query_id>/refresh', endpoint='query_refresh')
api.add_org_resource(QueryResource, '/api/queries/<query_id>', endpoint='query')
api.add_org_resource(QueryForkResource, '/api/queries/<query_id>/fork', endpoint='query_fork')
api.add_org_resource(QueryResultSetResource, '/api/queries/<query_id>/resultset', endpoint='query_aggregate_results')
api.add_org_resource(QueryVersionListResource, '/api/queries/<query_id>/version', endpoint='query_versions')
api.add_org_resource(ChangeResource, '/api/changes/<change_id>', endpoint='changes')

Expand Down
3 changes: 3 additions & 0 deletions redash/handlers/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def post(self):
:<json string description:
:<json string schedule: Schedule interval, in seconds, for repeated execution of this query
:<json string schedule_until: Time in ISO format to stop scheduling this query (may be null to run indefinitely)
:<json number schedule_resultset_size: Number of result sets to keep (null to keep only one)
:<json object options: Query options
.. _query-response-label:
Expand Down Expand Up @@ -134,6 +135,8 @@ def post(self):
query_def['data_source'] = data_source
query_def['org'] = self.current_org
query_def['is_draft'] = True
if query_def.get('schedule_resultset_size') == 1:
query_def['schedule_resultset_size'] = None
query = models.Query.create(**query_def)
query.record_changes(changed_by=self.current_user)
models.db.session.add(query)
Expand Down
29 changes: 28 additions & 1 deletion redash/handlers/query_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,33 @@ def post(self):
ONE_YEAR = 60 * 60 * 24 * 365.25


class QueryResultSetResource(BaseResource):
    @require_permission('view_query')
    def get(self, query_id=None, filetype='json'):
        """Serve the last ``schedule_resultset_size`` results of a query,
        concatenated into a single synthetic result set.

        404s when the query does not keep multiple results.
        """
        query = get_object_or_404(models.Query.get_by_id_and_org, query_id, self.current_org)
        if not query.schedule_resultset_size:
            abort(404, message="query does not keep multiple results")

        # Only the trailing schedule_resultset_size stored results participate.
        total = len(query.query_results)
        start = max(total - query.schedule_resultset_size, 0)
        recent = [result.to_dict() for result in query.query_results[start:]]

        if recent:
            # Clone the first kept result's metadata, then splice in the rows
            # from every kept result (columns are taken from the first one).
            merged = dict(recent[0])
            merged['data'] = {
                'columns': recent[0]['data']['columns'],
                'rows': [row for r in recent for row in r['data']['rows']],
            }
        else:
            merged = {}

        payload = json.dumps({'query_result': merged}, cls=utils.JSONEncoder)
        return make_response(payload, 200, {'Content-Type': "application/json"})


class QueryResultResource(BaseResource):
@staticmethod
def add_cors_headers(headers):
Expand Down Expand Up @@ -194,7 +221,7 @@ def get(self, query_id=None, query_result_id=None, filetype='json'):
query_result = run_query_sync(query.data_source, parameter_values, query.to_dict()['query'], max_age=max_age)
elif query.latest_query_data_id is not None:
query_result = get_object_or_404(models.QueryResult.get_by_id_and_org, query.latest_query_data_id, self.current_org)

if query is not None and query_result is not None and self.current_user.is_api_user():
if query.query_hash != query_result.query_hash:
abort(404, message='No cached result found for this query.')
Expand Down
55 changes: 51 additions & 4 deletions redash/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from redash.utils.comparators import CaseInsensitiveComparator
from redash.utils.configuration import ConfigurationContainer
from redash.settings.organization import settings as org_settings
from sqlalchemy import distinct, or_
from sqlalchemy import distinct, exists, or_
from sqlalchemy.dialects import postgresql
from sqlalchemy.event import listens_for
from sqlalchemy.ext.mutable import Mutable
Expand Down Expand Up @@ -728,9 +728,9 @@ def to_dict(self):
def unused(cls, days=7):
age_threshold = datetime.datetime.now() - datetime.timedelta(days=days)

unused_results = (db.session.query(QueryResult.id).filter(
Query.id == None, QueryResult.retrieved_at < age_threshold)
.outerjoin(Query))
unused_results = db.session.query(QueryResult.id).filter(
QueryResult.retrieved_at < age_threshold,
~QueryResultSet.query.filter(QueryResultSet.result_id == QueryResult.id).exists())

return unused_results

Expand Down Expand Up @@ -769,9 +769,12 @@ def store_result(cls, org, data_source, query_hash, query, data, run_time, retri
queries = db.session.query(Query).filter(
Query.query_hash == query_hash,
Query.data_source == data_source)

for q in queries:
q.latest_query_data = query_result
db.session.add(q)
if q.schedule_resultset_size > 0:
q.query_results.append(query_result)
query_ids = [q.id for q in queries]
logging.info("Updated %s queries with result (%s).", len(query_ids), query_hash)

Expand Down Expand Up @@ -851,6 +854,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
data_source = db.relationship(DataSource, backref='queries')
latest_query_data_id = Column(db.Integer, db.ForeignKey("query_results.id"), nullable=True)
latest_query_data = db.relationship(QueryResult)
query_results = db.relationship("QueryResult", secondary="query_resultsets")
name = Column(db.String(255))
description = Column(db.String(4096), nullable=True)
query_text = Column("query", db.Text)
Expand All @@ -866,6 +870,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
schedule = Column(db.String(10), nullable=True)
schedule_failures = Column(db.Integer, default=0)
schedule_until = Column(db.DateTime(True), nullable=True)
schedule_resultset_size = Column(db.Integer, nullable=True)
visualizations = db.relationship("Visualization", cascade="all, delete-orphan")
options = Column(MutableDict.as_mutable(PseudoJSON), default={})
search_vector = Column(TSVectorType('id', 'name', 'description', 'query',
Expand All @@ -892,6 +897,7 @@ def to_dict(self, with_stats=False, with_visualizations=False, with_user=True, w
'query_hash': self.query_hash,
'schedule': self.schedule,
'schedule_until': self.schedule_until,
'schedule_resultset_size': self.schedule_resultset_size,
'api_key': self.api_key,
'is_archived': self.is_archived,
'is_draft': self.is_draft,
Expand Down Expand Up @@ -1000,6 +1006,37 @@ def outdated_queries(cls):

return outdated_queries.values()

@classmethod
def delete_stale_resultsets(cls):
    # Prune stored result sets so each distinct query text keeps at most the
    # largest schedule_resultset_size requested for it.  Returns the number
    # of QueryResult rows deleted (bridge-row-only deletions are not counted).
    delete_count = 0
    # Distinct texts of queries that opted into keeping multiple result sets.
    texts = [c[0] for c in db.session.query(Query.query_text)
             .filter(Query.schedule_resultset_size != None).distinct()]
    for text in texts:
        queries = (Query.query.filter(Query.query_text == text,
                                      Query.schedule_resultset_size != None)
                   .order_by(Query.schedule_resultset_size.desc()))
        # Multiple queries with the same text may request multiple result sets
        # be kept. We start with the one that keeps the most, and delete both
        # the unneeded bridge rows and result sets.
        first_query = queries.first()
        if first_query is not None and first_query.schedule_resultset_size:
            resultsets = QueryResultSet.query.filter(QueryResultSet.query_rel == first_query).order_by(QueryResultSet.result_id)
            resultset_count = resultsets.count()
            if resultset_count > first_query.schedule_resultset_size:
                n_to_delete = resultset_count - first_query.schedule_resultset_size
                # Drop the surplus prefix of the id-ordered list (assumes
                # smaller result ids are older — TODO confirm).
                r_ids = [r.result_id for r in resultsets][:n_to_delete]
                QueryResultSet.query.filter(QueryResultSet.result_id.in_(r_ids)).delete(synchronize_session=False)
                delete_count += QueryResult.query.filter(QueryResult.id.in_(r_ids)).delete(synchronize_session=False)
        # By this point there are no stale result sets left.
        # Delete unneeded bridge rows for the remaining queries.
        for q in queries[1:]:
            resultsets = db.session.query(QueryResultSet.result_id).filter(QueryResultSet.query_rel == q).order_by(QueryResultSet.result_id)
            n_to_delete = resultsets.count() - q.schedule_resultset_size
            if n_to_delete > 0:
                stale_r = QueryResultSet.query.filter(QueryResultSet.result_id.in_(resultsets.limit(n_to_delete).subquery()))
                stale_r.delete(synchronize_session=False)
    return delete_count

@classmethod
def search(cls, term, group_ids, include_drafts=False, limit=20):
where = cls.is_archived == False
Expand Down Expand Up @@ -1089,6 +1126,16 @@ def __repr__(self):
return '<Query %s: "%s">' % (self.id, self.name or 'untitled')


class QueryResultSet(db.Model):
    """Bridge table linking a Query to each QueryResult kept for it."""
    __tablename__ = 'query_resultsets'

    # Composite primary key: one row per (query, result) pair.
    query_id = Column(db.Integer, db.ForeignKey("queries.id"), primary_key=True)
    query_rel = db.relationship(Query)
    result_id = Column(db.Integer, db.ForeignKey("query_results.id"), primary_key=True)
    result = db.relationship(QueryResult)


@vectorizer(db.Integer)
def integer_vectorizer(column):
return db.func.cast(column, db.Text)
Expand Down
1 change: 1 addition & 0 deletions redash/tasks/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ def cleanup_query_results():
deleted_count = models.QueryResult.query.filter(
models.QueryResult.id.in_(unused_query_results.subquery())
).delete(synchronize_session=False)
deleted_count += models.Query.delete_stale_resultsets()
models.db.session.commit()
logger.info("Deleted %d unused query results.", deleted_count)

Expand Down
7 changes: 6 additions & 1 deletion tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ def __call__(self):
query_hash=gen_query_hash('SELECT 1'),
data_source=data_source_factory.create,
org_id=1)

query_resultset_factory = ModelFactory(redash.models.QueryResultSet,
query_rel=query_factory.create,
result=query_result_factory.create)
visualization_factory = ModelFactory(redash.models.Visualization,
type='CHART',
query_rel=query_factory.create,
Expand Down Expand Up @@ -295,6 +297,9 @@ def create_query_result(self, **kwargs):

return query_result_factory.create(**args)

def create_query_resultset(self, **kwargs):
return query_resultset_factory.create(**kwargs)

def create_visualization(self, **kwargs):
args = {
'query_rel': self.create_query()
Expand Down
Loading

0 comments on commit 4c17b26

Please sign in to comment.