Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis based implementation of user active_at timestamp update #3256

Merged
merged 3 commits into from
Jan 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions redash/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import urlparse
import urllib

import walrus
import redis
from flask import Flask, current_app
from flask_sslify import SSLify
from werkzeug.contrib.fixers import ProxyFix
Expand Down Expand Up @@ -48,15 +48,15 @@ def create_redis_connection():
else:
db = 0

client = walrus.Database(unix_socket_path=redis_url.path, db=db)
client = redis.StrictRedis(unix_socket_path=redis_url.path, db=db)
else:
if redis_url.path:
redis_db = redis_url.path[1]
else:
redis_db = 0
# Redis passwords might be quoted with special characters
redis_password = redis_url.password and urllib.unquote(redis_url.password)
client = walrus.Database(host=redis_url.hostname, port=redis_url.port, db=redis_db, password=redis_password)
client = redis.StrictRedis(host=redis_url.hostname, port=redis_url.port, db=redis_db, password=redis_password)

return client

Expand Down
3 changes: 1 addition & 2 deletions redash/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@
from .mixins import BelongsToOrgMixin, TimestampMixin
from .organizations import Organization
from .types import Configuration, MutableDict, MutableList, PseudoJSON
from .users import (AccessPermission, AnonymousUser, ApiUser, Group, User,
UserDetail) # noqa
from .users import (AccessPermission, AnonymousUser, ApiUser, Group, User) # noqa

logger = logging.getLogger(__name__)

Expand Down
1 change: 0 additions & 1 deletion redash/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ class GFKBase(object):
"""
Compatibility with 'generic foreign key' approach Peewee used.
"""
# XXX Replace this with table-per-association.
object_type = Column(db.String(255))
object_id = Column(db.Integer)

Expand Down
13 changes: 0 additions & 13 deletions redash/models/types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import pytz
import walrus
from sqlalchemy.types import TypeDecorator
from sqlalchemy.ext.indexable import index_property
from sqlalchemy.ext.mutable import Mutable
Expand Down Expand Up @@ -92,15 +91,3 @@ def __init__(self, cast_type, *args, **kwargs):
def expr(self, model):
expr = super(json_cast_property, self).expr(model)
return expr.astext.cast(self.cast_type)


class UTCDateTimeField(walrus.DateTimeField):
"""
A walrus DateTimeField that makes the value timezone aware
using the pytz.utc timezone on return.
"""
def python_value(self, value):
value = super(UTCDateTimeField, self).python_value(value)
if value.tzinfo is None:
value = value.replace(tzinfo=pytz.utc)
return value
119 changes: 27 additions & 92 deletions redash/models/users.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import hashlib
import itertools
import logging
import time
from functools import reduce
from operator import or_

import walrus
from flask import current_app as app, url_for, request_started
from flask_login import current_user, AnonymousUserMixin, UserMixin
from passlib.apps import custom_app_context as pwd_context
Expand All @@ -16,115 +16,51 @@
from sqlalchemy_utils.models import generic_repr

from redash import redis_connection
from redash.utils import generate_token, utcnow
from redash.utils import generate_token, utcnow, dt_from_timestamp

from .base import db, Column, GFKBase
from .mixins import TimestampMixin, BelongsToOrgMixin
from .types import json_cast_property, MutableDict, MutableList, UTCDateTimeField
from .types import json_cast_property, MutableDict, MutableList

logger = logging.getLogger(__name__)


class UserDetail(walrus.Model):
"""
A walrus data model to store some user data to Redis to be
synced to Postgres asynchronously.
"""
__database__ = redis_connection
__namespace__ = 'redash.user.details'
LAST_ACTIVE_KEY = 'users:last_active_at'

user_id = walrus.IntegerField(index=True)
updated_at = UTCDateTimeField(index=True, default=utcnow)

@classmethod
def update(cls, user_id):
"""
Update the user details hash using the given redis
pipeline, user id, optional redis id and optional user
details.

The fields uid, rid and updated (timestamp) are
enforced and can't be overwritten.
"""
# try getting the user detail with the given user ID
# or create one if it doesn't exist yet (e.g. when key was purged)
try:
user_detail = cls.get(cls.user_id == user_id)
# update the timestamp with the current time
user_detail.updated_at = utcnow()
# save to Redis
user_detail.save()
except ValueError:
user_detail = cls.create(
user_id=user_id,
updated_at=utcnow(),
)
return user_detail

@classmethod
def sync(cls, chunksize=1000):
"""
Syncs user details to Postgres (to the JSON field User.details).
"""
to_sync = {}
try:
for user_detail in cls.all():
to_sync[user_detail.user_id] = user_detail

