calendar-social/calsocial/models.py

683 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Calendar.social
# Copyright (C) 2018 Gergely Polonkai
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Database models for Calendar.social
"""
from datetime import datetime
from enum import Enum
from warnings import warn
from flask_babelex import lazy_gettext
from flask_security import UserMixin, RoleMixin
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy_utils.types.choice import ChoiceType
from .utils import force_locale
# The Flask-SQLAlchemy handle every model and table in this module is declared on
db = SQLAlchemy()

# Association table for the many-to-many relationship between users and roles
users_roles = db.Table(
    'users_roles',
    db.Column('user_id', db.Integer(), db.ForeignKey('users.id')),
    db.Column('role_id', db.Integer(), db.ForeignKey('roles.id')),
)
def generate_uuid():
    """Create a new random (version 4) UUID

    :returns: the generated UUID as a hexadecimal string
    :rtype: str
    """

    import uuid

    return uuid.uuid4().hex
def _(translate):  # pylint: disable=invalid-name
    """Mark a string as translatable without translating it

    The argument is returned unchanged; the only purpose of this function is to make the string
    visible to message extraction.  The actual translation is looked up later, for example in
    :meth:`AuditLog.get_message`.

    :param translate: the message to mark
    :returns: ``translate``, untouched
    """

    return translate
class NotificationAction(Enum):
    """Actions a notification can be about
    """

    #: One profile started following another
    follow = 1

    #: A profile was invited to an event
    invite = 2
# Message templates for each notification action.  Every value is a pair: `Notification` picks
# index 0 when the notification's item is the current user's own profile, and index 1 otherwise.
NOTIFICATION_ACTION_MESSAGES = {
    NotificationAction.follow: (
        _('%(actor)s followed you'),
        _('%(actor)s followed %(item)s'),
    ),
    NotificationAction.invite: (
        None,
        _('%(actor)s invited you to %(item)s'),
    ),
}
class ResponseType(Enum):
    """The possible answers a profile can give to an event

    Members compare equal not only to themselves, but also to their name (case insensitively)
    and to their numeric value.
    """

    #: The user will attend the event
    going = 0

    #: The user will probably attend the event
    probably_going = 1

    #: The user is interested in the event, but might not go
    probably_not_going = 2

    #: The user won't attend the event
    not_going = 3

    def __hash__(self):
        # Defining ``__eq__`` would otherwise make the members unhashable
        return Enum.__hash__(self)

    def __eq__(self, other):
        # Strings are matched against the member name, ignoring case
        if isinstance(other, str):
            own_name = self.name.lower()  # pylint: disable=no-member

            return own_name == other.lower()

        # Numbers are matched against the member value
        if isinstance(other, (int, float)):
            return self.value == other

        # Anything else gets the stock enum comparison
        return Enum.__eq__(self, other)
class SettingsProxy:
    """Dictionary-like access to the settings of one user

    Reads and writes are backed by `UserSetting` rows that belong to the wrapped user.
    """

    def __init__(self, user):
        self.user = user

    def _rows_for(self, key):
        """Build the query that selects the wrapped user's `UserSetting` row for ``key``
        """

        return UserSetting.query.filter(UserSetting.user == self.user, UserSetting.key == key)

    def __getitem__(self, key):
        """Get the stored value for ``key``, or ``None`` if the user has no such setting
        """

        row = self._rows_for(key).first()

        return None if row is None else row.value

    def __setitem__(self, key, value):
        """Store ``value`` (converted to `str`) under ``key``

        The row is only added to the database session here; it is not committed.
        """

        try:
            row = self._rows_for(key).one()
        except NoResultFound:
            # First time this key is set for the user
            row = UserSetting(user=self.user, key=key)

        row.value = str(value)
        db.session.add(row)

    def __repr__(self):
        return f'<SettingsProxy for {self.user}>'
class User(db.Model, UserMixin):
    """Database model for local user accounts
    """

    __tablename__ = 'users'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The user's name.  It doubles as the display name and is therefore meant to be immutable
    username = db.Column(db.String(length=50), unique=True, nullable=False)

    #: The user's email address
    email = db.Column(db.String(length=255), unique=True, nullable=True)

    #: The user's (hashed) password
    password = db.Column(db.String(length=255))

    #: Whether the user is enabled (active)
    active = db.Column(db.Boolean(), default=False)

    #: When the user was created
    created_at = db.Column(db.DateTime(), default=datetime.utcnow)

    #: When the user was activated
    confirmed_at = db.Column(db.DateTime())

    #: The roles assigned to the user
    roles = db.relationship(
        'Role',
        secondary=users_roles,
        backref=db.backref('users', lazy='dynamic'),
    )

    @property
    def settings(self):
        """The `SettingsProxy` of this user

        The proxy is created on first access and then reused for the lifetime of the instance.
        """

        cached = getattr(self, '_settings', None)

        if cached is not None:
            return cached

        fresh = SettingsProxy(self)
        self._settings = fresh

        return fresh

    @property
    def timezone(self):
        """The user's time zone

        If the user didn't set a time zone yet, or the stored one is unknown to pytz, the
        application default (``current_app.timezone``) is used.  A warning is emitted in the
        latter case.
        """

        from flask import current_app
        from pytz import timezone
        from pytz.exceptions import UnknownTimeZoneError

        tz_name = self.settings['timezone']

        if not tz_name:
            return current_app.timezone

        try:
            return timezone(tz_name)
        except UnknownTimeZoneError:
            warn(f'Timezone of {self} (or the default timezone) "{tz_name}" is invalid')

        return current_app.timezone

    def __repr__(self):
        return f'<User {self.id}({self.username})>'
class Role(db.Model, RoleMixin):  # pylint: disable=too-few-public-methods
    """Database model for the roles users can be assigned to
    """

    __tablename__ = 'roles'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The unique name of the role
    name = db.Column(db.Unicode(length=80), unique=True)

    #: Free-form description of the role
    description = db.Column(db.UnicodeText)

    def __repr__(self):
        return f'<Role {self.id}({self.name})>'
class Profile(db.Model):  # pylint: disable=too-few-public-methods
    """Database model for user profiles

    A profile either belongs to a local `User` (``user`` is set), or describes a remote account
    through ``username`` and ``domain``.
    """

    __tablename__ = 'profiles'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The ID of the local user this profile belongs to
    user_id = db.Column(db.Integer(), db.ForeignKey('users.id'), nullable=True, unique=True)

    user = db.relationship(User, backref=db.backref('profile', uselist=False))

    #: The username this profile belongs to.  If ``None``, `user_id` must be set
    username = db.Column(db.String(length=50), nullable=True)

    #: The domain this profile originates from.  If ``None``, `user_id` must be set
    domain = db.Column(db.Unicode(length=100), nullable=True)

    #: The display name
    display_name = db.Column(db.Unicode(length=80), nullable=False)

    @property
    def fqn(self):
        """The fully qualified name of the profile

        For local profiles, this is in the form ``@username``; for remote users, it's in the form
        ``@username@domain``.
        """

        if self.user:
            return f'@{self.user.username}'

        return f'@{self.username}@{self.domain}'

    def __repr__(self):
        # Built on `fqn` so that an incomplete instance (no user and no domain yet) can still be
        # printed while debugging; concatenating ``'@' + self.domain`` raised `TypeError` then.
        return f'<Profile {self.id}({self.fqn})>'

    def __str__(self):
        """The display name (if any) followed by the fully qualified name in parentheses
        """

        prefix = f'{self.display_name} ' if self.display_name else ''

        return f'{prefix}({self.fqn})'

    @property
    def followed_list(self):
        """Profiles this profile is following

        :returns: a query of `Profile` objects for local profiles; an empty list for remote ones
        """

        # This will always be empty for remote profiles
        if not self.user:
            return []

        return Profile.query \
                      .join(UserFollow.followed) \
                      .filter(UserFollow.follower == self)

    @property
    def follower_list(self):
        """Profiles that follow this profile

        :returns: a query of `Profile` objects for local profiles; an empty list for remote ones
        """

        # This will always be empty for remote profiles
        if not self.user:
            return []

        return Profile.query \
                      .join(UserFollow.follower) \
                      .filter(UserFollow.followed == self)

    @property
    def url(self):
        """The URL of this profile

        :returns: the URL of the ``display_profile`` endpoint for local profiles.  Remote
                  profiles have no URL yet; ``NotImplemented`` is returned for them, so callers
                  have to check for it before using the value.
        """

        from flask import url_for

        if self.user:
            return url_for('display_profile', username=self.user.username)

        return NotImplemented
class Event(db.Model):
    """Database model for calendar events
    """

    __tablename__ = 'events'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The UUID of the event.  This is what is presented to the users and used in federation.
    event_uuid = db.Column(
        db.String(length=40),
        unique=True,
        nullable=False,
        default=generate_uuid,
    )

    #: The ID of the profile that created the event
    profile_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), nullable=False)

    profile = db.relationship('Profile', backref=db.backref('events', lazy='dynamic'))

    #: The event's title
    title = db.Column(db.Unicode(length=200), nullable=False)

    #: The name of the time zone `start_time` and `end_time` are to be presented in
    time_zone = db.Column(db.String(length=80), nullable=False)

    #: When the event starts, in UTC
    start_time = db.Column(db.DateTime(), nullable=False)

    #: When the event ends, in UTC
    end_time = db.Column(db.DateTime(), nullable=False)

    #: If `True`, the event lasts the whole day
    all_day = db.Column(db.Boolean(), default=False)

    #: The event's description
    description = db.Column(db.UnicodeText())

    def __as_tz(self, timestamp, as_timezone=None):
        """Interpret the naive ``timestamp`` as UTC and convert it to another time zone

        :param timestamp: a naive timestamp
        :param as_timezone: the target time zone.  If not set, the event's own `time_zone` is used
        :returns: the time zone aware timestamp in the target time zone
        """

        from pytz import timezone, utc

        aware = utc.localize(dt=timestamp)
        target = as_timezone or timezone(self.time_zone)

        return aware.astimezone(target)

    @property
    def start_time_tz(self):
        """`start_time`, converted to the time zone named by `time_zone`
        """

        return self.__as_tz(self.start_time)

    @property
    def end_time_tz(self):
        """`end_time`, converted to the time zone named by `time_zone`
        """

        return self.__as_tz(self.end_time)

    def start_time_for_user(self, user):
        """`start_time`, converted to the time zone of ``user``
        """

        return self.__as_tz(self.start_time, as_timezone=user.timezone)

    def end_time_for_user(self, user):
        """`end_time`, converted to the time zone of ``user``
        """

        return self.__as_tz(self.end_time, as_timezone=user.timezone)

    def __repr__(self):
        return f'<Event {self.id} ({self.title}) of {self.profile}>'

    def __str__(self):
        return self.title

    @property
    def url(self):
        """The URL of the event's details page
        """

        from flask import url_for

        return url_for('event_details', event_uuid=self.event_uuid)
class UserSetting(db.Model):  # pylint: disable=too-few-public-methods
    """Database model for a single key-value setting of a user

    The user and the key together form the primary key.
    """

    __tablename__ = 'user_settings'

    #: The ID of the user who owns the setting
    user_id = db.Column(db.Integer(), db.ForeignKey('users.id'), primary_key=True)

    user = db.relationship('User')

    #: The name of the setting
    key = db.Column(db.String(length=40), primary_key=True)

    #: The value of the setting
    value = db.Column(db.UnicodeText())

    def __repr__(self):
        return f'<UserSetting of {self.user}, {self.key}="{self.value}">'
class AuditLog(db.Model):
    """Database model for audit log records
    """

    __tablename__ = 'audit_log'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    TYPE_LOGIN_SUCCESS = 'login'
    TYPE_LOGIN_FAIL = 'failed_login'
    TYPE_LOGOUT = 'logout'

    #: The known record types, paired with their (untranslated) message templates
    TYPES = (
        (TYPE_LOGIN_SUCCESS, _('%(user)s logged in')),
        (TYPE_LOGIN_FAIL, _('%(user)s failed to log in')),
        (TYPE_LOGOUT, _('%(user)s logged out')),
    )

    #: The ID of the user the record is about
    user_id = db.Column(db.Integer(), db.ForeignKey('users.id'))

    user = db.relationship('User')

    #: The address the action came from, or a placeholder (see `log`)
    ip_address = db.Column(db.String(length=40), nullable=False)

    #: When the record was created
    timestamp = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)

    #: The type of the record; one of the keys in `TYPES`
    log_type = db.Column(ChoiceType(TYPES), nullable=False, index=True)

    #: Optional free-form text attached to the record
    extra_data = db.Column(db.UnicodeText())

    def __str__(self):
        """The record as ``<timestamp> <message>[ <extra data>]``; the message is not translated
        """

        format_string = dict(self.TYPES).get(self.log_type,
                                             _('UNKNOWN RECORD "%(log_type)s" for %(user)s'))
        text = format_string % {
            'user': self.user.username,
            'log_type': self.log_type,
        }

        # The timestamp used to be glued to the message without any separator
        message = f'{str(self.timestamp)} {text}'

        if self.extra_data:
            message += f' {self.extra_data}'

        return message

    def __repr__(self):
        return f'<AuditLog {self.log_type} for {self.user} at {self.timestamp}>'

    @classmethod
    def get_message(cls, key, *args, **kwargs):
        """Get the translated message for ``key``

        :param key: a record type; one of the keys in `TYPES`
        :returns: the lazily translated message.  Extra arguments are passed on to
                  ``lazy_gettext`` as is.
        """

        return lazy_gettext(dict(cls.TYPES).get(key), *args, **kwargs)

    @classmethod
    def log(cls, user, log_type, extra_data=None, logger=None):
        """Create a new audit log record

        The record is committed to the database right away, then an English message is sent to
        the logger at INFO level.

        :param user: the user the new record corresponds to
        :type user: User
        :param log_type: the type of the record.  Must be present in `TYPES`
        :type log_type: str
        :param extra_data: extra text to be added to the record
        :type extra_data: str, None
        :param logger: if set, logs will go to this logger instead of the default (calsocial)
        :type logger: Logger
        :raises TypeError: if ``user`` is not an instance of `User`
        :raises ValueError: if ``log_type`` is not a valid log type
        """

        from logging import getLogger

        from flask import has_request_context, request

        logger = logger or getLogger('calsocial')

        if not isinstance(user, User):
            raise TypeError('user must be a User instance')

        if log_type not in dict(cls.TYPES):
            raise ValueError('log_type must be a valid log type')

        # There is no remote address to record outside of a request (e.g. in CLI commands)
        if has_request_context():
            ip = request.remote_addr or 'UNKNOWN'
        else:
            ip = 'NON-REQUEST'

        record = cls(user=user,
                     timestamp=datetime.utcnow(),
                     log_type=log_type,
                     ip_address=ip,
                     extra_data=extra_data)
        db.session.add(record)
        db.session.commit()

        # The concatenation evaluates the lazy string, so it must happen while the locale is
        # forced; this keeps the application log in English regardless of the current locale
        with force_locale('en'):
            message = f'Audit: [{ip}] [{user.id}] ' + cls.get_message(log_type, user=user.username)

        logger.info(message)
class UserFollow(db.Model):  # pylint: disable=too-few-public-methods
    """Database model recording that one profile follows another

    The follower and the followed profile together form the primary key.
    """

    #: The ID of the profile that follows the other one
    follower_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), primary_key=True)

    follower = db.relationship('Profile', foreign_keys=[follower_id])

    #: The ID of the profile that is being followed
    followed_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), primary_key=True)

    followed = db.relationship('Profile', foreign_keys=[followed_id])

    #: When the follow was initiated
    initiated_at = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)

    #: When the follow was accepted
    accepted_at = db.Column(db.DateTime(), nullable=True)
class Notification(db.Model):
    """Database model for notifications
    """

    __tablename__ = 'notifications'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The recipient of the notification
    profile_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), nullable=False)

    profile = db.relationship('Profile',
                              backref=db.backref('notifications', lazy='dynamic'),
                              foreign_keys=[profile_id])

    #: The profile that generated the notification
    actor_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), nullable=True)

    actor = db.relationship('Profile', foreign_keys=[actor_id])

    #: The item (e.g. event) that generated the notification
    item_id = db.Column(db.Integer(), nullable=True)

    #: The type of the item that generated the notification
    item_type = db.Column(db.String(length=40))

    #: The type of action
    action = db.Column(db.Enum(NotificationAction))

    #: The timestamp when the notification was created
    created_at = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)

    #: The timestamp when the notification was read
    read_at = db.Column(db.DateTime(), nullable=True)

    @property
    def item(self):
        """The subject of the notification

        The model class is looked up by `item_type` in the declarative class registry, then the
        object with the ID `item_id` is loaded.  Every access performs this lookup again.

        :returns: the loaded object, or ``None`` (after emitting a warning) if `item_type` does
                  not name a known model
        """

        item_class = self._decl_class_registry.get(self.item_type)

        if item_class is None:
            warn(f'Unknown item type {self.item_type}')

            return None

        return item_class.query.get(self.item_id)

    @item.setter
    def item(self, value):
        self.item_type = value.__class__.__name__
        self.item_id = value.id

    def _message_template(self, item):
        """Pick the message template of `action` that fits ``item``

        :param item: the already loaded subject of the notification
        :returns: the first template of the action if ``item`` is the current user's profile, the
                  second one otherwise
        """

        from flask_security import current_user

        messages = NOTIFICATION_ACTION_MESSAGES.get(self.action)

        return messages[0 if item == current_user.profile else 1]

    @property
    def message(self):
        """The translated, plain text message of the notification
        """

        # Load the item only once; each access of the property is a database lookup
        item = self.item

        return lazy_gettext(self._message_template(item), actor=self.actor, item=item)

    @property
    def html(self):
        """The translated message of the notification in HTML format

        The actor and the item are rendered as links.  Their textual form comes from user input
        (display names, event titles), so it is escaped before it gets into the markup.
        """

        from flask import Markup

        # Load the item only once; each access of the property is a database lookup
        item = self.item
        template = self._message_template(item)

        # `Markup.format` escapes its arguments, unlike a plain f-string
        link = Markup('<a href="{url}">{label}</a>')
        actor_link = link.format(url=self.actor.url, label=str(self.actor))
        item_link = link.format(url=item.url, label=str(item))

        return Markup(lazy_gettext(template, actor=actor_link, item=item_link))
class Invitation(db.Model):  # pylint: disable=too-few-public-methods
    """Database model for invitations to events
    """

    __tablename__ = 'invitations'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The ID of the sender's profile
    sender_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), index=True)

    sender = db.relationship('Profile', foreign_keys=[sender_id])

    #: The ID of the invitee's profile
    invitee_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), index=True)

    invitee = db.relationship('Profile', foreign_keys=[invitee_id])

    #: The ID of the event the invitation is for
    event_id = db.Column(db.Integer(), db.ForeignKey('events.id'), index=True)

    event = db.relationship('Event', backref=db.backref('invitations', lazy='dynamic'))

    #: When the invitation was created
    timestamp = db.Column(db.DateTime(), default=datetime.utcnow)
class Response(db.Model):  # pylint: disable=too-few-public-methods
    """Database model for RSVPs

    The responding profile and the event together form the primary key.
    """

    __tablename__ = 'responses'

    #: The ID of the profile that's sending the RSVP
    profile_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), primary_key=True)

    profile = db.relationship('Profile')

    #: The ID of the event the RSVP is for
    event_id = db.Column(db.Integer(), db.ForeignKey('events.id'), primary_key=True)

    event = db.relationship('Event')

    #: The ID of the invitation, if the user is responding to one
    invitation_id = db.Column(db.Integer(), db.ForeignKey('invitations.id'), nullable=True)

    invitation = db.relationship('Invitation')

    #: When the response was given
    timestamp = db.Column(db.DateTime(), default=datetime.utcnow)

    #: The response itself
    response = db.Column(db.Enum(ResponseType), nullable=False)