Gergely Polonkai 4b1fff6544 Add the AppState model
This allows setting application state during run time
2018-07-25 20:27:13 +02:00

854 lines
25 KiB
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (C) 2018 Gergely Polonkai
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <>.
"""Database models for
from datetime import datetime
from enum import Enum
from warnings import warn
from flask import current_app
from flask_babelex import lazy_gettext
from flask_security import UserMixin, RoleMixin
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy_utils.types.choice import ChoiceType
from .app_state import app_state_base
from .cache import cache
from .utils import force_locale
db = SQLAlchemy()
users_roles = db.Table(
db.Column('user_id', db.Integer(), db.ForeignKey('')),
db.Column('role_id', db.Integer(), db.ForeignKey('')))
def generate_uuid():
"""Generate a UUID (version 4)
:returns: the hexadecimal representation of the generated UUID
:rtype: str
from uuid import uuid4
return uuid4().hex
def _(translate): # pylint: disable=invalid-name
"""Function to mark strings as translatable
The actual translation will be fetched later in `:meth:AuditLog.get_message`.
return translate
class NotificationAction(Enum):
"""The possible values for notification actions
#: A user followed another
follow = 1
#: A user has been invited to an event
invite = 2
NotificationAction.follow: (_('%(actor)s followed you'), _('%(actor)s followed %(item)s')),
NotificationAction.invite: (None, _('%(actor)s invited you to %(item)s')),
class ResponseType(Enum):
"""Enumeration of event availabilities
#: The user is about to attend the event
going = 0
#: The user will probably attend the event
probably_going = 1
#: The user is interested in the event, but might not go
probably_not_going = 2
#: The user wont attend the event
not_going = 3
def __hash__(self):
return Enum.__hash__(self)
def __eq__(self, other):
if isinstance(other, str):
return == other.lower() # pylint: disable=no-member
if isinstance(other, (int, float)):
return self.value == other
return Enum.__eq__(self, other)
class EventVisibility(Enum):
"""Enumeration for event visibility
#: The event is private, only attendees and people invited can see the details
private = 0
#: The event is public, anyone can see the details
public = 5
EventVisibility.private: _('Visible only to attendees'),
EventVisibility.public: _('Visible to everyone'),
class SettingsProxy:
"""Proxy object to get settings for a user
def __init__(self, user):
self.user = user
def __getitem__(self, key):
setting = UserSetting.query \
.filter(UserSetting.user == self.user) \
.filter(UserSetting.key == key) \
if setting is None:
return None
return setting.value
def __setitem__(self, key, value):
setting = UserSetting.query \
.filter(UserSetting.user == self.user) \
.filter(UserSetting.key == key) \
except NoResultFound:
setting = UserSetting(user=self.user, key=key)
setting.value = str(value)
def __repr__(self):
return f'<SettingsProxy for {self.user}>'
class User(db.Model, UserMixin):
"""Database model for users
__tablename__ = 'users'
# pylint: disable=invalid-name
id = db.Column(db.Integer(), primary_key=True)
#: The username of the user. This is also the display name and thus is immutable
username = db.Column(db.String(length=50), unique=True, nullable=False)
#: The email address of the user
email = db.Column(db.String(length=255), unique=True, nullable=True)
#: The (hashed) password of the user
password = db.Column(db.String(length=255))
#: A flag to show whether the user is enabled (active) or not
active = db.Column(db.Boolean(), default=False)
#: The timestamp when this user was created
created_at = db.Column(db.DateTime(), default=datetime.utcnow)
#: The timestamp when the user was activated
confirmed_at = db.Column(db.DateTime())
#: The roles of the user
roles = db.relationship('Role',
backref=db.backref('users', lazy='dynamic'))
def settings(self):
"""Get a settings proxy for the user
proxy = getattr(self, '_settings', None)
if proxy is None:
proxy = SettingsProxy(self)
setattr(self, '_settings', proxy)
return proxy
def timezone(self):
"""The users time zone
If the user didnt set a time zone yet, the application default is used.
from pytz import timezone
from pytz.exceptions import UnknownTimeZoneError
timezone_str = self.settings['timezone']
if timezone_str:
return timezone(timezone_str)
except UnknownTimeZoneError:
warn(f'Timezone of {self} (or the default timezone) "{timezone_str}" is invalid')
return current_app.timezone
def session_list_key(self):
"""The cache key of this users session list
return f'open_sessions:{}'
def active_sessions(self):
"""The list of active sessions of this user
return cache.get(self.session_list_key) or []
def active_sessions(self, value):
cache.set(self.session_list_key, list(value))
def __repr__(self):
return f'<User {}({self.username})>'
class Role(db.Model, RoleMixin): # pylint: disable=too-few-public-methods
"""Database model for roles
__tablename__ = 'roles'
# pylint: disable=invalid-name
id = db.Column(db.Integer(), primary_key=True)
#: The name of the role
name = db.Column(db.Unicode(length=80), unique=True)
#: A description of the role
description = db.Column(db.UnicodeText)
def __repr__(self):
return f'<Role {}({})>'
class Profile(db.Model): # pylint: disable=too-few-public-methods
"""Database model for user profiles
__tablename__ = 'profiles'
# pylint: disable=invalid-name
id = db.Column(db.Integer(), primary_key=True)
#: The ID of the local user this profile belongs to
user_id = db.Column(db.Integer(), db.ForeignKey(''), nullable=True, unique=True)
user = db.relationship(User, backref=db.backref('profile', uselist=False))
#: The username this profile belongs to. If ``None``, `user_id` must be set
username = db.Column(db.String(length=50), nullable=True)
#: The domain this profile originates from. If ``None``, `user_id` must be set
domain = db.Column(db.Unicode(length=100), nullable=True)
#: The display name
display_name = db.Column(db.Unicode(length=80), nullable=False)
#: If locked, a profile cannot be followed without the owners consent
locked = db.Column(db.Boolean(), default=False)
#: If set, the profile will display this builtin avatar
builtin_avatar = db.Column(db.String(length=40), nullable=True)
def fqn(self):
"""The fully qualified name of the profile
For local profiles, this is in the form ``@username``; for remote users, its in the form
if self.user:
return f'@{self.user.username}'
return f'@{self.username}@{self.domain}'
def __repr__(self):
if self.user:
username = self.user.username
domain = ''
username = self.username
domain = f'@{self.domain}'
return f'<Profile {}(@{username}{domain})>'
def __str__(self):
ret = ''
if self.display_name:
ret = self.display_name + ' '
ret += f'({self.fqn})'
return ret
def followed_list(self):
"""List of profiles this profile is following
# This will always be empty for remote profiles
if not self.user:
return []
return Profile.query \
.join(UserFollow.followed) \
.filter(UserFollow.follower == self) \
def follower_list(self):
"""List of profiles that follow this profile
# This will always be empty for remote profiles
if not self.user:
return []
return Profile.query \
.join(UserFollow.follower) \
.filter(UserFollow.followed == self) \
def url(self):
"""Get the URL for this profile
from flask import url_for
if self.user:
return url_for('display_profile', username=self.user.username)
return NotImplemented
def follow(self, follower):
"""Make ``follower`` follow this profile
if not isinstance(follower, Profile):
raise TypeError('Folloer must be a Profile object')
timestamp = None if self.locked else datetime.utcnow()
user_follow = UserFollow(follower=follower, followed=self, accepted_at=timestamp)
notification = self.notify(follower, self, NotificationAction.follow)
return user_follow
def notify(self, actor, item, action):
"""Notify this profile about ``action`` on ``item`` by ``actor``
:param actor: the actor who generated the notification
:type actor: Profile
:param item: the item ``action`` was performed on
:type item: any
:param action: the type of the action
:type action: NotificationAction, str
:raises TypeError: if ``actor`` is not a `Profile` object
:returns: the generated notification. It is already added to the database session, but
not committed
:rtype: Notification
if not isinstance(actor, Profile):
raise TypeError('actor must be a Profile instance')
if isinstance(action, str):
action = NotificationAction[action]
notification = Notification(profile=self, actor=actor, item=item, action=action)
return notification
class Event(db.Model):
"""Database model for events
__tablename__ = 'events'
# pylint: disable=invalid-name
id = db.Column(db.Integer(), primary_key=True)
#: The UUID of the event. This is what is presented to the users and used in federation.
event_uuid = db.Column(db.String(length=40), unique=True, nullable=False, default=generate_uuid)
#: The ID of the profile who created the event
profile_id = db.Column(db.Integer(), db.ForeignKey(''), nullable=False)
profile = db.relationship('Profile', backref=db.backref('events', lazy='dynamic'))
#: The title of the event
title = db.Column(db.Unicode(length=200), nullable=False)
#: The time zone to be used for `start_time` and `end_time`
time_zone = db.Column(db.String(length=80), nullable=False)
#: The starting timestamp of the event. It is in the UTC time zone
start_time = db.Column(db.DateTime(), nullable=False)
#: The ending timestamp of the event. It is in the UTC time zone
end_time = db.Column(db.DateTime(), nullable=False)
#: If `True`, the event is a whole-day event
all_day = db.Column(db.Boolean(), default=False)
#: The description of the event
description = db.Column(db.UnicodeText())
#: The visibility of the event
visibility = db.Column(db.Enum(EventVisibility), nullable=False)
def __as_tz(self, timestamp, as_timezone=None):
from pytz import timezone, utc
utc_timestamp = utc.localize(dt=timestamp)
return utc_timestamp.astimezone(as_timezone or timezone(self.time_zone))
def start_time_tz(self):
"""The same timestamp as `start_time`, but in the time zone specified by `time_zone`.
return self.__as_tz(self.start_time)
def end_time_tz(self):
"""The same timestamp as `end_time`, but in the time zone specified by `time_zone`.
return self.__as_tz(self.end_time)
def start_time_for_user(self, user):
"""The same timestamp as `start_time`, but in the time zone of `user`
return self.__as_tz(self.start_time, as_timezone=user.timezone)
def end_time_for_user(self, user):
"""The same timestamp as `end_time`, but in the time zone of `user`
return self.__as_tz(self.end_time, as_timezone=user.timezone)
def __repr__(self):
return f'<Event {} ({self.title}) of {self.profile}>'
def __str__(self):
return self.title
def url(self):
"""The URL of the event
from flask import url_for
return url_for('event_details', event_uuid=self.event_uuid)
def invite(self, inviter, invited):
"""Invite ``invited`` to the event
The invitation will arrive from ``inviter``.
invite = Invitation(event=self, sender=inviter, invitee=invited)
notification = invited.notify(inviter, self, NotificationAction.invite)
return invite
class UserSetting(db.Model): # pylint: disable=too-few-public-methods
"""Database model for user settings
__tablename__ = 'user_settings'
#: The ID of the user this setting belongs to
user_id = db.Column(db.Integer(), db.ForeignKey(''), primary_key=True)
user = db.relationship('User')
#: The settings key
key = db.Column(db.String(length=40), primary_key=True)
#: The settings value
value = db.Column(db.UnicodeText())
def __repr__(self):
return f'<UserSetting of {self.user}, {self.key}="{self.value}">'
class AuditLog(db.Model):
"""Database model for audit log records
__tablename__ = 'audit_log'
# pylint: disable=invalid-name
id = db.Column(db.Integer(), primary_key=True)
TYPE_LOGIN_FAIL = 'failed_login'
TYPE_LOGOUT = 'logout'
(TYPE_LOGIN_SUCCESS, _('%(user)s logged in')),
(TYPE_LOGIN_FAIL, _('%(user)s failed to log in')),
(TYPE_LOGOUT, _('%(user)s logged out')),
user_id = db.Column(db.Integer(), db.ForeignKey(''))
user = db.relationship('User')
ip_address = db.Column(db.String(length=40), nullable=False)
timestamp = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)
log_type = db.Column(ChoiceType(TYPES), nullable=False, index=True)
extra_data = db.Column(db.UnicodeText())
def __str__(self):
format_string = dict(self.TYPES).get(self.log_type,
_('UNKNOWN RECORD "%(log_type)s" for %(user)s'))
message = str(self.timestamp) + \
(format_string % {
'user': self.user.username,
'log_type': self.log_type
if self.extra_data:
message += f' {self.extra_data}'
return message
def __repr__(self):
return f'<AuditLog {self.log_type} for {self.user} at {self.timestamp}>'
def get_message(cls, key, *args, **kwargs):
"""Get the translated message for ``key``
return lazy_gettext(dict(cls.TYPES).get(key), *args, **kwargs)
def log(cls, user, log_type, extra_data=None, logger=None):
"""Create a new audit log record
:param user: the user the new record corresponds to
:type user: User
:param log_type: the type of the record. Must be present in `TYPES`
:type log_type: str
:param extra_data: extra text to be added to the record
:type extra_data: str, None
:param logger: if set, logs will go to this logger instead of the default (calsocial)
:type logger: Logger
:raises TypeError: if ``user`` is not an instance of `User`
:raises ValueError: if ``log_type`` is not a valid log type
from logging import getLogger
from flask import has_request_context, request
logger = logger or getLogger('calsocial')
if not isinstance(user, User):
raise TypeError('user must be a User instance')
if log_type not in dict(cls.TYPES):
raise ValueError('log_type must be a valid log type')
if has_request_context():
ip = request.remote_addr or 'UNKNOWN'
record = cls(user=user,
with force_locale('en'):
message = f'Audit: [{ip}] [{}] ' + cls.get_message(log_type, user=user.username)
class UserFollow(db.Model): # pylint: disable=too-few-public-methods
"""Database model for user follows
#: The ID of the profile that is following the other
follower_id = db.Column(db.Integer(), db.ForeignKey(''), primary_key=True)
follower = db.relationship('Profile', foreign_keys=[follower_id])
#: The ID of the profile being followed
followed_id = db.Column(db.Integer(), db.ForeignKey(''), primary_key=True)
followed = db.relationship('Profile', foreign_keys=[followed_id])
#: The timestamp when the follow was initiated
initiated_at = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)
#: The timestamp when the follow was accepted
accepted_at = db.Column(db.DateTime(), nullable=True)
def accept(self):
"""Accept this follow request
self.accepted_at = datetime.utcnow()
class Notification(db.Model):
"""Database model for notifications
__tablename__ = 'notifications'
# pylint: disable=invalid-name
id = db.Column(db.Integer(), primary_key=True)
#: The recipient of the notification
profile_id = db.Column(db.Integer(), db.ForeignKey(''), nullable=False)
profile = db.relationship('Profile',
backref=db.backref('notifications', lazy='dynamic'),
#: The profile that generated the notification
actor_id = db.Column(db.Integer(), db.ForeignKey(''), nullable=True)
actor = db.relationship('Profile', foreign_keys=[actor_id])
#: The item (e.g. event) that generated the notification
item_id = db.Column(db.Integer(), nullable=True)
#: The type of the item that generated the notification
item_type = db.Column(db.String(length=40))
#: The type of action
action = db.Column(db.Enum(NotificationAction))
#: The timestamp when the notification was created
created_at = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)
#: The timestamp when the notification was read
read_at = db.Column(db.DateTime(), nullable=True)
def item(self):
"""The subject of the notification
item_class = self._decl_class_registry.get(self.item_type)
if item_class is None:
warn(f'Unknown item type {self.item_type}')
return None
return item_class.query.get(self.item_id)
def item(self, value):
self.item_type = value.__class__.__name__
self.item_id =
def message(self):
"""Get the translated message for ``key``
from flask_security import current_user
messages = NOTIFICATION_ACTION_MESSAGES.get(self.action)
message = messages[0 if self.item == current_user.profile else 1]
return lazy_gettext(message,, item=self.item)
def html(self):
"""Get the translated message for ``key`` in HTML format
from flask import Markup
from flask_security import current_user
messages = NOTIFICATION_ACTION_MESSAGES.get(self.action)
message = messages[0 if self.item == current_user.profile else 1]
actor = f'<a href="{}">{}</a>'
item = f'<a href="{self.item.url}">{self.item}</a>'
return Markup(lazy_gettext(message, actor=actor, item=item))
class Invitation(db.Model): # pylint: disable=too-few-public-methods
"""Database model for event invitations
__tablename__ = 'invitations'
# pylint: disable=invalid-name
id = db.Column(db.Integer(), primary_key=True)
#: The ID of the senders profile
sender_id = db.Column(db.Integer(), db.ForeignKey(''), index=True)
sender = db.relationship('Profile', foreign_keys=[sender_id])
#: The ID of the invitees profile
invitee_id = db.Column(db.Integer(), db.ForeignKey(''), index=True)
invitee = db.relationship('Profile', foreign_keys=[invitee_id])
#: The ID of the event
event_id = db.Column(db.Integer(), db.ForeignKey(''), index=True)
event = db.relationship('Event', backref=db.backref('invitations', lazy='dynamic'))
#: The timestamp of the invitation
timestamp = db.Column(db.DateTime(), default=datetime.utcnow)
class Response(db.Model): # pylint: disable=too-few-public-methods
"""Database model for RSVPs.
__tablename__ = 'responses'
#: The profile thats sending the RSVP
profile_id = db.Column(db.Integer(), db.ForeignKey(''), primary_key=True)
profile = db.relationship('Profile')
#: The ID of the event
event_id = db.Column(db.Integer(), db.ForeignKey(''), primary_key=True)
event = db.relationship('Event')
#: The ID of the invitation, if the user is responding to an invitation
invitation_id = db.Column(db.Integer(), db.ForeignKey(''), nullable=True)
invitation = db.relationship('Invitation')
#: The timestamp of the response
timestamp = db.Column(db.DateTime(), default=datetime.utcnow)
#: The response itself
response = db.Column(db.Enum(ResponseType), nullable=False)
class AppState(app_state_base(db.Model)): # pylint: disable=too-few-public-methods
"""Database model for application state values
__tablename__ = 'app_state'
#: The environment that set this key
env = db.Column(db.String(length=40), nullable=False, primary_key=True)
#: The key
key = db.Column(db.String(length=80), nullable=False, primary_key=True)
#: The value of the key
value = db.Column(db.Unicode(length=200), nullable=True)
def __get_state__(cls, key):
record = cls.query \
.filter(cls.env == current_app.env) \
.filter(cls.key == key) \
except NoResultFound:
return None
return record.value
def __set_state__(cls, key, value):
record = cls.query \
.filter(cls.env == current_app.env) \
.filter(cls.key == key) \
except NoResultFound:
record = cls(env=current_app.env, key=key)
record.value = value
def __set_state_default__(cls, key, value):
record = cls.query \
.filter(cls.env == current_app.env) \
.filter(cls.key == key) \
except NoResultFound:
record = cls(env=current_app.env, key=key, value=value)
def __repr__(self):
return f'<AppState {self.env}:{self.key}="{self.value}"'