calendar-social/calsocial/models.py

854 lines
25 KiB
Python
Raw Permalink Normal View History

# Calendar.social
# Copyright (C) 2018 Gergely Polonkai
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Database models for Calendar.social
"""
2018-06-28 12:41:14 +00:00
from datetime import datetime
from enum import Enum
from warnings import warn
2018-06-28 12:41:14 +00:00
from flask import current_app
2018-07-04 04:30:59 +00:00
from flask_babelex import lazy_gettext
2018-06-28 12:41:14 +00:00
from flask_security import UserMixin, RoleMixin
from flask_sqlalchemy import SQLAlchemy
2018-07-03 08:51:43 +00:00
from sqlalchemy.orm.exc import NoResultFound
2018-07-04 04:30:59 +00:00
from sqlalchemy_utils.types.choice import ChoiceType
from .app_state import app_state_base
from .cache import cache
2018-07-04 04:30:59 +00:00
from .utils import force_locale
2018-06-28 12:41:14 +00:00
db = SQLAlchemy()
users_roles = db.Table(
'users_roles',
db.Column('user_id', db.Integer(), db.ForeignKey('users.id')),
db.Column('role_id', db.Integer(), db.ForeignKey('roles.id')))
def generate_uuid():
"""Generate a UUID (version 4)
:returns: the hexadecimal representation of the generated UUID
:rtype: str
"""
from uuid import uuid4
return uuid4().hex
def _(translate): # pylint: disable=invalid-name
"""Function to mark strings as translatable
The actual translation will be fetched later in `:meth:AuditLog.get_message`.
"""
return translate
class NotificationAction(Enum):
"""The possible values for notification actions
"""
#: A user followed another
follow = 1
#: A user has been invited to an event
invite = 2
NOTIFICATION_ACTION_MESSAGES = {
NotificationAction.follow: (_('%(actor)s followed you'), _('%(actor)s followed %(item)s')),
NotificationAction.invite: (None, _('%(actor)s invited you to %(item)s')),
}
class ResponseType(Enum):
"""Enumeration of event availabilities
"""
#: The user is about to attend the event
going = 0
#: The user will probably attend the event
probably_going = 1
#: The user is interested in the event, but might not go
probably_not_going = 2
#: The user wont attend the event
not_going = 3
def __hash__(self):
return Enum.__hash__(self)
def __eq__(self, other):
if isinstance(other, str):
return self.name.lower() == other.lower() # pylint: disable=no-member
if isinstance(other, (int, float)):
2018-07-25 19:19:27 +00:00
return self.value == other # pylint: disable=comparison-with-callable
return Enum.__eq__(self, other)
class EventVisibility(Enum):
"""Enumeration for event visibility
"""
#: The event is private, only attendees and people invited can see the details
private = 0
#: The event is public, anyone can see the details
public = 5
EVENT_VISIBILITY_TRANSLATIONS = {
EventVisibility.private: _('Visible only to attendees'),
EventVisibility.public: _('Visible to everyone'),
}
2018-07-03 08:51:43 +00:00
class SettingsProxy:
"""Proxy object to get settings for a user
"""
def __init__(self, user):
self.user = user
def __getitem__(self, key):
setting = UserSetting.query \
.filter(UserSetting.user == self.user) \
.filter(UserSetting.key == key) \
.first()
if setting is None:
return None
return setting.value
def __setitem__(self, key, value):
try:
setting = UserSetting.query \
.filter(UserSetting.user == self.user) \
.filter(UserSetting.key == key) \
.one()
except NoResultFound:
setting = UserSetting(user=self.user, key=key)
setting.value = str(value)
db.session.add(setting)
def __repr__(self):
return f'<SettingsProxy for {self.user}>'
2018-06-28 12:41:14 +00:00
class User(db.Model, UserMixin):
"""Database model for users
"""
2018-06-28 12:41:14 +00:00
__tablename__ = 'users'
# pylint: disable=invalid-name
2018-06-28 12:41:14 +00:00
id = db.Column(db.Integer(), primary_key=True)
#: The username of the user. This is also the display name and thus is immutable
username = db.Column(db.String(length=50), unique=True, nullable=False)
#: The email address of the user
email = db.Column(db.String(length=255), unique=True, nullable=True)
#: The (hashed) password of the user
password = db.Column(db.String(length=255))
#: A flag to show whether the user is enabled (active) or not
active = db.Column(db.Boolean(), default=False)
#: The timestamp when this user was created
created_at = db.Column(db.DateTime(), default=datetime.utcnow)
#: The timestamp when the user was activated
confirmed_at = db.Column(db.DateTime())
#: The roles of the user
roles = db.relationship('Role',
secondary=users_roles,
backref=db.backref('users', lazy='dynamic'))
2018-07-03 08:51:43 +00:00
@property
def settings(self):
"""Get a settings proxy for the user
"""
proxy = getattr(self, '_settings', None)
if proxy is None:
proxy = SettingsProxy(self)
setattr(self, '_settings', proxy)
return proxy
@property
def timezone(self):
2018-07-08 17:47:05 +00:00
"""The users time zone
If the user didnt set a time zone yet, the application default is used.
"""
from pytz import timezone
from pytz.exceptions import UnknownTimeZoneError
timezone_str = self.settings['timezone']
if timezone_str:
try:
return timezone(timezone_str)
except UnknownTimeZoneError:
warn(f'Timezone of {self} (or the default timezone) "{timezone_str}" is invalid')
return current_app.timezone
@property
def session_list_key(self):
"""The cache key of this users session list
"""
return f'open_sessions:{self.id}'
@property
def active_sessions(self):
"""The list of active sessions of this user
"""
return cache.get(self.session_list_key) or []
@active_sessions.setter
def active_sessions(self, value):
cache.set(self.session_list_key, list(value))
def __repr__(self):
2018-06-28 12:41:14 +00:00
return f'<User {self.id}({self.username})>'
class Role(db.Model, RoleMixin): # pylint: disable=too-few-public-methods
"""Database model for roles
"""
2018-06-28 12:41:14 +00:00
__tablename__ = 'roles'
# pylint: disable=invalid-name
2018-06-28 12:41:14 +00:00
id = db.Column(db.Integer(), primary_key=True)
#: The name of the role
name = db.Column(db.Unicode(length=80), unique=True)
#: A description of the role
description = db.Column(db.UnicodeText)
def __repr__(self):
return f'<Role {self.id}({self.name})>'
2018-06-29 11:25:09 +00:00
class Profile(db.Model): # pylint: disable=too-few-public-methods
"""Database model for user profiles
"""
__tablename__ = 'profiles'
# pylint: disable=invalid-name
id = db.Column(db.Integer(), primary_key=True)
#: The ID of the local user this profile belongs to
user_id = db.Column(db.Integer(), db.ForeignKey('users.id'), nullable=True, unique=True)
user = db.relationship(User, backref=db.backref('profile', uselist=False))
#: The username this profile belongs to. If ``None``, `user_id` must be set
username = db.Column(db.String(length=50), nullable=True)
#: The domain this profile originates from. If ``None``, `user_id` must be set
domain = db.Column(db.Unicode(length=100), nullable=True)
#: The display name
display_name = db.Column(db.Unicode(length=80), nullable=False)
#: If locked, a profile cannot be followed without the owners consent
locked = db.Column(db.Boolean(), default=False)
#: If set, the profile will display this builtin avatar
builtin_avatar = db.Column(db.String(length=40), nullable=True)
@property
def fqn(self):
"""The fully qualified name of the profile
For local profiles, this is in the form ``@username``; for remote users, its in the form
``@username@domain``.
"""
if self.user:
return f'@{self.user.username}'
return f'@{self.username}@{self.domain}'
def __repr__(self):
if self.user:
username = self.user.username
domain = ''
else:
username = self.username
domain = f'@{self.domain}'
return f'<Profile {self.id}(@{username}{domain})>'
def __str__(self):
ret = ''
if self.display_name:
ret = self.display_name + ' '
ret += f'({self.fqn})'
return ret
@property
def followed_list(self):
"""List of profiles this profile is following
"""
# This will always be empty for remote profiles
if not self.user:
return []
return Profile.query \
.join(UserFollow.followed) \
.filter(UserFollow.follower == self) \
.filter(UserFollow.accepted_at.isnot(None))
@property
def follower_list(self):
"""List of profiles that follow this profile
"""
# This will always be empty for remote profiles
if not self.user:
return []
return Profile.query \
.join(UserFollow.follower) \
.filter(UserFollow.followed == self) \
.filter(UserFollow.accepted_at.isnot(None))
@property
def url(self):
2018-07-10 14:39:16 +00:00
"""Get the URL for this profile
"""
from flask import url_for
if self.user:
return url_for('display_profile', username=self.user.username)
return NotImplemented
2018-07-11 14:40:42 +00:00
def follow(self, follower):
"""Make ``follower`` follow this profile
"""
if not isinstance(follower, Profile):
raise TypeError('Folloer must be a Profile object')
timestamp = None if self.locked else datetime.utcnow()
user_follow = UserFollow(follower=follower, followed=self, accepted_at=timestamp)
2018-07-11 14:40:42 +00:00
db.session.add(user_follow)
notification = self.notify(follower, self, NotificationAction.follow)
2018-07-11 14:40:42 +00:00
db.session.add(notification)
return user_follow
def notify(self, actor, item, action):
"""Notify this profile about ``action`` on ``item`` by ``actor``
:param actor: the actor who generated the notification
:type actor: Profile
:param item: the item ``action`` was performed on
:type item: any
:param action: the type of the action
:type action: NotificationAction, str
:raises TypeError: if ``actor`` is not a `Profile` object
:returns: the generated notification. It is already added to the database session, but
not committed
:rtype: Notification
"""
if not isinstance(actor, Profile):
raise TypeError('actor must be a Profile instance')
if isinstance(action, str):
action = NotificationAction[action]
notification = Notification(profile=self, actor=actor, item=item, action=action)
return notification
2018-06-29 11:25:09 +00:00
class Event(db.Model):
"""Database model for events
"""
2018-06-29 11:25:09 +00:00
__tablename__ = 'events'
# pylint: disable=invalid-name
2018-06-29 11:25:09 +00:00
id = db.Column(db.Integer(), primary_key=True)
#: The UUID of the event. This is what is presented to the users and used in federation.
event_uuid = db.Column(db.String(length=40), unique=True, nullable=False, default=generate_uuid)
#: The ID of the profile who created the event
profile_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), nullable=False)
2018-06-29 11:25:09 +00:00
profile = db.relationship('Profile', backref=db.backref('events', lazy='dynamic'))
2018-06-29 11:25:09 +00:00
#: The title of the event
title = db.Column(db.Unicode(length=200), nullable=False)
#: The time zone to be used for `start_time` and `end_time`
time_zone = db.Column(db.String(length=80), nullable=False)
#: The starting timestamp of the event. It is in the UTC time zone
2018-06-29 11:25:09 +00:00
start_time = db.Column(db.DateTime(), nullable=False)
#: The ending timestamp of the event. It is in the UTC time zone
2018-06-29 11:25:09 +00:00
end_time = db.Column(db.DateTime(), nullable=False)
#: If `True`, the event is a whole-day event
all_day = db.Column(db.Boolean(), default=False)
#: The description of the event
description = db.Column(db.UnicodeText())
#: The visibility of the event
visibility = db.Column(db.Enum(EventVisibility), nullable=False)
def __as_tz(self, timestamp, as_timezone=None):
2018-07-08 17:47:05 +00:00
from pytz import timezone, utc
2018-07-08 17:47:05 +00:00
utc_timestamp = utc.localize(dt=timestamp)
return utc_timestamp.astimezone(as_timezone or timezone(self.time_zone))
@property
def start_time_tz(self):
"""The same timestamp as `start_time`, but in the time zone specified by `time_zone`.
"""
return self.__as_tz(self.start_time)
@property
def end_time_tz(self):
"""The same timestamp as `end_time`, but in the time zone specified by `time_zone`.
"""
return self.__as_tz(self.end_time)
def start_time_for_user(self, user):
"""The same timestamp as `start_time`, but in the time zone of `user`
"""
return self.__as_tz(self.start_time, as_timezone=user.timezone)
def end_time_for_user(self, user):
"""The same timestamp as `end_time`, but in the time zone of `user`
"""
return self.__as_tz(self.end_time, as_timezone=user.timezone)
2018-06-29 11:25:09 +00:00
def __repr__(self):
return f'<Event {self.id} ({self.title}) of {self.profile}>'
2018-07-03 08:51:43 +00:00
def __str__(self):
return self.title
@property
def url(self):
2018-07-10 14:39:16 +00:00
"""The URL of the event
"""
from flask import url_for
return url_for('event_details', event_uuid=self.event_uuid)
def invite(self, inviter, invited):
"""Invite ``invited`` to the event
The invitation will arrive from ``inviter``.
"""
invite = Invitation(event=self, sender=inviter, invitee=invited)
db.session.add(invite)
notification = invited.notify(inviter, self, NotificationAction.invite)
db.session.add(notification)
return invite
2018-07-03 08:51:43 +00:00
2018-07-08 17:47:05 +00:00
class UserSetting(db.Model): # pylint: disable=too-few-public-methods
2018-07-03 08:51:43 +00:00
"""Database model for user settings
"""
__tablename__ = 'user_settings'
#: The ID of the user this setting belongs to
user_id = db.Column(db.Integer(), db.ForeignKey('users.id'), primary_key=True)
user = db.relationship('User')
#: The settings key
key = db.Column(db.String(length=40), primary_key=True)
#: The settings value
value = db.Column(db.UnicodeText())
def __repr__(self):
return f'<UserSetting of {self.user}, {self.key}="{self.value}">'
2018-07-04 04:30:59 +00:00
class AuditLog(db.Model):
"""Database model for audit log records
"""
__tablename__ = 'audit_log'
# pylint: disable=invalid-name
id = db.Column(db.Integer(), primary_key=True)
TYPE_LOGIN_SUCCESS = 'login'
TYPE_LOGIN_FAIL = 'failed_login'
TYPE_LOGOUT = 'logout'
TYPES = (
(TYPE_LOGIN_SUCCESS, _('%(user)s logged in')),
(TYPE_LOGIN_FAIL, _('%(user)s failed to log in')),
(TYPE_LOGOUT, _('%(user)s logged out')),
)
user_id = db.Column(db.Integer(), db.ForeignKey('users.id'))
user = db.relationship('User')
ip_address = db.Column(db.String(length=40), nullable=False)
timestamp = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)
log_type = db.Column(ChoiceType(TYPES), nullable=False, index=True)
extra_data = db.Column(db.UnicodeText())
def __str__(self):
format_string = dict(self.TYPES).get(self.log_type,
_('UNKNOWN RECORD "%(log_type)s" for %(user)s'))
message = str(self.timestamp) + \
(format_string % {
'user': self.user.username,
'log_type': self.log_type
})
if self.extra_data:
message += f' {self.extra_data}'
return message
def __repr__(self):
return f'<AuditLog {self.log_type} for {self.user} at {self.timestamp}>'
@classmethod
def get_message(cls, key, *args, **kwargs):
"""Get the translated message for ``key``
"""
return lazy_gettext(dict(cls.TYPES).get(key), *args, **kwargs)
@classmethod
def log(cls, user, log_type, extra_data=None, logger=None):
"""Create a new audit log record
:param user: the user the new record corresponds to
:type user: User
:param log_type: the type of the record. Must be present in `TYPES`
:type log_type: str
:param extra_data: extra text to be added to the record
:type extra_data: str, None
:param logger: if set, logs will go to this logger instead of the default (calsocial)
:type logger: Logger
:raises TypeError: if ``user`` is not an instance of `User`
:raises ValueError: if ``log_type`` is not a valid log type
"""
from logging import getLogger
from flask import has_request_context, request
logger = logger or getLogger('calsocial')
if not isinstance(user, User):
raise TypeError('user must be a User instance')
if log_type not in dict(cls.TYPES):
raise ValueError('log_type must be a valid log type')
if has_request_context():
ip = request.remote_addr or 'UNKNOWN'
else:
ip = 'NON-REQUEST'
record = cls(user=user,
timestamp=datetime.utcnow(),
log_type=log_type,
ip_address=ip,
extra_data=extra_data)
db.session.add(record)
db.session.commit()
with force_locale('en'):
message = f'Audit: [{ip}] [{user.id}] ' + cls.get_message(log_type, user=user.username)
logger.info(message)
2018-07-09 06:33:04 +00:00
class UserFollow(db.Model): # pylint: disable=too-few-public-methods
2018-07-09 06:33:04 +00:00
"""Database model for user follows
"""
#: The ID of the profile that is following the other
follower_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), primary_key=True)
follower = db.relationship('Profile', foreign_keys=[follower_id])
2018-07-09 06:33:04 +00:00
#: The ID of the profile being followed
followed_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), primary_key=True)
followed = db.relationship('Profile', foreign_keys=[followed_id])
2018-07-09 06:33:04 +00:00
#: The timestamp when the follow was initiated
initiated_at = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)
#: The timestamp when the follow was accepted
accepted_at = db.Column(db.DateTime(), nullable=True)
def accept(self):
"""Accept this follow request
"""
self.accepted_at = datetime.utcnow()
class Notification(db.Model):
"""Database model for notifications
"""
__tablename__ = 'notifications'
# pylint: disable=invalid-name
id = db.Column(db.Integer(), primary_key=True)
#: The recipient of the notification
profile_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), nullable=False)
profile = db.relationship('Profile',
backref=db.backref('notifications', lazy='dynamic'),
foreign_keys=[profile_id])
#: The profile that generated the notification
actor_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), nullable=True)
actor = db.relationship('Profile', foreign_keys=[actor_id])
#: The item (e.g. event) that generated the notification
item_id = db.Column(db.Integer(), nullable=True)
#: The type of the item that generated the notification
item_type = db.Column(db.String(length=40))
#: The type of action
action = db.Column(db.Enum(NotificationAction))
#: The timestamp when the notification was created
created_at = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)
#: The timestamp when the notification was read
read_at = db.Column(db.DateTime(), nullable=True)
@property
def item(self):
"""The subject of the notification
"""
item_class = self._decl_class_registry.get(self.item_type)
if item_class is None:
warn(f'Unknown item type {self.item_type}')
return None
return item_class.query.get(self.item_id)
@item.setter
def item(self, value):
self.item_type = value.__class__.__name__
self.item_id = value.id
@property
def message(self):
"""Get the translated message for ``key``
"""
from flask_security import current_user
messages = NOTIFICATION_ACTION_MESSAGES.get(self.action)
message = messages[0 if self.item == current_user.profile else 1]
return lazy_gettext(message, actor=self.actor, item=self.item)
2018-07-09 05:58:29 +00:00
@property
def html(self):
"""Get the translated message for ``key`` in HTML format
"""
2018-07-10 14:39:16 +00:00
from flask import Markup
from flask_security import current_user
messages = NOTIFICATION_ACTION_MESSAGES.get(self.action)
message = messages[0 if self.item == current_user.profile else 1]
actor = f'<a href="{self.actor.url}">{self.actor}</a>'
item = f'<a href="{self.item.url}">{self.item}</a>'
return Markup(lazy_gettext(message, actor=actor, item=item))
2018-07-09 05:58:29 +00:00
class Invitation(db.Model): # pylint: disable=too-few-public-methods
"""Database model for event invitations
"""
__tablename__ = 'invitations'
# pylint: disable=invalid-name
id = db.Column(db.Integer(), primary_key=True)
2018-07-09 05:58:29 +00:00
#: The ID of the senders profile
sender_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), index=True)
2018-07-09 05:58:29 +00:00
sender = db.relationship('Profile', foreign_keys=[sender_id])
#: The ID of the invitees profile
invitee_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), index=True)
2018-07-09 05:58:29 +00:00
invitee = db.relationship('Profile', foreign_keys=[invitee_id])
#: The ID of the event
event_id = db.Column(db.Integer(), db.ForeignKey('events.id'), index=True)
2018-07-09 05:58:29 +00:00
event = db.relationship('Event', backref=db.backref('invitations', lazy='dynamic'))
#: The timestamp of the invitation
timestamp = db.Column(db.DateTime(), default=datetime.utcnow)
class Response(db.Model): # pylint: disable=too-few-public-methods
"""Database model for RSVPs.
"""
__tablename__ = 'responses'
#: The profile thats sending the RSVP
profile_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), primary_key=True)
profile = db.relationship('Profile')
#: The ID of the event
event_id = db.Column(db.Integer(), db.ForeignKey('events.id'), primary_key=True)
event = db.relationship('Event')
#: The ID of the invitation, if the user is responding to an invitation
invitation_id = db.Column(db.Integer(), db.ForeignKey('invitations.id'), nullable=True)
invitation = db.relationship('Invitation')
#: The timestamp of the response
timestamp = db.Column(db.DateTime(), default=datetime.utcnow)
#: The response itself
response = db.Column(db.Enum(ResponseType), nullable=False)
2018-07-25 19:19:27 +00:00
class AppState(app_state_base(db.Model)): # pylint: disable=too-few-public-methods,inherit-non-class
"""Database model for application state values
"""
__tablename__ = 'app_state'
#: The environment that set this key
env = db.Column(db.String(length=40), nullable=False, primary_key=True)
#: The key
key = db.Column(db.String(length=80), nullable=False, primary_key=True)
#: The value of the key
value = db.Column(db.Unicode(length=200), nullable=True)
@classmethod
def __get_state__(cls, key):
try:
record = cls.query \
.filter(cls.env == current_app.env) \
.filter(cls.key == key) \
.one()
except NoResultFound:
return None
return record.value
@classmethod
def __set_state__(cls, key, value):
try:
record = cls.query \
.filter(cls.env == current_app.env) \
.filter(cls.key == key) \
.one()
except NoResultFound:
record = cls(env=current_app.env, key=key)
record.value = value
db.session.add(record)
db.session.commit()
@classmethod
def __set_state_default__(cls, key, value):
try:
record = cls.query \
.filter(cls.env == current_app.env) \
.filter(cls.key == key) \
.one()
except NoResultFound:
pass
else:
return
record = cls(env=current_app.env, key=key, value=value)
db.session.add(record)
db.session.commit()
def __repr__(self):
return f'<AppState {self.env}:{self.key}="{self.value}"'