diff --git a/redash/__init__.py b/redash/__init__.py
index dd2f33c64c..b82f257095 100644
--- a/redash/__init__.py
+++ b/redash/__init__.py
@@ -3,7 +3,7 @@
 import urlparse
 import urllib
 
-import walrus
+import redis
 from flask import Flask, current_app
 from flask_sslify import SSLify
 from werkzeug.contrib.fixers import ProxyFix
@@ -48,7 +48,7 @@ def create_redis_connection():
         else:
             db = 0
 
-        client = walrus.Database(unix_socket_path=redis_url.path, db=db)
+        client = redis.StrictRedis(unix_socket_path=redis_url.path, db=db)
     else:
         if redis_url.path:
             redis_db = redis_url.path[1]
@@ -56,7 +56,7 @@ def create_redis_connection():
             redis_db = 0
         # Redis passwords might be quoted with special characters
         redis_password = redis_url.password and urllib.unquote(redis_url.password)
-        client = walrus.Database(host=redis_url.hostname, port=redis_url.port, db=redis_db, password=redis_password)
+        client = redis.StrictRedis(host=redis_url.hostname, port=redis_url.port, db=redis_db, password=redis_password)
 
     return client
 
diff --git a/redash/models/__init__.py b/redash/models/__init__.py
index 3ca5a32c35..f072310fc3 100644
--- a/redash/models/__init__.py
+++ b/redash/models/__init__.py
@@ -37,8 +37,7 @@
 from .mixins import BelongsToOrgMixin, TimestampMixin
 from .organizations import Organization
 from .types import Configuration, MutableDict, MutableList, PseudoJSON
-from .users import (AccessPermission, AnonymousUser, ApiUser, Group, User,
-                    UserDetail)  # noqa
+from .users import (AccessPermission, AnonymousUser, ApiUser, Group, User)  # noqa
 
 logger = logging.getLogger(__name__)
 
diff --git a/redash/models/base.py b/redash/models/base.py
index ec19ebce93..461aa21a3d 100644
--- a/redash/models/base.py
+++ b/redash/models/base.py
@@ -61,7 +61,6 @@ class GFKBase(object):
     """
     Compatibility with 'generic foreign key' approach Peewee used.
     """
-    # XXX Replace this with table-per-association.
     object_type = Column(db.String(255))
     object_id = Column(db.Integer)
 
diff --git a/redash/models/types.py b/redash/models/types.py
index 3220b9db06..105ce768e6 100644
--- a/redash/models/types.py
+++ b/redash/models/types.py
@@ -1,5 +1,4 @@
 import pytz
-import walrus
 from sqlalchemy.types import TypeDecorator
 from sqlalchemy.ext.indexable import index_property
 from sqlalchemy.ext.mutable import Mutable
@@ -92,15 +91,3 @@ def __init__(self, cast_type, *args, **kwargs):
     def expr(self, model):
         expr = super(json_cast_property, self).expr(model)
         return expr.astext.cast(self.cast_type)
-
-
-class UTCDateTimeField(walrus.DateTimeField):
-    """
-    A walrus DateTimeField that makes the value timezone aware
-    using the pytz.utc timezone on return.
-    """
-    def python_value(self, value):
-        value = super(UTCDateTimeField, self).python_value(value)
-        if value.tzinfo is None:
-            value = value.replace(tzinfo=pytz.utc)
-        return value
diff --git a/redash/models/users.py b/redash/models/users.py
index 036543d7c4..c83f794f3b 100644
--- a/redash/models/users.py
+++ b/redash/models/users.py
@@ -1,10 +1,10 @@
 import hashlib
 import itertools
 import logging
+import time
 from functools import reduce
 from operator import or_
 
-import walrus
 from flask import current_app as app, url_for, request_started
 from flask_login import current_user, AnonymousUserMixin, UserMixin
 from passlib.apps import custom_app_context as pwd_context
@@ -16,107 +16,43 @@
 from sqlalchemy_utils.models import generic_repr
 
 from redash import redis_connection
-from redash.utils import generate_token, utcnow
+from redash.utils import generate_token, utcnow, dt_from_timestamp
 from .base import db, Column, GFKBase
 from .mixins import TimestampMixin, BelongsToOrgMixin
-from .types import json_cast_property, MutableDict, MutableList, UTCDateTimeField
+from .types import json_cast_property, MutableDict, MutableList
 
 logger = logging.getLogger(__name__)
 
 
-class UserDetail(walrus.Model):
-    """
-    A walrus data model to store some user data to Redis to be
-    synced to Postgres asynchronously.
-    """
-    __database__ = redis_connection
-    __namespace__ = 'redash.user.details'
+LAST_ACTIVE_KEY = 'users:last_active_at'
 
-    user_id = walrus.IntegerField(index=True)
-    updated_at = UTCDateTimeField(index=True, default=utcnow)
-
-    @classmethod
-    def update(cls, user_id):
-        """
-        Update the user details hash using the given redis
-        pipeline, user id, optional redis id and optional user
-        details.
-
-        The fields uid, rid and updated (timestamp) are
-        enforced and can't be overwritten.
-        """
-        # try getting the user detail with the given user ID
-        # or create one if it doesn't exist yet (e.g. when key was purged)
-        try:
-            user_detail = cls.get(cls.user_id == user_id)
-            # update the timestamp with the current time
-            user_detail.updated_at = utcnow()
-            # save to Redis
-            user_detail.save()
-        except ValueError:
-            user_detail = cls.create(
-                user_id=user_id,
-                updated_at=utcnow(),
-            )
-        return user_detail
-
-    @classmethod
-    def sync(cls, chunksize=1000):
-        """
-        Syncs user details to Postgres (to the JSON field User.details).
-        """
-        to_sync = {}
-        try:
-            for user_detail in cls.all():
-                to_sync[user_detail.user_id] = user_detail
-
-            user_ids = list(to_sync.keys())
-            if not user_ids:
-                return
-            logger.info(
-                'syncing users: %s',
-                ', '.join([str(uid) for uid in user_ids])
-            )
-            # get all SQLA users that need to be updated
-            users = User.query.filter(User.id.in_(user_ids))
-            for i, user in enumerate(users):
-                update = to_sync[user.id]
-                user.active_at = update.updated_at
-                # flush changes to the database after a certain
-                # number of items and extend the list of keys to
-                # stop sync in case of exceptions
-                if i % chunksize == 0:
-                    db.session.flush()
-            db.session.commit()
-        except DBAPIError:
-            # reset list of keys to stop sync
-            pass
-        finally:
-            user_ids = [str(user_id) for user_id in to_sync.keys()]
-            if user_ids:
-                logger.info(
-                    'Deleting temporary user details for users %s',
-                    ', '.join(user_ids)
-                )
-                delete_query = [
-                    UserDetail.user_id == str(user_id)
-                    for user_id in user_ids
-                ]
-                UserDetail.query_delete(reduce(or_, delete_query))
-
-
-def update_user_detail(sender, *args, **kwargs):
+def sync_last_active_at():
+    """
+    Update the User model with the active_at timestamps stored in Redis.
+    We fetch all the user ids first and only then read each timestamp, to
+    keep the window between reading a value and updating the DB small;
+    otherwise we might skip a more recent update.
+    """
+    user_ids = redis_connection.hkeys(LAST_ACTIVE_KEY)
+    for user_id in user_ids:
+        timestamp = redis_connection.hget(LAST_ACTIVE_KEY, user_id)
+        active_at = dt_from_timestamp(timestamp)
+        user = User.query.filter(User.id == user_id).first()
+        if user:
+            user.active_at = active_at
+        redis_connection.hdel(LAST_ACTIVE_KEY, user_id)
+    db.session.commit()
+
+
+def update_user_active_at(sender, *args, **kwargs):
     """
     Used as a Flask request_started signal callback that adds
     the current user's details to Redis
     """
-    if (
-            current_user.get_id() and
-            current_user.is_authenticated and
-            not current_user.is_api_user()
-    ):
-        UserDetail.update(current_user.id)
+    if current_user.is_authenticated and not current_user.is_api_user():
+        redis_connection.hset(LAST_ACTIVE_KEY, current_user.id, int(time.time()))
 
 
@@ -124,7 +60,7 @@ def init_app(app):
     """
     A Flask extension to keep user details updates in Redis and sync it
    periodically to the database (User.details).
     """
-    request_started.connect(update_user_detail, app)
+    request_started.connect(update_user_active_at, app)
 
 class PermissionsCheckMixin(object):
@@ -150,7 +86,6 @@ class User(TimestampMixin, db.Model, BelongsToOrgMixin, UserMixin, PermissionsCh
     email = Column(EmailType)
     _profile_image_url = Column('profile_image_url', db.String(320), nullable=True)
     password_hash = Column(db.String(128), nullable=True)
-    # XXX replace with association table
     group_ids = Column('groups', MutableList.as_mutable(postgresql.ARRAY(db.Integer)), nullable=True)
     api_key = Column(db.String(40), default=lambda: generate_token(40),
                      unique=True)
@@ -445,4 +380,4 @@ def permissions(self):
         return ['view_query']
 
     def has_access(self, obj, access_type):
-        return False
\ No newline at end of file
+        return False
diff --git a/redash/tasks/general.py b/redash/tasks/general.py
index 8be76d2754..f23bebb706 100644
--- a/redash/tasks/general.py
+++ b/redash/tasks/general.py
@@ -3,6 +3,7 @@
 from celery.utils.log import get_task_logger
 from flask_mail import Message
 from redash import mail, models, settings
+from redash.models import users
 from redash.version_check import run_version_check
 from redash.worker import celery
 
@@ -69,4 +70,4 @@ def send_mail(to, subject, html, text):
     expires=45,
 )
 def sync_user_details():
-    models.UserDetail.sync()
+    users.sync_last_active_at()
diff --git a/requirements.txt b/requirements.txt
index 191497de0c..0db00958d7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,7 +23,6 @@ python-dateutil==2.7.3
 pytz==2016.7
 PyYAML==3.12
 redis==3.0.1
-walrus==0.7.1
 requests==2.21.0
 six==1.11.0
 SQLAlchemy==1.2.12
diff --git a/tests/models/test_users.py b/tests/models/test_users.py
index 7b28ad431a..7a51318ca2 100644
--- a/tests/models/test_users.py
+++ b/tests/models/test_users.py
@@ -1,7 +1,10 @@
 # -*- coding: utf-8 -*-
-from tests import BaseTestCase
+from tests import BaseTestCase, authenticated_user
+from redash import redis_connection
 from redash.models import User, db
+from redash.utils import dt_from_timestamp
+from redash.models.users import sync_last_active_at, update_user_active_at, LAST_ACTIVE_KEY
 
 
 class TestUserUpdateGroupAssignments(BaseTestCase):
     def test_default_group_always_added(self):
@@ -57,3 +60,38 @@ def test_non_unicode_search_string(self):
         user = self.factory.create_user(name=u'אריק')
         assert user in User.search(User.all(user.org), term=u'א')
 
+
+
+class TestUserDetail(BaseTestCase):
+    # def setUp(self):
+    #     super(TestUserDetail, self).setUp()
+    #     # redis_connection.flushdb()
+
+    def test_userdetail_db_default(self):
+        with authenticated_user(self.client) as user:
+            self.assertEqual(user.details, {})
+            self.assertIsNone(user.active_at)
+
+    def test_userdetail_db_default_save(self):
+        with authenticated_user(self.client) as user:
+            user.details['test'] = 1
+            db.session.commit()
+
+            user_reloaded = User.query.filter_by(id=user.id).first()
+            self.assertEqual(user.details['test'], 1)
+            self.assertEqual(
+                user_reloaded,
+                User.query.filter(
+                    User.details['test'].astext.cast(db.Integer) == 1
+                ).first()
+            )
+
+    def test_sync(self):
+        with authenticated_user(self.client) as user:
+            rv = self.client.get('/default/')
+            timestamp = dt_from_timestamp(redis_connection.hget(LAST_ACTIVE_KEY, user.id))
+            sync_last_active_at()
+
+            user_reloaded = User.query.filter(User.id == user.id).first()
+            self.assertIn('active_at', user_reloaded.details)
+            self.assertEqual(user_reloaded.active_at, timestamp)
\ No newline at end of file
diff --git a/tests/test_models.py b/tests/test_models.py
index 4c83cb4021..98a960e803 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -4,9 +4,8 @@
 from unittest import TestCase
 
 import pytz
-import walrus
 from dateutil.parser import parse as date_parse
-from tests import BaseTestCase, authenticated_user
+from tests import BaseTestCase
 
 from redash import models, redis_connection
 from redash.models import db, types
@@ -594,85 +593,3 @@ def test_returns_dashboards_from_current_org_only(self):
 
         self.assertIn(w1.dashboard, models.Dashboard.all(self.u1.org, self.u1.group_ids, None))
         self.assertNotIn(w1.dashboard, models.Dashboard.all(user.org, user.group_ids, None))
-
-
-class Timestamp(walrus.Model):
-    __database__ = redis_connection
-    __namespace__ = 'redash.tests.timestamp'
-
-    created_at = types.UTCDateTimeField(index=True, default=utcnow)
-
-
-class TestUserDetail(BaseTestCase):
-
-    def setUp(self):
-        super(TestUserDetail, self).setUp()
-        redis_connection.flushdb()
-
-    def test_walrus_utcdatetimefield(self):
-        timestamp = Timestamp()
-        timestamp.save()
-
-        timestamps = list(Timestamp.all())
-        self.assertEqual(len(timestamps), 1)
-        self.assertIsInstance(timestamps[0].created_at, datetime.datetime)
-        self.assertEqual(timestamps[0].created_at.tzinfo, pytz.utc)
-
-    def test_userdetail_db_default(self):
-        with authenticated_user(self.client) as user:
-            self.assertEqual(user.details, {})
-            self.assertIsNone(user.active_at)
-
-    def test_userdetail_db_default_save(self):
-        with authenticated_user(self.client) as user:
-            user.details['test'] = 1
-            models.db.session.commit()
-
-            user_reloaded = models.User.query.filter_by(id=user.id).first()
-            self.assertEqual(user.details['test'], 1)
-            self.assertEqual(
-                user_reloaded,
-                models.User.query.filter(
-                    models.User.details['test'].astext.cast(models.db.Integer) == 1
-                ).first()
-            )
-
-    def test_userdetail_create(self):
-        self.assertEqual(len(list(models.UserDetail.all())), 0)
-        user_detail = models.UserDetail.create(user_id=1)
-        user_detail.save()
-        self.assertEqual(
-            models.UserDetail.get(models.UserDetail.user_id == 1)._id,
-            user_detail._id,
-        )
-
-    def test_userdetail_update(self):
-        self.assertEqual(len(list(models.UserDetail.all())), 0)
-        # first try to create a user with a user id that we haven't used before
-        # and see if the creation was successful
-        models.UserDetail.update(user_id=1000)  # non-existent user
-
-        all_user_details = list(models.UserDetail.all())
-        self.assertEqual(len(all_user_details), 1)
-        created_user_detail = all_user_details[0]
-
-        # then see if we can update the same user detail again
-        updated_user_detail = models.UserDetail.update(
-            user_id=created_user_detail.user_id
-        )
-        self.assertGreater(
-            updated_user_detail.updated_at,
-            created_user_detail.updated_at
-        )
-
-    def test_sync(self):
-        with authenticated_user(self.client) as user:
-            user_detail = models.UserDetail.update(user_id=user.id)
-            self.assertEqual(user.details, {})
-
-            self.assertEqual(len(list(models.UserDetail.all())), 1)
-            models.UserDetail.sync()
-            self.assertEqual(len(list(models.UserDetail.all())), 0)
-
-            user_reloaded = models.User.query.filter_by(id=user.id).first()
-            self.assertIn('active_at', user_reloaded.details)
-            self.assertEqual(user_reloaded.active_at, user_detail.updated_at)
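
Standalone sketch (not part of the patch above, and not Redash's actual implementation): the change replaces the walrus-backed UserDetail model with a plain Redis hash keyed by user id. The example below shows that round trip with redis-py: one hset per request on the write path, then hkeys/hget/hdel during the periodic sync. The connection setup, the save_active_at callback, and the epoch-to-UTC conversion (standing in for redash.utils.dt_from_timestamp, whose body is not shown in this diff) are assumptions made for illustration.

# last_active_sketch.py -- hypothetical, self-contained illustration only.
import time
from datetime import datetime

import pytz
import redis

LAST_ACTIVE_KEY = 'users:last_active_at'
redis_connection = redis.StrictRedis()  # assumed local Redis on db 0


def record_request(user_id):
    # Write path: one hash field per user, value is a plain Unix timestamp,
    # mirroring what update_user_active_at() does on request_started.
    redis_connection.hset(LAST_ACTIVE_KEY, user_id, int(time.time()))


def sync(save_active_at):
    # Read path: snapshot the user ids first, then read each timestamp only
    # right before persisting it, so the window for losing a newer hit stays
    # small. save_active_at(user_id, active_at) stands in for the SQLAlchemy
    # update that sync_last_active_at() performs.
    for user_id in redis_connection.hkeys(LAST_ACTIVE_KEY):
        timestamp = redis_connection.hget(LAST_ACTIVE_KEY, user_id)
        # Assumed equivalent of dt_from_timestamp(): epoch seconds -> aware UTC.
        active_at = datetime.utcfromtimestamp(float(timestamp)).replace(tzinfo=pytz.utc)
        save_active_at(int(user_id), active_at)
        redis_connection.hdel(LAST_ACTIVE_KEY, user_id)


if __name__ == '__main__':
    seen = []
    record_request(42)
    sync(lambda uid, ts: seen.append((uid, ts)))
    assert seen and seen[0][0] == 42

Keeping a single hash of integer timestamps makes the per-request cost one O(1) Redis write and defers the Postgres update to the periodic Celery task (sync_user_details above), which is the design the patch adopts.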