user_ids = list(to_sync.keys())
if not user_ids:
return
logger.info(
'syncing users: %s',
', '.join([str(uid) for uid in user_ids])
)
# get all SQLA users that need to be updated
users = User.query.filter(User.id.in_(user_ids))
for i, user in enumerate(users):
update = to_sync[user.id]
user.active_at = update.updated_at
# flush changes to the database after a certain
# number of items and extend the list of keys to
# stop sync in case of exceptions
if i % chunksize == 0:
db.session.flush()
db.session.commit()
except DBAPIError:
# reset list of keys to stop sync
pass
finally:
user_ids = [str(user_id) for user_id in to_sync.keys()]
if user_ids:
logger.info(
'Deleting temporary user details for users %s',
', '.join(user_ids)
)
delete_query = [
UserDetail.user_id == str(user_id)
for user_id in user_ids
]
UserDetail.query_delete(reduce(or_, delete_query))


def update_user_detail(sender, *args, **kwargs):
def sync_last_active_at():
"""
Update User model with the active_at timestamp from Redis. We first fetch
all the user_ids to update, and then fetch the timestamp to minimize the
time between fetching the value and updating the DB. This is because there
might be a more recent update we skip otherwise.
"""
user_ids = redis_connection.hkeys(LAST_ACTIVE_KEY)
for user_id in user_ids:
timestamp = redis_connection.hget(LAST_ACTIVE_KEY, user_id)
active_at = dt_from_timestamp(timestamp)
user = User.query.filter(User.id == user_id).first()
if user:
user.active_at = active_at
redis_connection.hdel(LAST_ACTIVE_KEY, user_id)
db.session.commit()


def update_user_active_at(sender, *args, **kwargs):
"""
Used as a Flask request_started signal callback that adds
the current user's details to Redis
"""
if (
current_user.get_id() and
current_user.is_authenticated and
not current_user.is_api_user()
):
UserDetail.update(current_user.id)
if current_user.is_authenticated and not current_user.is_api_user():
redis_connection.hset(LAST_ACTIVE_KEY, current_user.id, int(time.time()))


def init_app(app):
"""
A Flask extension to keep user details updates in Redis and
sync it periodically to the database (User.details).
"""
request_started.connect(update_user_detail, app)
request_started.connect(update_user_active_at, app)


class PermissionsCheckMixin(object):
Expand All @@ -150,7 +86,6 @@ class User(TimestampMixin, db.Model, BelongsToOrgMixin, UserMixin, PermissionsCh
email = Column(EmailType)
_profile_image_url = Column('profile_image_url', db.String(320), nullable=True)
password_hash = Column(db.String(128), nullable=True)
# XXX replace with association table
group_ids = Column('groups', MutableList.as_mutable(postgresql.ARRAY(db.Integer)), nullable=True)
api_key = Column(db.String(40),
default=lambda: generate_token(40),
Expand Down Expand Up @@ -445,4 +380,4 @@ def permissions(self):
return ['view_query']

def has_access(self, obj, access_type):
return False
return False
3 changes: 2 additions & 1 deletion redash/tasks/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from celery.utils.log import get_task_logger
from flask_mail import Message
from redash import mail, models, settings
from redash.models import users
from redash.version_check import run_version_check
from redash.worker import celery

Expand Down Expand Up @@ -69,4 +70,4 @@ def send_mail(to, subject, html, text):
expires=45,
)
def sync_user_details():
models.UserDetail.sync()
users.sync_last_active_at()
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ python-dateutil==2.7.3
pytz==2016.7
PyYAML==3.12
redis==3.0.1
walrus==0.7.1
requests==2.21.0
six==1.11.0
SQLAlchemy==1.2.12
Expand Down
40 changes: 39 additions & 1 deletion tests/models/test_users.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# -*- coding: utf-8 -*-
from tests import BaseTestCase
from tests import BaseTestCase, authenticated_user

from redash import redis_connection
from redash.models import User, db
from redash.utils import dt_from_timestamp
from redash.models.users import sync_last_active_at, update_user_active_at, LAST_ACTIVE_KEY

class TestUserUpdateGroupAssignments(BaseTestCase):
def test_default_group_always_added(self):
Expand Down Expand Up @@ -57,3 +60,38 @@ def test_non_unicode_search_string(self):
user = self.factory.create_user(name=u'אריק')

assert user in User.search(User.all(user.org), term=u'א')


class TestUserDetail(BaseTestCase):
# def setUp(self):
# super(TestUserDetail, self).setUp()
# # redis_connection.flushdb()

def test_userdetail_db_default(self):
with authenticated_user(self.client) as user:
self.assertEqual(user.details, {})
self.assertIsNone(user.active_at)

def test_userdetail_db_default_save(self):
with authenticated_user(self.client) as user:
user.details['test'] = 1
db.session.commit()

user_reloaded = User.query.filter_by(id=user.id).first()
self.assertEqual(user.details['test'], 1)
self.assertEqual(
user_reloaded,
User.query.filter(
User.details['test'].astext.cast(db.Integer) == 1
).first()
)

def test_sync(self):
with authenticated_user(self.client) as user:
rv = self.client.get('/default/')
timestamp = dt_from_timestamp(redis_connection.hget(LAST_ACTIVE_KEY, user.id))
sync_last_active_at()

user_reloaded = User.query.filter(User.id==user.id).first()
self.assertIn('active_at', user_reloaded.details)
self.assertEqual(user_reloaded.active_at, timestamp)
Loading