diff --git a/client/app/components/admin/CeleryStatus.jsx b/client/app/components/admin/CeleryStatus.jsx deleted file mode 100644 index 959b10e6ce..0000000000 --- a/client/app/components/admin/CeleryStatus.jsx +++ /dev/null @@ -1,95 +0,0 @@ -import { map } from 'lodash'; -import React from 'react'; -import PropTypes from 'prop-types'; - -import Table from 'antd/lib/table'; -import Card from 'antd/lib/card'; -import Spin from 'antd/lib/spin'; -import Badge from 'antd/lib/badge'; -import { Columns } from '@/components/items-list/components/ItemsTable'; - -// CounterCard - -export function CounterCard({ title, value, loading }) { - return ( - - - {title} -
{value}
-
-
- ); -} - -CounterCard.propTypes = { - title: PropTypes.string.isRequired, - value: PropTypes.oneOfType([PropTypes.number, PropTypes.string]), - loading: PropTypes.bool.isRequired, -}; - -CounterCard.defaultProps = { - value: '', -}; - -// Tables - -const commonColumns = [ - { title: 'Worker Name', dataIndex: 'worker' }, - { title: 'PID', dataIndex: 'worker_pid' }, - { title: 'Queue', dataIndex: 'queue' }, - Columns.custom((value) => { - if (value === 'active') { - return Active; - } - return {value}; - }, { - title: 'State', - dataIndex: 'state', - }), - Columns.timeAgo({ title: 'Start Time', dataIndex: 'start_time' }), -]; - -const queryColumns = commonColumns.concat([ - Columns.timeAgo({ title: 'Enqueue Time', dataIndex: 'enqueue_time' }), - { title: 'Query ID', dataIndex: 'query_id' }, - { title: 'Org ID', dataIndex: 'org_id' }, - { title: 'Data Source ID', dataIndex: 'data_source_id' }, - { title: 'User ID', dataIndex: 'user_id' }, - { title: 'Scheduled', dataIndex: 'scheduled' }, -]); - -const queuesColumns = map( - ['Name', 'Active', 'Reserved', 'Waiting'], - c => ({ title: c, dataIndex: c.toLowerCase() }), -); - -const TablePropTypes = { - loading: PropTypes.bool.isRequired, - items: PropTypes.arrayOf(PropTypes.object).isRequired, -}; - -export function QueuesTable({ loading, items }) { - return ( - - ); -} - -QueuesTable.propTypes = TablePropTypes; - -export function QueriesTable({ loading, items }) { - return ( -
- ); -} - -QueriesTable.propTypes = TablePropTypes; diff --git a/client/app/components/admin/Layout.jsx b/client/app/components/admin/Layout.jsx index 36f2f7e70a..4ac9621b27 100644 --- a/client/app/components/admin/Layout.jsx +++ b/client/app/components/admin/Layout.jsx @@ -15,9 +15,6 @@ export default function Layout({ activeTab, children }) { System Status}> {(activeTab === 'system_status') ? children : null} - Celery Status}> - {(activeTab === 'tasks') ? children : null} - RQ Status}> {(activeTab === 'jobs') ? children : null} diff --git a/client/app/components/admin/RQStatus.jsx b/client/app/components/admin/RQStatus.jsx index 88c937dd17..ffc14a679a 100644 --- a/client/app/components/admin/RQStatus.jsx +++ b/client/app/components/admin/RQStatus.jsx @@ -3,9 +3,34 @@ import React from 'react'; import PropTypes from 'prop-types'; import Badge from 'antd/lib/badge'; +import Card from 'antd/lib/card'; +import Spin from 'antd/lib/spin'; import Table from 'antd/lib/table'; import { Columns } from '@/components/items-list/components/ItemsTable'; +// CounterCard + +export function CounterCard({ title, value, loading }) { + return ( + + + {title} +
{value}
+
+
+ ); +} + +CounterCard.propTypes = { + title: PropTypes.string.isRequired, + value: PropTypes.oneOfType([PropTypes.number, PropTypes.string]), + loading: PropTypes.bool.isRequired, +}; + +CounterCard.defaultProps = { + value: '', +}; + // Tables const otherJobsColumns = [ diff --git a/client/app/pages/admin/Jobs.jsx b/client/app/pages/admin/Jobs.jsx index f04efc39bc..60fa0a9a60 100644 --- a/client/app/pages/admin/Jobs.jsx +++ b/client/app/pages/admin/Jobs.jsx @@ -6,8 +6,7 @@ import Alert from 'antd/lib/alert'; import Tabs from 'antd/lib/tabs'; import * as Grid from 'antd/lib/grid'; import Layout from '@/components/admin/Layout'; -import { CounterCard } from '@/components/admin/CeleryStatus'; -import { WorkersTable, QueuesTable, OtherJobsTable } from '@/components/admin/RQStatus'; +import { CounterCard, WorkersTable, QueuesTable, OtherJobsTable } from '@/components/admin/RQStatus'; import { $http, $location, $rootScope } from '@/services/ng'; import recordEvent from '@/services/recordEvent'; diff --git a/client/app/pages/admin/Tasks.jsx b/client/app/pages/admin/Tasks.jsx deleted file mode 100644 index 566d5c94a3..0000000000 --- a/client/app/pages/admin/Tasks.jsx +++ /dev/null @@ -1,135 +0,0 @@ -import { values, each } from 'lodash'; -import moment from 'moment'; -import React from 'react'; -import { react2angular } from 'react2angular'; - -import Alert from 'antd/lib/alert'; -import Tabs from 'antd/lib/tabs'; -import * as Grid from 'antd/lib/grid'; -import Layout from '@/components/admin/Layout'; -import { CounterCard, QueuesTable, QueriesTable } from '@/components/admin/CeleryStatus'; - -import { $http } from '@/services/ng'; -import recordEvent from '@/services/recordEvent'; -import { routesToAngularRoutes } from '@/lib/utils'; - -// Converting name coming from API to the one the UI expects. -// TODO: update the UI components to use `waiting_in_queue` instead of `waiting`. -function stateName(state) { - if (state === 'waiting_in_queue') { - return 'waiting'; - } - return state; -} - -class Tasks extends React.Component { - state = { - isLoading: true, - error: null, - - queues: [], - queries: [], - counters: { active: 0, reserved: 0, waiting: 0 }, - }; - - componentDidMount() { - recordEvent('view', 'page', 'admin/tasks'); - $http - .get('/api/admin/queries/tasks') - .then(({ data }) => this.processTasks(data.tasks)) - .catch(error => this.handleError(error)); - } - - componentWillUnmount() { - // Ignore data after component unmounted - this.processTasks = () => {}; - this.handleError = () => {}; - } - - processTasks = (tasks) => { - const queues = {}; - const queries = []; - - const counters = { active: 0, reserved: 0, waiting: 0 }; - - each(tasks, (task) => { - task.state = stateName(task.state); - queues[task.queue] = queues[task.queue] || { name: task.queue, active: 0, reserved: 0, waiting: 0 }; - queues[task.queue][task.state] += 1; - - if (task.enqueue_time) { - task.enqueue_time = moment(task.enqueue_time * 1000.0); - } - if (task.start_time) { - task.start_time = moment(task.start_time * 1000.0); - } - - counters[task.state] += 1; - - if (task.task_name === 'redash.tasks.execute_query') { - queries.push(task); - } - }); - - this.setState({ isLoading: false, queues: values(queues), queries, counters }); - }; - - handleError = (error) => { - this.setState({ isLoading: false, error }); - }; - - render() { - const { isLoading, error, queues, queries, counters } = this.state; - - return ( - -
- {error && ( - - )} - - {!error && ( - - - - - - - - - - - - - - - - - - - - - - - )} -
-
- ); - } -} - -export default function init(ngModule) { - ngModule.component('pageTasks', react2angular(Tasks)); - - return routesToAngularRoutes([ - { - path: '/admin/queries/tasks', - title: 'Celery Status', - key: 'tasks', - }, - ], { - template: '', - }); -} - -init.init = true; diff --git a/redash/handlers/admin.py b/redash/handlers/admin.py index ba8dd30ec9..62eeaf1309 100644 --- a/redash/handlers/admin.py +++ b/redash/handlers/admin.py @@ -8,7 +8,7 @@ from redash.permissions import require_super_admin from redash.serializers import QuerySerializer from redash.utils import json_loads -from redash.monitor import celery_tasks, rq_status +from redash.monitor import rq_status @routes.route('/api/admin/queries/outdated', methods=['GET']) @@ -38,22 +38,6 @@ def outdated_queries(): return json_response(response) -@routes.route('/api/admin/queries/tasks', methods=['GET']) -@require_super_admin -@login_required -def queries_tasks(): - record_event(current_org, current_user._get_current_object(), { - 'action': 'list', - 'object_type': 'celery_tasks' - }) - - response = { - 'tasks': celery_tasks(), - } - - return json_response(response) - - @routes.route('/api/admin/queries/rq_status', methods=['GET']) @require_super_admin @login_required diff --git a/redash/monitor.py b/redash/monitor.py index cd9471dff0..b062db295d 100644 --- a/redash/monitor.py +++ b/redash/monitor.py @@ -4,7 +4,6 @@ from redash import redis_connection, rq_redis_connection, __version__, settings from redash.models import db, DataSource, Query, QueryResult, Dashboard, Widget from redash.utils import json_loads -from redash.worker import celery from rq import Queue, Worker from rq.job import Job from rq.registry import StartedJobRegistry @@ -26,17 +25,8 @@ def get_object_counts(): return status -def get_celery_queues(): - queue_names = db.session.query(DataSource.queue_name).distinct() - scheduled_queue_names = db.session.query(DataSource.scheduled_queue_name).distinct() - query = db.session.execute(union_all(queue_names, scheduled_queue_names)) - - return ['celery'] + [row[0] for row in query] - - def get_queues_status(): - return {**{queue: {'size': redis_connection.llen(queue)} for queue in get_celery_queues()}, - **{queue.name: {'size': len(queue)} for queue in Queue.all(connection=rq_redis_connection)}} + return {queue.name: {'size': len(queue)} for queue in Queue.all(connection=rq_redis_connection)} def get_db_sizes(): @@ -67,73 +57,6 @@ def get_status(): return status -def get_waiting_in_queue(queue_name): - jobs = [] - for raw in redis_connection.lrange(queue_name, 0, -1): - job = json_loads(raw) - try: - args = json_loads(job['headers']['argsrepr']) - if args.get('query_id') == 'adhoc': - args['query_id'] = None - except ValueError: - args = {} - - job_row = { - 'state': 'waiting_in_queue', - 'task_name': job['headers']['task'], - 'worker': None, - 'worker_pid': None, - 'start_time': None, - 'task_id': job['headers']['id'], - 'queue': job['properties']['delivery_info']['routing_key'] - } - - job_row.update(args) - jobs.append(job_row) - - return jobs - - -def parse_tasks(task_lists, state): - rows = [] - - for task in itertools.chain(*task_lists.values()): - task_row = { - 'state': state, - 'task_name': task['name'], - 'worker': task['hostname'], - 'queue': task['delivery_info']['routing_key'], - 'task_id': task['id'], - 'worker_pid': task['worker_pid'], - 'start_time': task['time_start'], - } - - if task['name'] == 'redash.tasks.execute_query': - try: - args = json_loads(task['args']) - except ValueError: - args = {} - - if args.get('query_id') == 'adhoc': - args['query_id'] = None - - task_row.update(args) - - rows.append(task_row) - - return rows - - -def celery_tasks(): - tasks = parse_tasks(celery.control.inspect().active(), 'active') - tasks += parse_tasks(celery.control.inspect().reserved(), 'reserved') - - for queue_name in get_celery_queues(): - tasks += get_waiting_in_queue(queue_name) - - return tasks - - def fetch_jobs(queue, job_ids): return [{ 'id': job.id,