Skip to content

Commit

Permalink
Redis based implementation of user active_at timestamp update (#3256)
Browse files Browse the repository at this point in the history
* Switch to simpler implementation
* Fix active_at update code
* Fix sync test
  • Loading branch information
arikfr authored Jan 8, 2019
1 parent 22f835d commit 08953cc
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 197 deletions.
6 changes: 3 additions & 3 deletions redash/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import urlparse
import urllib

import walrus
import redis
from flask import Flask, current_app
from flask_sslify import SSLify
from werkzeug.contrib.fixers import ProxyFix
Expand Down Expand Up @@ -48,15 +48,15 @@ def create_redis_connection():
else:
db = 0

client = walrus.Database(unix_socket_path=redis_url.path, db=db)
client = redis.StrictRedis(unix_socket_path=redis_url.path, db=db)
else:
if redis_url.path:
redis_db = redis_url.path[1]
else:
redis_db = 0
# Redis passwords might be quoted with special characters
redis_password = redis_url.password and urllib.unquote(redis_url.password)
client = walrus.Database(host=redis_url.hostname, port=redis_url.port, db=redis_db, password=redis_password)
client = redis.StrictRedis(host=redis_url.hostname, port=redis_url.port, db=redis_db, password=redis_password)

return client

Expand Down
3 changes: 1 addition & 2 deletions redash/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@
from .mixins import BelongsToOrgMixin, TimestampMixin
from .organizations import Organization
from .types import Configuration, MutableDict, MutableList, PseudoJSON
from .users import (AccessPermission, AnonymousUser, ApiUser, Group, User,
UserDetail) # noqa
from .users import (AccessPermission, AnonymousUser, ApiUser, Group, User) # noqa

logger = logging.getLogger(__name__)

Expand Down
1 change: 0 additions & 1 deletion redash/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ class GFKBase(object):
"""
Compatibility with 'generic foreign key' approach Peewee used.
"""
# XXX Replace this with table-per-association.
object_type = Column(db.String(255))
object_id = Column(db.Integer)

Expand Down
13 changes: 0 additions & 13 deletions redash/models/types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import pytz
import walrus
from sqlalchemy.types import TypeDecorator
from sqlalchemy.ext.indexable import index_property
from sqlalchemy.ext.mutable import Mutable
Expand Down Expand Up @@ -92,15 +91,3 @@ def __init__(self, cast_type, *args, **kwargs):
def expr(self, model):
expr = super(json_cast_property, self).expr(model)
return expr.astext.cast(self.cast_type)


class UTCDateTimeField(walrus.DateTimeField):
"""
A walrus DateTimeField that makes the value timezone aware
using the pytz.utc timezone on return.
"""
def python_value(self, value):
value = super(UTCDateTimeField, self).python_value(value)
if value.tzinfo is None:
value = value.replace(tzinfo=pytz.utc)
return value
117 changes: 26 additions & 91 deletions redash/models/users.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import hashlib
import itertools
import logging
import time
from functools import reduce
from operator import or_

import walrus
from flask import current_app as app, url_for, request_started
from flask_login import current_user, AnonymousUserMixin, UserMixin
from passlib.apps import custom_app_context as pwd_context
Expand All @@ -16,115 +16,51 @@
from sqlalchemy_utils.models import generic_repr

from redash import redis_connection
from redash.utils import generate_token, utcnow
from redash.utils import generate_token, utcnow, dt_from_timestamp

from .base import db, Column, GFKBase
from .mixins import TimestampMixin, BelongsToOrgMixin
from .types import json_cast_property, MutableDict, MutableList, UTCDateTimeField
from .types import json_cast_property, MutableDict, MutableList

logger = logging.getLogger(__name__)


class UserDetail(walrus.Model):
"""
A walrus data model to store some user data to Redis to be
synced to Postgres asynchronously.
"""
__database__ = redis_connection
__namespace__ = 'redash.user.details'
LAST_ACTIVE_KEY = 'users:last_active_at'

user_id = walrus.IntegerField(index=True)
updated_at = UTCDateTimeField(index=True, default=utcnow)

@classmethod
def update(cls, user_id):
"""
Update the user details hash using the given redis
pipeline, user id, optional redis id and optional user
details.
The fields uid, rid and updated (timestamp) are
enforced and can't be overwritten.
"""
# try getting the user detail with the given user ID
# or create one if it doesn't exist yet (e.g. when key was purged)
try:
user_detail = cls.get(cls.user_id == user_id)
# update the timestamp with the current time
user_detail.updated_at = utcnow()
# save to Redis
user_detail.save()
except ValueError:
user_detail = cls.create(
user_id=user_id,
updated_at=utcnow(),
)
return user_detail

@classmethod
def sync(cls, chunksize=1000):
"""
Syncs user details to Postgres (to the JSON field User.details).
"""
to_sync = {}
try:
for user_detail in cls.all():
to_sync[user_detail.user_id] = user_detail

user_ids = list(to_sync.keys())
if not user_ids:
return
logger.info(
'syncing users: %s',
', '.join([str(uid) for uid in user_ids])
)
# get all SQLA users that need to be updated
users = User.query.filter(User.id.in_(user_ids))
for i, user in enumerate(users):
update = to_sync[user.id]
user.active_at = update.updated_at
# flush changes to the database after a certain
# number of items and extend the list of keys to
# stop sync in case of exceptions
if i % chunksize == 0:
db.session.flush()
db.session.commit()
except DBAPIError:
# reset list of keys to stop sync
pass
finally:
user_ids = [str(user_id) for user_id in to_sync.keys()]
if user_ids:
logger.info(
'Deleting temporary user details for users %s',
', '.join(user_ids)
)
delete_query = [
UserDetail.user_id == str(user_id)
for user_id in user_ids
]
UserDetail.query_delete(reduce(or_, delete_query))


def update_user_detail(sender, *args, **kwargs):
def sync_last_active_at():
"""
Update User model with the active_at timestamp from Redis. We first fetch
all the user_ids to update, and then fetch the timestamp to minimize the
time between fetching the value and updating the DB. This is because there
might be a more recent update we skip otherwise.
"""
user_ids = redis_connection.hkeys(LAST_ACTIVE_KEY)
for user_id in user_ids:
timestamp = redis_connection.hget(LAST_ACTIVE_KEY, user_id)
active_at = dt_from_timestamp(timestamp)
user = User.query.filter(User.id == user_id).first()
if user:
user.active_at = active_at
redis_connection.hdel(LAST_ACTIVE_KEY, user_id)
db.session.commit()


def update_user_active_at(sender, *args, **kwargs):
"""
Used as a Flask request_started signal callback that adds
the current user's details to Redis
"""
if (
current_user.get_id() and
current_user.is_authenticated and
not current_user.is_api_user()
):
UserDetail.update(current_user.id)
if current_user.is_authenticated and not current_user.is_api_user():
redis_connection.hset(LAST_ACTIVE_KEY, current_user.id, int(time.time()))


def init_app(app):
"""
A Flask extension to keep user details updates in Redis and
sync it periodically to the database (User.details).
"""
request_started.connect(update_user_detail, app)
request_started.connect(update_user_active_at, app)


class PermissionsCheckMixin(object):
Expand All @@ -150,7 +86,6 @@ class User(TimestampMixin, db.Model, BelongsToOrgMixin, UserMixin, PermissionsCh
email = Column(EmailType)
_profile_image_url = Column('profile_image_url', db.String(320), nullable=True)
password_hash = Column(db.String(128), nullable=True)
# XXX replace with association table
group_ids = Column('groups', MutableList.as_mutable(postgresql.ARRAY(db.Integer)), nullable=True)
api_key = Column(db.String(40),
default=lambda: generate_token(40),
Expand Down
3 changes: 2 additions & 1 deletion redash/tasks/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from celery.utils.log import get_task_logger
from flask_mail import Message
from redash import mail, models, settings
from redash.models import users
from redash.version_check import run_version_check
from redash.worker import celery

Expand Down Expand Up @@ -69,4 +70,4 @@ def send_mail(to, subject, html, text):
expires=45,
)
def sync_user_details():
models.UserDetail.sync()
users.sync_last_active_at()
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ python-dateutil==2.7.3
pytz==2016.7
PyYAML==3.12
redis==3.0.1
walrus==0.7.1
requests==2.21.0
six==1.11.0
SQLAlchemy==1.2.12
Expand Down
40 changes: 39 additions & 1 deletion tests/models/test_users.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# -*- coding: utf-8 -*-
from tests import BaseTestCase
from tests import BaseTestCase, authenticated_user

from redash import redis_connection
from redash.models import User, db
from redash.utils import dt_from_timestamp
from redash.models.users import sync_last_active_at, update_user_active_at, LAST_ACTIVE_KEY

class TestUserUpdateGroupAssignments(BaseTestCase):
def test_default_group_always_added(self):
Expand Down Expand Up @@ -57,3 +60,38 @@ def test_non_unicode_search_string(self):
user = self.factory.create_user(name=u'אריק')

assert user in User.search(User.all(user.org), term=u'א')


class TestUserDetail(BaseTestCase):
# def setUp(self):
# super(TestUserDetail, self).setUp()
# # redis_connection.flushdb()

def test_userdetail_db_default(self):
with authenticated_user(self.client) as user:
self.assertEqual(user.details, {})
self.assertIsNone(user.active_at)

def test_userdetail_db_default_save(self):
with authenticated_user(self.client) as user:
user.details['test'] = 1
db.session.commit()

user_reloaded = User.query.filter_by(id=user.id).first()
self.assertEqual(user.details['test'], 1)
self.assertEqual(
user_reloaded,
User.query.filter(
User.details['test'].astext.cast(db.Integer) == 1
).first()
)

def test_sync(self):
with authenticated_user(self.client) as user:
rv = self.client.get('/default/')
timestamp = dt_from_timestamp(redis_connection.hget(LAST_ACTIVE_KEY, user.id))
sync_last_active_at()

user_reloaded = User.query.filter(User.id==user.id).first()
self.assertIn('active_at', user_reloaded.details)
self.assertEqual(user_reloaded.active_at, timestamp)
85 changes: 1 addition & 84 deletions tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
from unittest import TestCase

import pytz
import walrus
from dateutil.parser import parse as date_parse
from tests import BaseTestCase, authenticated_user
from tests import BaseTestCase

from redash import models, redis_connection
from redash.models import db, types
Expand Down Expand Up @@ -594,85 +593,3 @@ def test_returns_dashboards_from_current_org_only(self):

self.assertIn(w1.dashboard, models.Dashboard.all(self.u1.org, self.u1.group_ids, None))
self.assertNotIn(w1.dashboard, models.Dashboard.all(user.org, user.group_ids, None))


class Timestamp(walrus.Model):
__database__ = redis_connection
__namespace__ = 'redash.tests.timestamp'

created_at = types.UTCDateTimeField(index=True, default=utcnow)


class TestUserDetail(BaseTestCase):

def setUp(self):
super(TestUserDetail, self).setUp()
redis_connection.flushdb()

def test_walrus_utcdatetimefield(self):
timestamp = Timestamp()
timestamp.save()

timestamps = list(Timestamp.all())
self.assertEqual(len(timestamps), 1)
self.assertIsInstance(timestamps[0].created_at, datetime.datetime)
self.assertEqual(timestamps[0].created_at.tzinfo, pytz.utc)

def test_userdetail_db_default(self):
with authenticated_user(self.client) as user:
self.assertEqual(user.details, {})
self.assertIsNone(user.active_at)

def test_userdetail_db_default_save(self):
with authenticated_user(self.client) as user:
user.details['test'] = 1
models.db.session.commit()

user_reloaded = models.User.query.filter_by(id=user.id).first()
self.assertEqual(user.details['test'], 1)
self.assertEqual(
user_reloaded,
models.User.query.filter(
models.User.details['test'].astext.cast(models.db.Integer) == 1
).first()
)

def test_userdetail_create(self):
self.assertEqual(len(list(models.UserDetail.all())), 0)
user_detail = models.UserDetail.create(user_id=1)
user_detail.save()
self.assertEqual(
models.UserDetail.get(models.UserDetail.user_id == 1)._id,
user_detail._id,
)

def test_userdetail_update(self):
self.assertEqual(len(list(models.UserDetail.all())), 0)
# first try to create a user with a user id that we haven't used before
# and see if the creation was successful
models.UserDetail.update(user_id=1000) # non-existent user
all_user_details = list(models.UserDetail.all())
self.assertEqual(len(all_user_details), 1)
created_user_detail = all_user_details[0]

# then see if we can update the same user detail again
updated_user_detail = models.UserDetail.update(
user_id=created_user_detail.user_id
)
self.assertGreater(
updated_user_detail.updated_at,
created_user_detail.updated_at
)

def test_sync(self):
with authenticated_user(self.client) as user:
user_detail = models.UserDetail.update(user_id=user.id)
self.assertEqual(user.details, {})

self.assertEqual(len(list(models.UserDetail.all())), 1)
models.UserDetail.sync()
self.assertEqual(len(list(models.UserDetail.all())), 0)

user_reloaded = models.User.query.filter_by(id=user.id).first()
self.assertIn('active_at', user_reloaded.details)
self.assertEqual(user_reloaded.active_at, user_detail.updated_at)

0 comments on commit 08953cc

Please sign in to comment.