calendar-social/calsocial/models.py

974 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Calendar.social
# Copyright (C) 2018 Gergely Polonkai
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Database models for Calendar.social
"""
from datetime import datetime
from enum import Enum
from warnings import warn
from flask import current_app
from flask_babelex import lazy_gettext
from flask_security import UserMixin, RoleMixin
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy_utils.types.choice import ChoiceType
from .app_state import app_state_base
from .cache import cache
from .utils import force_locale
# The Flask-SQLAlchemy extension object; every model and table in this module is declared on it
db = SQLAlchemy()

# Association table between users and roles. It is used as the ``secondary`` table of the
# `User.roles` relationship.
users_roles = db.Table(
    'users_roles',
    db.Column('user_id', db.Integer(), db.ForeignKey('users.id')),
    db.Column('role_id', db.Integer(), db.ForeignKey('roles.id')))
def generate_uuid():
    """Create a random (version 4) UUID

    :returns: the generated UUID as a hexadecimal string
    :rtype: str
    """

    import uuid

    return uuid.uuid4().hex
def _(translate):  # pylint: disable=invalid-name
    """Function to mark strings as translatable

    The string is returned unchanged. The actual translation will be fetched later, e.g. in
    :meth:`AuditLog.get_message`.

    :param translate: the string to mark as translatable
    :returns: ``translate``, untouched
    """

    return translate
class NotificationAction(Enum):
    """The possible values for notification actions

    The message templates belonging to each action are in `NOTIFICATION_ACTION_MESSAGES`.
    """

    #: A user followed another
    follow = 1

    #: A user has been invited to an event
    invite = 2
# Message templates for each notification action. Every value is a pair: `Notification.message`
# and `Notification.html` use the first element when the item of the notification is the profile
# of the current user, and the second element otherwise. The strings are only marked as
# translatable here; they get translated when the message is rendered.
NOTIFICATION_ACTION_MESSAGES = {
    NotificationAction.follow: (_('%(actor)s followed you'), _('%(actor)s followed %(item)s')),
    NotificationAction.invite: (None, _('%(actor)s invited you to %(item)s')),
}
class ResponseType(Enum):
    """The answers a profile can give to an event

    Besides the usual enum comparison, members also compare equal to their own name (case
    insensitively) and to their numeric value.
    """

    #: The user is about to attend the event
    going = 0

    #: The user will probably attend the event
    probably_going = 1

    #: The user is interested in the event, but might not go
    probably_not_going = 2

    #: The user won't attend the event
    not_going = 3

    def __hash__(self):
        # ``__eq__`` is overridden below, so the hash has to be restated explicitly to keep the
        # members hashable
        return Enum.__hash__(self)

    def __eq__(self, other):
        # Strings are matched against the name of the member, ignoring case
        if isinstance(other, str):
            own_name = self.name.lower()  # pylint: disable=no-member

            return own_name == other.lower()

        # Numbers are matched against the value of the member
        if isinstance(other, (int, float)):
            return self.value == other

        # Anything else gets the stock enum comparison
        return Enum.__eq__(self, other)
class EventVisibility(Enum):
    """Enumeration for event visibility

    The human readable descriptions of the members are in `EVENT_VISIBILITY_TRANSLATIONS`.
    """

    #: The event is private, only attendees and people invited can see the details
    private = 0

    #: The event is public, anyone can see the details
    public = 5
# Human readable descriptions of the event visibility levels. The strings are only marked as
# translatable here.
EVENT_VISIBILITY_TRANSLATIONS = {
    EventVisibility.private: _('Visible only to attendees'),
    EventVisibility.public: _('Visible to everyone'),
}
class ResponseVisibility(Enum):
    """Enumeration for response visibility

    The human readable descriptions of the members are in `RESPONSE_VISIBILITY_TRANSLATIONS`;
    the rules themselves are implemented in `Response.visible_to`.
    """

    #: The response is only visible to the invitee
    private = 0

    #: The response is only visible to the event organisers
    organisers = 1

    #: The response is only visible to the event attendees
    attendees = 2

    #: The response is visible to the invitee's friends (i.e. mutual follows)
    friends = 3

    #: The response is visible to the invitee's followers
    followers = 4

    #: The response is visible to anyone
    public = 5
# Human readable descriptions of the response visibility levels. The strings are only marked as
# translatable here.
RESPONSE_VISIBILITY_TRANSLATIONS = {
    ResponseVisibility.private: _('Visible only to myself'),
    ResponseVisibility.organisers: _('Visible only to event organisers'),
    ResponseVisibility.attendees: _('Visible only to event attendees'),
    ResponseVisibility.friends: _('Visible only to my friends'),
    ResponseVisibility.followers: _('Visible only to my followers'),
    ResponseVisibility.public: _('Visible to anyone'),
}
class SettingsProxy:
    """Dictionary-like access to the settings of one user

    Reading a key gives the stored value, or ``None`` if the user has no such setting.
    Assigning to a key creates or updates the matching `UserSetting` record and adds it to the
    database session; the session is not committed here.
    """

    def __init__(self, user):
        self.user = user

    def _lookup(self, key):
        """Build the query that selects the ``key`` setting of the proxied user
        """

        return UserSetting.query \
                          .filter(UserSetting.user == self.user) \
                          .filter(UserSetting.key == key)

    def __getitem__(self, key):
        record = self._lookup(key).first()

        return None if record is None else record.value

    def __setitem__(self, key, value):
        try:
            record = self._lookup(key).one()
        except NoResultFound:
            # First time this key is set for the user
            record = UserSetting(user=self.user, key=key)

        # Values are always persisted in their string form
        record.value = str(value)
        db.session.add(record)

    def __repr__(self):
        return f'<SettingsProxy for {self.user}>'
class User(db.Model, UserMixin):
    """Database model for the local users of the instance
    """

    __tablename__ = 'users'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The username of the user. This is also the display name and thus is immutable
    username = db.Column(db.String(length=50), unique=True, nullable=False)

    #: The email address of the user
    email = db.Column(db.String(length=255), unique=True, nullable=True)

    #: The (hashed) password of the user
    password = db.Column(db.String(length=255))

    #: A flag to show whether the user is enabled (active) or not
    active = db.Column(db.Boolean(), default=False)

    #: The timestamp when this user was created
    created_at = db.Column(db.DateTime(), default=datetime.utcnow)

    #: The timestamp when the user was activated
    confirmed_at = db.Column(db.DateTime())

    #: The roles of the user
    roles = db.relationship('Role',
                            secondary=users_roles,
                            backref=db.backref('users', lazy='dynamic'))

    @property
    def settings(self):
        """The `SettingsProxy` of this user

        The proxy is created on first access and then reused for this object.
        """

        if getattr(self, '_settings', None) is None:
            self._settings = SettingsProxy(self)

        return self._settings

    @property
    def timezone(self):
        """The time zone of the user

        Falls back to the application default if the user has no time zone set, or if the
        stored one is not known (in which case a warning is issued, too).
        """

        from pytz import timezone
        from pytz.exceptions import UnknownTimeZoneError

        timezone_str = self.settings['timezone']

        if not timezone_str:
            return current_app.timezone

        try:
            return timezone(timezone_str)
        except UnknownTimeZoneError:
            warn(f'Timezone of {self} (or the default timezone) "{timezone_str}" is invalid')

        return current_app.timezone

    @property
    def session_list_key(self):
        """The cache key under which the session list of this user is stored
        """

        return f'open_sessions:{self.id}'

    @property
    def active_sessions(self):
        """The list of active sessions of this user, as stored in the cache
        """

        sessions = cache.get(self.session_list_key)

        if not sessions:
            return []

        return sessions

    @active_sessions.setter
    def active_sessions(self, value):
        session_list = list(value)
        cache.set(self.session_list_key, session_list)

    def __repr__(self):
        return f'<User {self.id}({self.username})>'
class Role(db.Model, RoleMixin):  # pylint: disable=too-few-public-methods
    """Database model for roles

    Roles are linked to users through the ``users_roles`` table (see `User.roles`, which also
    adds a ``users`` backref to this model).
    """

    __tablename__ = 'roles'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The name of the role
    name = db.Column(db.Unicode(length=80), unique=True)

    #: A description of the role
    description = db.Column(db.UnicodeText)

    def __repr__(self):
        return f'<Role {self.id}({self.name})>'
class Profile(db.Model):  # pylint: disable=too-few-public-methods
    """Database model for user profiles

    A profile either belongs to a local user (`user` is set), or describes a remote account
    (`username` and `domain` are set).
    """

    __tablename__ = 'profiles'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The ID of the local user this profile belongs to
    user_id = db.Column(db.Integer(), db.ForeignKey('users.id'), nullable=True, unique=True)

    user = db.relationship(User, backref=db.backref('profile', uselist=False))

    #: The username this profile belongs to. If ``None``, `user_id` must be set
    username = db.Column(db.String(length=50), nullable=True)

    #: The domain this profile originates from. If ``None``, `user_id` must be set
    domain = db.Column(db.Unicode(length=100), nullable=True)

    #: The display name
    display_name = db.Column(db.Unicode(length=80), nullable=False)

    #: If locked, a profile cannot be followed without the owner's consent
    locked = db.Column(db.Boolean(), default=False)

    #: If set, the profile will display this builtin avatar
    builtin_avatar = db.Column(db.String(length=40), nullable=True)

    @property
    def fqn(self):
        """The fully qualified name of the profile

        For local profiles, this is in the form ``@username``; for remote users, it's in the
        form ``@username@domain``.
        """

        if self.user:
            return f'@{self.user.username}'

        return f'@{self.username}@{self.domain}'

    def __repr__(self):
        if self.user:
            username = self.user.username
            domain = ''
        else:
            username = self.username
            domain = f'@{self.domain}'

        return f'<Profile {self.id}(@{username}{domain})>'

    def __str__(self):
        ret = ''

        if self.display_name:
            ret = self.display_name + ' '

        ret += f'({self.fqn})'

        return ret

    @property
    def followed_list(self):
        """List of profiles this profile is following

        Only accepted follows are included. For local profiles this is a query object, for
        remote ones an empty list.
        """

        # This will always be empty for remote profiles
        if not self.user:
            return []

        return Profile.query \
                      .join(UserFollow.followed) \
                      .filter(UserFollow.follower == self) \
                      .filter(UserFollow.accepted_at.isnot(None))

    @property
    def follower_list(self):
        """List of profiles that follow this profile

        Only accepted follows are included. For local profiles this is a query object, for
        remote ones an empty list.
        """

        # This will always be empty for remote profiles
        if not self.user:
            return []

        return Profile.query \
                      .join(UserFollow.follower) \
                      .filter(UserFollow.followed == self) \
                      .filter(UserFollow.accepted_at.isnot(None))

    @property
    def url(self):
        """Get the URL for this profile

        Only local profiles have one; for remote profiles ``NotImplemented`` is returned.
        """

        from flask import url_for

        if self.user:
            return url_for('display_profile', username=self.user.username)

        return NotImplemented

    def follow(self, follower):
        """Make ``follower`` follow this profile

        If this profile is locked, the follow is recorded as not yet accepted; otherwise it is
        accepted right away. A follow notification for this profile is created, too. Both
        objects are added to the database session, but nothing is committed.

        :param follower: the profile that wants to follow this one
        :type follower: Profile
        :raises TypeError: if ``follower`` is not a `Profile` object
        :returns: the new follow record
        :rtype: UserFollow
        """

        if not isinstance(follower, Profile):
            raise TypeError('Follower must be a Profile object')

        # Locked profiles have to accept the request first (see `UserFollow.accept`)
        timestamp = None if self.locked else datetime.utcnow()

        user_follow = UserFollow(follower=follower, followed=self, accepted_at=timestamp)
        db.session.add(user_follow)

        notification = self.notify(follower, self, NotificationAction.follow)
        db.session.add(notification)

        return user_follow

    def notify(self, actor, item, action):
        """Notify this profile about ``action`` on ``item`` by ``actor``

        :param actor: the actor who generated the notification
        :type actor: Profile
        :param item: the item ``action`` was performed on
        :type item: any
        :param action: the type of the action. If it is a string, it must be the name of a
                       `NotificationAction` member
        :type action: NotificationAction, str
        :raises TypeError: if ``actor`` is not a `Profile` object
        :raises KeyError: if ``action`` is a string that doesn't name a `NotificationAction`
        :returns: the generated notification. This method neither adds it to the database
                  session explicitly nor commits it; the callers in this module add it
                  themselves
        :rtype: Notification
        """

        if not isinstance(actor, Profile):
            raise TypeError('actor must be a Profile instance')

        if isinstance(action, str):
            action = NotificationAction[action]

        notification = Notification(profile=self, actor=actor, item=item, action=action)

        return notification

    def is_following(self, profile):
        """Check if this profile is following ``profile``

        NOTE(review): follows that are not accepted yet are counted, too, unlike in
        `followed_list`; confirm that this is intended.
        """

        try:
            UserFollow.query \
                      .filter(UserFollow.follower == self) \
                      .filter(UserFollow.followed == profile) \
                      .one()
        except NoResultFound:
            return False

        return True

    def is_friend_of(self, profile):
        """Check if this profile is friends with ``profile``

        Two profiles are friends if they follow each other.

        NOTE(review): as in `is_following`, follows that are not accepted yet are counted, too.
        """

        reverse = db.aliased(UserFollow)

        try:
            # The query must be restricted to ``profile``. Without that it would match a mutual
            # follow with anybody, and ``one()`` would raise as soon as there are two of them.
            UserFollow.query \
                      .filter(UserFollow.follower == self) \
                      .filter(UserFollow.followed == profile) \
                      .join(reverse, UserFollow.followed_id == reverse.follower_id) \
                      .filter(UserFollow.follower_id == reverse.followed_id) \
                      .one()
        except NoResultFound:
            return False

        return True

    def is_attending(self, event):
        """Check if this profile is attending ``event``

        Any response of this profile to the event counts, whatever the answer was.
        """

        try:
            Response.query \
                    .filter(Response.profile == self) \
                    .filter(Response.event == event) \
                    .one()
        except NoResultFound:
            return False

        return True
class Event(db.Model):
    """Database model for calendar events
    """

    __tablename__ = 'events'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The UUID of the event. This is what is presented to the users and used in federation.
    event_uuid = db.Column(db.String(length=40), unique=True, nullable=False, default=generate_uuid)

    #: The ID of the profile who created the event
    profile_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), nullable=False)

    profile = db.relationship('Profile', backref=db.backref('events', lazy='dynamic'))

    #: The title of the event
    title = db.Column(db.Unicode(length=200), nullable=False)

    #: The time zone to be used for `start_time` and `end_time`
    time_zone = db.Column(db.String(length=80), nullable=False)

    #: The starting timestamp of the event. It is in the UTC time zone
    start_time = db.Column(db.DateTime(), nullable=False)

    #: The ending timestamp of the event. It is in the UTC time zone
    end_time = db.Column(db.DateTime(), nullable=False)

    #: If `True`, the event is a whole-day event
    all_day = db.Column(db.Boolean(), default=False)

    #: The description of the event
    description = db.Column(db.UnicodeText())

    #: The visibility of the event
    visibility = db.Column(db.Enum(EventVisibility), nullable=False)

    def __as_tz(self, timestamp, as_timezone=None):
        # Interpret the naive ``timestamp`` as UTC, then convert it to ``as_timezone`` or, if
        # that is not given, to the time zone of the event itself
        from pytz import timezone, utc

        aware_timestamp = utc.localize(dt=timestamp)

        if as_timezone:
            return aware_timestamp.astimezone(as_timezone)

        return aware_timestamp.astimezone(timezone(self.time_zone))

    @property
    def start_time_tz(self):
        """`start_time` converted to the time zone named by `time_zone`
        """

        return self.__as_tz(self.start_time)

    @property
    def end_time_tz(self):
        """`end_time` converted to the time zone named by `time_zone`
        """

        return self.__as_tz(self.end_time)

    def start_time_for_user(self, user):
        """`start_time` converted to the time zone of ``user``
        """

        user_zone = user.timezone

        return self.__as_tz(self.start_time, as_timezone=user_zone)

    def end_time_for_user(self, user):
        """`end_time` converted to the time zone of ``user``
        """

        user_zone = user.timezone

        return self.__as_tz(self.end_time, as_timezone=user_zone)

    def __repr__(self):
        return f'<Event {self.id} ({self.title}) of {self.profile}>'

    def __str__(self):
        return self.title

    @property
    def url(self):
        """The URL of the details page of the event
        """

        from flask import url_for

        uuid = self.event_uuid

        return url_for('event_details', event_uuid=uuid)

    def invite(self, inviter, invited):
        """Invite ``invited`` to the event in the name of ``inviter``

        Both the invitation and the notification of ``invited`` are added to the database
        session, but nothing is committed.

        :returns: the new invitation
        :rtype: Invitation
        """

        invitation = Invitation(event=self, sender=inviter, invitee=invited)
        db.session.add(invitation)

        db.session.add(invited.notify(inviter, self, NotificationAction.invite))

        return invitation
class UserSetting(db.Model):  # pylint: disable=too-few-public-methods
    """Database model for user settings

    One record holds one key/value pair of one user; the primary key is the user ID and the
    key together. Records are read and written through `SettingsProxy`.
    """

    __tablename__ = 'user_settings'

    #: The ID of the user this setting belongs to
    user_id = db.Column(db.Integer(), db.ForeignKey('users.id'), primary_key=True)

    user = db.relationship('User')

    #: The settings key
    key = db.Column(db.String(length=40), primary_key=True)

    #: The settings value
    value = db.Column(db.UnicodeText())

    def __repr__(self):
        return f'<UserSetting of {self.user}, {self.key}="{self.value}">'
class AuditLog(db.Model):
    """Database model for audit log records
    """

    __tablename__ = 'audit_log'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    TYPE_LOGIN_SUCCESS = 'login'
    TYPE_LOGIN_FAIL = 'failed_login'
    TYPE_LOGOUT = 'logout'

    #: The valid record types, paired with their (translatable) message templates
    TYPES = (
        (TYPE_LOGIN_SUCCESS, _('%(user)s logged in')),
        (TYPE_LOGIN_FAIL, _('%(user)s failed to log in')),
        (TYPE_LOGOUT, _('%(user)s logged out')),
    )

    #: The ID of the user the record is about
    user_id = db.Column(db.Integer(), db.ForeignKey('users.id'))

    user = db.relationship('User')

    #: The address the request came from (see `log` for the placeholders used)
    ip_address = db.Column(db.String(length=40), nullable=False)

    #: The timestamp of the record
    timestamp = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)

    #: The type of the record; one of `TYPES`
    log_type = db.Column(ChoiceType(TYPES), nullable=False, index=True)

    #: Free form text attached to the record
    extra_data = db.Column(db.UnicodeText())

    def __str__(self):
        format_string = dict(self.TYPES).get(self.log_type,
                                             _('UNKNOWN RECORD "%(log_type)s" for %(user)s'))

        text = format_string % {
            'user': self.user.username,
            'log_type': self.log_type,
        }

        # A separator is needed here, otherwise the timestamp runs into the user name
        message = f'{self.timestamp} {text}'

        if self.extra_data:
            message += f' {self.extra_data}'

        return message

    def __repr__(self):
        return f'<AuditLog {self.log_type} for {self.user} at {self.timestamp}>'

    @classmethod
    def get_message(cls, key, *args, **kwargs):
        """Get the translated message for ``key``

        :param key: a record type from `TYPES`
        :type key: str
        :returns: the lazily translated message. Extra arguments are passed on to
                  ``lazy_gettext``
        """

        return lazy_gettext(dict(cls.TYPES).get(key), *args, **kwargs)

    @classmethod
    def log(cls, user, log_type, extra_data=None, logger=None):
        """Create a new audit log record

        The record is committed to the database right away, then the same event is written to
        ``logger`` in English.

        :param user: the user the new record corresponds to
        :type user: User
        :param log_type: the type of the record. Must be present in `TYPES`
        :type log_type: str
        :param extra_data: extra text to be added to the record
        :type extra_data: str, None
        :param logger: if set, logs will go to this logger instead of the default (calsocial)
        :type logger: Logger
        :raises TypeError: if ``user`` is not an instance of `User`
        :raises ValueError: if ``log_type`` is not a valid log type
        """

        from logging import getLogger

        from flask import has_request_context, request

        # Validate the input before anything else is done
        if not isinstance(user, User):
            raise TypeError('user must be a User instance')

        if log_type not in dict(cls.TYPES):
            raise ValueError('log_type must be a valid log type')

        logger = logger or getLogger('calsocial')

        # Records can be created outside of a request, too (e.g. from the command line)
        if has_request_context():
            ip = request.remote_addr or 'UNKNOWN'
        else:
            ip = 'NON-REQUEST'

        record = cls(user=user,
                     timestamp=datetime.utcnow(),
                     log_type=log_type,
                     ip_address=ip,
                     extra_data=extra_data)
        db.session.add(record)
        db.session.commit()

        # The application log is always written in English, whatever the current locale is
        with force_locale('en'):
            message = f'Audit: [{ip}] [{user.id}] ' + cls.get_message(log_type, user=user.username)

            logger.info(message)
class UserFollow(db.Model):  # pylint: disable=too-few-public-methods
    """Database model for user follows

    The primary key is the follower and the followed profile together. A follow whose
    `accepted_at` is ``None`` is not accepted yet.
    """

    #: The ID of the profile that is following the other
    follower_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), primary_key=True)

    follower = db.relationship('Profile', foreign_keys=[follower_id])

    #: The ID of the profile being followed
    followed_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), primary_key=True)

    followed = db.relationship('Profile', foreign_keys=[followed_id])

    #: The timestamp when the follow was initiated
    initiated_at = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)

    #: The timestamp when the follow was accepted
    accepted_at = db.Column(db.DateTime(), nullable=True)

    def accept(self):
        """Accept this follow request

        Sets `accepted_at` to the current UTC time. The change is not committed here.
        """

        self.accepted_at = datetime.utcnow()
class Notification(db.Model):
    """Database model for notifications
    """

    __tablename__ = 'notifications'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The recipient of the notification
    profile_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), nullable=False)

    profile = db.relationship('Profile',
                              backref=db.backref('notifications', lazy='dynamic'),
                              foreign_keys=[profile_id])

    #: The profile that generated the notification
    actor_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), nullable=True)

    actor = db.relationship('Profile', foreign_keys=[actor_id])

    #: The item (e.g. event) that generated the notification
    item_id = db.Column(db.Integer(), nullable=True)

    #: The type of the item that generated the notification
    item_type = db.Column(db.String(length=40))

    #: The type of action
    action = db.Column(db.Enum(NotificationAction))

    #: The timestamp when the notification was created
    created_at = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)

    #: The timestamp when the notification was read
    read_at = db.Column(db.DateTime(), nullable=True)

    @property
    def item(self):
        """The subject of the notification

        The object is looked up by `item_type` (the name of a model class) and `item_id`. If
        the type is not known, a warning is issued and ``None`` is returned.
        """

        item_class = self._decl_class_registry.get(self.item_type)

        if item_class is None:
            warn(f'Unknown item type {self.item_type}')

            return None

        return item_class.query.get(self.item_id)

    @item.setter
    def item(self, value):
        # Only the class name and the ID are stored; ``value`` must already have an ID
        self.item_type = value.__class__.__name__
        self.item_id = value.id

    @property
    def message(self):
        """The translated, plain text message of the notification
        """

        from flask_security import current_user

        messages = NOTIFICATION_ACTION_MESSAGES.get(self.action)

        # `item` runs a database query on every access, so it is fetched only once
        item = self.item
        message = messages[0 if item == current_user.profile else 1]

        return lazy_gettext(message, actor=self.actor, item=item)

    @property
    def html(self):
        """The translated message of the notification in HTML format

        The actor and the item are rendered as links. Their text is escaped, so display names
        and event titles cannot inject markup.
        """

        from flask import Markup
        from flask_security import current_user

        messages = NOTIFICATION_ACTION_MESSAGES.get(self.action)

        # `item` runs a database query on every access, so it is fetched only once
        item = self.item
        message = messages[0 if item == current_user.profile else 1]

        # ``Markup.format()`` escapes its arguments. The names come from user input, so they
        # must never reach the page verbatim.
        link = Markup('<a href="{url}">{label}</a>')
        actor_link = link.format(url=str(self.actor.url), label=str(self.actor))
        item_link = link.format(url=str(item.url), label=str(item))

        return Markup(lazy_gettext(message, actor=actor_link, item=item_link))
class Invitation(db.Model):  # pylint: disable=too-few-public-methods
    """Database model for event invitations

    Invitations are created by `Event.invite`.
    """

    __tablename__ = 'invitations'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The ID of the sender's profile
    sender_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), index=True)

    sender = db.relationship('Profile', foreign_keys=[sender_id])

    #: The ID of the invitee's profile
    invitee_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), index=True)

    invitee = db.relationship('Profile', foreign_keys=[invitee_id])

    #: The ID of the event
    event_id = db.Column(db.Integer(), db.ForeignKey('events.id'), index=True)

    event = db.relationship('Event', backref=db.backref('invitations', lazy='dynamic'))

    #: The timestamp of the invitation
    timestamp = db.Column(db.DateTime(), default=datetime.utcnow)
class Response(db.Model):  # pylint: disable=too-few-public-methods
    """Database model for RSVPs.
    """

    __tablename__ = 'responses'

    #: The profile that's sending the RSVP
    profile_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), primary_key=True)

    profile = db.relationship('Profile')

    #: The ID of the event
    event_id = db.Column(db.Integer(), db.ForeignKey('events.id'), primary_key=True)

    event = db.relationship('Event')

    #: The ID of the invitation, if the user is responding to an invitation
    invitation_id = db.Column(db.Integer(), db.ForeignKey('invitations.id'), nullable=True)

    invitation = db.relationship('Invitation')

    #: The timestamp of the response
    timestamp = db.Column(db.DateTime(), default=datetime.utcnow)

    #: The response itself
    response = db.Column(db.Enum(ResponseType), nullable=False)

    #: The visibility of the response
    visibility = db.Column(db.Enum(ResponseVisibility), nullable=False)

    def visible_to(self, profile):
        """Decide whether ``profile`` is allowed to see this response

        :param profile: the profile looking at the response. If None, it is viewed as anonymous
        :type profile: Profile, None
        :returns: ``True`` if the response should be visible, ``False`` otherwise
        :rtype: bool
        """

        # One can always see their own response
        if self.profile == profile:
            return True

        level = self.visibility

        if level == ResponseVisibility.private:
            return False

        event = self.event

        if level == ResponseVisibility.organisers:
            return profile == event.profile

        def involved():
            # The viewer takes part in the event, either as an attendee or as the organiser
            return profile.is_attending(event) or profile == event.profile

        # Attendee-only responses, and every response to a non-public event, are shown only to
        # the people involved in the event
        if level == ResponseVisibility.attendees or event.visibility != EventVisibility.public:
            return profile is not None and involved()

        if level == ResponseVisibility.friends:
            return profile is not None and \
                (profile.is_friend_of(self.profile) or
                 involved() or
                 profile == self.profile)

        if level == ResponseVisibility.followers:
            return profile is not None and \
                (profile.is_following(self.profile) or
                 involved() or
                 profile == self.profile)

        return level == ResponseVisibility.public
class AppState(app_state_base(db.Model)):  # pylint: disable=too-few-public-methods
    """Database model for application state values

    Values are stored per environment: every lookup is restricted to the records whose `env`
    equals ``current_app.env``. The ``__get_state__``/``__set_state__``/
    ``__set_state_default__`` hooks are presumably called by the class ``app_state_base()``
    creates (verify in ``app_state.py``).
    """

    __tablename__ = 'app_state'

    #: The environment that set this key
    env = db.Column(db.String(length=40), nullable=False, primary_key=True)

    #: The key
    key = db.Column(db.String(length=80), nullable=False, primary_key=True)

    #: The value of the key
    value = db.Column(db.Unicode(length=200), nullable=True)

    @classmethod
    def _find_record(cls, key):
        """Fetch the record of ``key`` in the current environment, or ``None`` if there is none
        """

        try:
            return cls.query \
                      .filter(cls.env == current_app.env) \
                      .filter(cls.key == key) \
                      .one()
        except NoResultFound:
            return None

    @classmethod
    def __get_state__(cls, key):
        """Get the value of ``key``, or ``None`` if it is not set in the current environment
        """

        record = cls._find_record(key)

        return None if record is None else record.value

    @classmethod
    def __set_state__(cls, key, value):
        """Set ``key`` to ``value``, creating the record if needed, and commit the change
        """

        record = cls._find_record(key)

        if record is None:
            record = cls(env=current_app.env, key=key)

        record.value = value

        db.session.add(record)
        db.session.commit()

    @classmethod
    def __set_state_default__(cls, key, value):
        """Set ``key`` to ``value`` and commit, unless the key already has a record
        """

        if cls._find_record(key) is not None:
            return

        record = cls(env=current_app.env, key=key, value=value)

        db.session.add(record)
        db.session.commit()

    def __repr__(self):
        return f'<AppState {self.env}:{self.key}="{self.value}">'