# Calendar.social
# Copyright (C) 2018  Gergely Polonkai
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Database models for Calendar.social
"""

from datetime import datetime
from enum import Enum
from warnings import warn

from flask import current_app
from flask_babelex import lazy_gettext
from flask_security import UserMixin, RoleMixin
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy_utils.types.choice import ChoiceType

from .app_state import app_state_base
from .cache import cache
from .utils import force_locale

db = SQLAlchemy()

users_roles = db.Table(
    'users_roles',
    db.Column('user_id', db.Integer(), db.ForeignKey('users.id')),
    db.Column('role_id', db.Integer(), db.ForeignKey('roles.id')))


def generate_uuid():
    """Generate a UUID (version 4)

    :returns: the hexadecimal representation of the generated UUID
    :rtype: str
    """

    from uuid import uuid4

    return uuid4().hex


def _(translate):  # pylint: disable=invalid-name
    """Function to mark strings as translatable

    This returns its argument unchanged; it only exists so that string extraction tools can find
    the messages.  The actual translation will be fetched later in `:meth:AuditLog.get_message`.
    """

    return translate


class NotificationAction(Enum):
    """The possible values for notification actions

    Members compare equal to their (case insensitive) name and to their numeric value, too.
    """

    #: A user followed another
    follow = 1

    #: A user has been invited to an event
    invite = 2

    def __hash__(self):
        # Defining ``__eq__`` would otherwise make the class unhashable
        return Enum.__hash__(self)

    def __eq__(self, other):
        if isinstance(other, str):
            return self.name.lower() == other.lower()  # pylint: disable=no-member

        if isinstance(other, (int, float)):
            return self.value == other

        return Enum.__eq__(self, other)


#: Message templates for notifications.  The first element is used when the item of the
#: notification is the current user’s profile, the second one is used otherwise.
NOTIFICATION_ACTION_MESSAGES = {
    NotificationAction.follow: (_('%(actor)s followed you'), _('%(actor)s followed %(item)s')),
    NotificationAction.invite: (None, _('%(actor)s invited you to %(item)s')),
}


class ResponseType(Enum):
    """Enumeration of the possible RSVP answers

    Members compare equal to their (case insensitive) name and to their numeric value, too.
    """

    #: The user is about to attend the event
    going = 0

    #: The user will probably attend the event
    probably_going = 1

    #: The user is interested in the event, but might not go
    probably_not_going = 2

    #: The user won’t attend the event
    not_going = 3

    def __hash__(self):
        # Defining ``__eq__`` would otherwise make the class unhashable
        return Enum.__hash__(self)

    def __eq__(self, other):
        if isinstance(other, str):
            return self.name.lower() == other.lower()  # pylint: disable=no-member

        if isinstance(other, (int, float)):
            return self.value == other

        return Enum.__eq__(self, other)


class EventVisibility(Enum):
    """Enumeration for event visibility
    """

    #: The event is private, only attendees and people invited can see the details
    private = 0

    #: The event is public, anyone can see the details
    public = 5


EVENT_VISIBILITY_TRANSLATIONS = {
    EventVisibility.private: _('Visible only to attendees'),
    EventVisibility.public: _('Visible to everyone'),
}


class ResponseVisibility(Enum):
    """Enumeration for response visibility
    """

    #: The response is only visible to the invitee
    private = 0

    #: The response is only visible to the event organisers
    organisers = 1

    #: The response is only visible to the event attendees
    attendees = 2

    #: The response is visible to the invitee’s friends (ie. mutual follows)
    friends = 3

    #: The response is visible to the invitee’s followers
    followers = 4

    #: The response is visible to anyone
    public = 5


RESPONSE_VISIBILITY_TRANSLATIONS = {
    ResponseVisibility.private: _('Visible only to myself'),
    ResponseVisibility.organisers: _('Visible only to event organisers'),
    ResponseVisibility.attendees: _('Visible only to event attendees'),
    ResponseVisibility.friends: _('Visible only to my friends'),
    ResponseVisibility.followers: _('Visible only to my followers'),
    ResponseVisibility.public: _('Visible to anyone'),
}


class EventAvailability(Enum):
    """Enumeration of event availabilities

    Members compare equal to their (case insensitive) name and to their numeric value, too.
    """

    free = 0
    busy = 1

    def __hash__(self):
        # Defining ``__eq__`` would otherwise make the class unhashable
        return Enum.__hash__(self)

    def __eq__(self, other):
        if isinstance(other, str):
            return self.name.lower() == other.lower()  # pylint: disable=no-member

        if isinstance(other, (int, float)):
            return self.value == other

        return Enum.__eq__(self, other)


class UserAvailability(Enum):
    """Enumeration of user availabilities

    Members compare equal to their (case insensitive) name and to their numeric value, too.
    """

    free = 0
    busy = 1
    tentative = 2

    def __hash__(self):
        # Defining ``__eq__`` would otherwise make the class unhashable
        return Enum.__hash__(self)

    def __eq__(self, other):
        if isinstance(other, str):
            return self.name.lower() == other.lower()  # pylint: disable=no-member

        if isinstance(other, (int, float)):
            return self.value == other

        return Enum.__eq__(self, other)


class SettingsProxy:
    """Proxy object to get settings for a user

    Settings can be read and written with the subscript syntax.  Reading an unset key yields
    ``None``.  Written values are stored as strings and are added to the database session, but the
    session is not committed.
    """

    def __init__(self, user):
        self.user = user

    def __getitem__(self, key):
        setting = UserSetting.query \
                             .filter(UserSetting.user == self.user) \
                             .filter(UserSetting.key == key) \
                             .first()

        if setting is None:
            return None

        return setting.value

    def __setitem__(self, key, value):
        try:
            setting = UserSetting.query \
                                 .filter(UserSetting.user == self.user) \
                                 .filter(UserSetting.key == key) \
                                 .one()
        except NoResultFound:
            setting = UserSetting(user=self.user, key=key)

        setting.value = str(value)
        db.session.add(setting)

    def __repr__(self):
        return f'<SettingsProxy for {self.user}>'


class User(db.Model, UserMixin):
    """Database model for users
    """

    __tablename__ = 'users'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The username of the user.  This is also the display name and thus is immutable
    username = db.Column(db.String(length=50), unique=True, nullable=False)

    #: The email address of the user
    email = db.Column(db.String(length=255), unique=True, nullable=True)

    #: The (hashed) password of the user
    password = db.Column(db.String(length=255))

    #: A flag to show whether the user is enabled (active) or not
    active = db.Column(db.Boolean(), default=False)

    #: The timestamp when this user was created
    created_at = db.Column(db.DateTime(), default=datetime.utcnow)

    #: The timestamp when the user was activated
    confirmed_at = db.Column(db.DateTime())

    #: The roles of the user
    roles = db.relationship('Role',
                            secondary=users_roles,
                            backref=db.backref('users', lazy='dynamic'))

    @property
    def settings(self):
        """Get a settings proxy for the user

        The proxy is created on first access and is reused afterwards.
        """

        proxy = getattr(self, '_settings', None)

        if proxy is None:
            proxy = SettingsProxy(self)
            setattr(self, '_settings', proxy)

        return proxy

    @property
    def timezone(self):
        """The user’s time zone

        If the user didn’t set a time zone yet (or it is invalid), the application default is used.
        """

        from pytz import timezone
        from pytz.exceptions import UnknownTimeZoneError

        timezone_str = self.settings['timezone']

        if timezone_str:
            try:
                return timezone(timezone_str)
            except UnknownTimeZoneError:
                warn(f'Timezone of {self} (or the default timezone) "{timezone_str}" is invalid')

        return current_app.timezone

    @property
    def session_list_key(self):
        """The cache key of this user’s session list
        """

        return f'open_sessions:{self.id}'

    @property
    def active_sessions(self):
        """The list of active sessions of this user
        """

        return cache.get(self.session_list_key) or []

    @active_sessions.setter
    def active_sessions(self, value):
        cache.set(self.session_list_key, list(value))

    def __repr__(self):
        return f'<User {self.id}({self.username})>'


class Role(db.Model, RoleMixin):  # pylint: disable=too-few-public-methods
    """Database model for roles
    """

    __tablename__ = 'roles'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The name of the role
    name = db.Column(db.Unicode(length=80), unique=True)

    #: A description of the role
    description = db.Column(db.UnicodeText)

    def __repr__(self):
        return f'<Role {self.id}({self.name})>'


class Profile(db.Model):  # pylint: disable=too-few-public-methods
    """Database model for user profiles
    """

    __tablename__ = 'profiles'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The ID of the local user this profile belongs to
    user_id = db.Column(db.Integer(), db.ForeignKey('users.id'), nullable=True, unique=True)

    user = db.relationship(User, backref=db.backref('profile', uselist=False))

    #: The username this profile belongs to.  If ``None``, `user_id` must be set
    username = db.Column(db.String(length=50), nullable=True)

    #: The domain this profile originates from.  If ``None``, `user_id` must be set
    domain = db.Column(db.Unicode(length=100), nullable=True)

    #: The display name
    display_name = db.Column(db.Unicode(length=80), nullable=False)

    #: If locked, a profile cannot be followed without the owner’s consent
    locked = db.Column(db.Boolean(), default=False)

    #: If set, the profile will display this builtin avatar
    builtin_avatar = db.Column(db.String(length=40), nullable=True)

    @property
    def fqn(self):
        """The fully qualified name of the profile

        For local profiles, this is in the form ``@username``; for remote users, it’s in the form
        ``@username@domain``.
        """

        if self.user:
            return f'@{self.user.username}'

        return f'@{self.username}@{self.domain}'

    def __repr__(self):
        if self.user:
            username = self.user.username
            domain = ''
        else:
            username = self.username
            domain = f'@{self.domain}'

        return f'<Profile {self.id}(@{username}{domain})>'

    def __str__(self):
        ret = ''

        if self.display_name:
            ret = self.display_name + ' '

        ret += f'({self.fqn})'

        return ret

    @property
    def followed_list(self):
        """List of profiles this profile is following

        Only accepted follows are included.
        """

        # This will always be empty for remote profiles
        if not self.user:
            return []

        return Profile.query \
                      .join(UserFollow.followed) \
                      .filter(UserFollow.follower == self) \
                      .filter(UserFollow.accepted_at.isnot(None))

    @property
    def follower_list(self):
        """List of profiles that follow this profile

        Only accepted follows are included.
        """

        # This will always be empty for remote profiles
        if not self.user:
            return []

        return Profile.query \
                      .join(UserFollow.follower) \
                      .filter(UserFollow.followed == self) \
                      .filter(UserFollow.accepted_at.isnot(None))

    @property
    def url(self):
        """Get the URL for this profile

        For remote profiles (ie. ones without a local user) this is ``NotImplemented``.
        """

        from flask import url_for

        if self.user:
            return url_for('display_profile', username=self.user.username)

        return NotImplemented

    def follow(self, follower):
        """Make ``follower`` follow this profile

        If this profile is locked, the follow is created as a pending request (ie. without an
        acceptance timestamp).  A notification is created for this profile in both cases.  The new
        objects are added to the database session, but the session is not committed.

        :param follower: the profile that wants to follow this one
        :type follower: Profile
        :raises TypeError: if ``follower`` is not a `Profile` object
        :returns: the new follow object
        :rtype: UserFollow
        """

        if not isinstance(follower, Profile):
            raise TypeError('Follower must be a Profile object')

        timestamp = None if self.locked else datetime.utcnow()

        user_follow = UserFollow(follower=follower, followed=self, accepted_at=timestamp)
        db.session.add(user_follow)

        notification = self.notify(follower, self, NotificationAction.follow)
        db.session.add(notification)

        return user_follow

    def notify(self, actor, item, action):
        """Notify this profile about ``action`` on ``item`` by ``actor``

        :param actor: the actor who generated the notification
        :type actor: Profile
        :param item: the item ``action`` was performed on
        :type item: any
        :param action: the type of the action
        :type action: NotificationAction, str
        :raises TypeError: if ``actor`` is not a `Profile` object
        :raises KeyError: if ``action`` is a string that doesn’t name a `NotificationAction`
        :returns: the generated notification.  This method doesn’t add it to the database session
                  explicitly, nor does it commit; that is up to the caller
        :rtype: Notification
        """

        if not isinstance(actor, Profile):
            raise TypeError('actor must be a Profile instance')

        if isinstance(action, str):
            action = NotificationAction[action]

        notification = Notification(profile=self, actor=actor, item=item, action=action)

        return notification

    def is_following(self, profile):
        """Check if this profile is following ``profile``
        """

        # NOTE(review): this doesn’t look at `UserFollow.accepted_at`, so a pending follow request
        # counts as following.  Confirm if that is intended.
        try:
            UserFollow.query \
                      .filter(UserFollow.follower == self) \
                      .filter(UserFollow.followed == profile) \
                      .one()
        except NoResultFound:
            return False

        return True

    def is_friend_of(self, profile):
        """Check if this profile is friends with ``profile``

        Two profiles are friends if both of them follow the other.
        """

        reverse = db.aliased(UserFollow)

        try:
            # Without restricting the query to ``profile``, any mutual follow would count (and
            # more than one of them would make ``one()`` raise an error)
            UserFollow.query \
                      .filter(UserFollow.follower == self) \
                      .filter(UserFollow.followed == profile) \
                      .join(reverse, UserFollow.followed_id == reverse.follower_id) \
                      .filter(UserFollow.follower_id == reverse.followed_id) \
                      .one()
        except NoResultFound:
            return False

        return True

    def is_attending(self, event):
        """Check if this profile is attending ``event``

        Any response recorded for the event counts, regardless of its type.
        """

        try:
            Response.query \
                    .filter(Response.profile == self) \
                    .filter(Response.event == event) \
                    .one()
        except NoResultFound:
            return False

        return True


class Event(db.Model):
    """Database model for events
    """

    __tablename__ = 'events'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The UUID of the event.  This is what is presented to the users and used in federation.
    event_uuid = db.Column(db.String(length=40),
                           unique=True,
                           nullable=False,
                           default=generate_uuid)

    #: The ID of the profile who created the event
    profile_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), nullable=False)

    profile = db.relationship('Profile', backref=db.backref('events', lazy='dynamic'))

    #: The title of the event
    title = db.Column(db.Unicode(length=200), nullable=False)

    #: The time zone to be used for `start_time` and `end_time`
    time_zone = db.Column(db.String(length=80), nullable=False)

    #: The starting timestamp of the event.  It is in the UTC time zone
    start_time = db.Column(db.DateTime(), nullable=False)

    #: The ending timestamp of the event.  It is in the UTC time zone
    end_time = db.Column(db.DateTime(), nullable=False)

    #: If `True`, the event is a whole-day event
    all_day = db.Column(db.Boolean(), default=False)

    #: The description of the event
    description = db.Column(db.UnicodeText())

    #: The visibility of the event
    visibility = db.Column(db.Enum(EventVisibility), nullable=False)

    def __as_tz(self, timestamp, as_timezone=None):
        """Convert the naive UTC ``timestamp`` to ``as_timezone``

        If ``as_timezone`` is not set, the time zone of the event is used.
        """

        from pytz import timezone, utc

        utc_timestamp = utc.localize(dt=timestamp)

        return utc_timestamp.astimezone(as_timezone or timezone(self.time_zone))

    @property
    def start_time_tz(self):
        """The same timestamp as `start_time`, but in the time zone specified by `time_zone`.
        """

        return self.__as_tz(self.start_time)

    @property
    def end_time_tz(self):
        """The same timestamp as `end_time`, but in the time zone specified by `time_zone`.
        """

        return self.__as_tz(self.end_time)

    def start_time_for_user(self, user):
        """The same timestamp as `start_time`, but in the time zone of `user`
        """

        return self.__as_tz(self.start_time, as_timezone=user.timezone)

    def end_time_for_user(self, user):
        """The same timestamp as `end_time`, but in the time zone of `user`
        """

        return self.__as_tz(self.end_time, as_timezone=user.timezone)

    def __repr__(self):
        return f'<Event {self.id} ({self.title}) of {self.profile}>'

    def __str__(self):
        return self.title

    @property
    def url(self):
        """The URL of the event
        """

        from flask import url_for

        return url_for('event_details', event_uuid=self.event_uuid)

    def invite(self, inviter, invited):
        """Invite ``invited`` to the event

        The invitation will arrive from ``inviter``.  Both the invitation and the notification
        generated for ``invited`` are added to the database session, but the session is not
        committed.

        :param inviter: the profile sending the invitation
        :type inviter: Profile
        :param invited: the profile being invited
        :type invited: Profile
        :returns: the new invitation
        :rtype: Invitation
        """

        invite = Invitation(event=self, sender=inviter, invitee=invited)
        db.session.add(invite)

        notification = invited.notify(inviter, self, NotificationAction.invite)
        db.session.add(notification)

        return invite


class UserSetting(db.Model):  # pylint: disable=too-few-public-methods
    """Database model for user settings
    """

    __tablename__ = 'user_settings'

    #: The ID of the user this setting belongs to
    user_id = db.Column(db.Integer(), db.ForeignKey('users.id'), primary_key=True)

    user = db.relationship('User')

    #: The settings key
    key = db.Column(db.String(length=40), primary_key=True)

    #: The settings value
    value = db.Column(db.UnicodeText())

    def __repr__(self):
        return f'<UserSetting of {self.user}, {self.key}="{self.value}">'


class AuditLog(db.Model):
    """Database model for audit log records
    """

    __tablename__ = 'audit_log'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    TYPE_LOGIN_SUCCESS = 'login'
    TYPE_LOGIN_FAIL = 'failed_login'
    TYPE_LOGOUT = 'logout'

    #: The valid record types, together with their (translatable) message templates
    TYPES = (
        (TYPE_LOGIN_SUCCESS, _('%(user)s logged in')),
        (TYPE_LOGIN_FAIL, _('%(user)s failed to log in')),
        (TYPE_LOGOUT, _('%(user)s logged out')),
    )

    user_id = db.Column(db.Integer(), db.ForeignKey('users.id'))

    user = db.relationship('User')

    ip_address = db.Column(db.String(length=40), nullable=False)

    timestamp = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)

    log_type = db.Column(ChoiceType(TYPES), nullable=False, index=True)

    extra_data = db.Column(db.UnicodeText())

    def __str__(self):
        format_string = dict(self.TYPES).get(self.log_type,
                                             _('UNKNOWN RECORD "%(log_type)s" for %(user)s'))

        # The timestamp and the message are separated by a space, just like the extra data below
        message = f'{self.timestamp} ' + \
            (format_string % {
                'user': self.user.username,
                'log_type': self.log_type
            })

        if self.extra_data:
            message += f' {self.extra_data}'

        return message

    def __repr__(self):
        return f'<AuditLog {self.id}({self.log_type}) of {self.user}>'

    @classmethod
    def get_message(cls, key, *args, **kwargs):
        """Get the translated message for ``key``

        The positional and keyword arguments are passed on to ``lazy_gettext``.
        """

        return lazy_gettext(dict(cls.TYPES).get(key), *args, **kwargs)

    @classmethod
    def log(cls, user, log_type, extra_data=None, logger=None):
        """Create a new audit log record

        The record is committed to the database right away, then it is also logged (in English) to
        ``logger``.

        :param user: the user the new record corresponds to
        :type user: User
        :param log_type: the type of the record.  Must be present in `TYPES`
        :type log_type: str
        :param extra_data: extra text to be added to the record
        :type extra_data: str, None
        :param logger: if set, logs will go to this logger instead of the default (calsocial)
        :type logger: Logger
        :raises TypeError: if ``user`` is not an instance of `User`
        :raises ValueError: if ``log_type`` is not a valid log type
        """

        from logging import getLogger

        from flask import has_request_context, request

        logger = logger or getLogger('calsocial')

        if not isinstance(user, User):
            raise TypeError('user must be a User instance')

        if log_type not in dict(cls.TYPES):
            raise ValueError('log_type must be a valid log type')

        # The IP address column is not nullable, so placeholders are used if there is no address
        if has_request_context():
            ip = request.remote_addr or 'UNKNOWN'
        else:
            ip = 'NON-REQUEST'

        record = cls(user=user,
                     timestamp=datetime.utcnow(),
                     log_type=log_type,
                     ip_address=ip,
                     extra_data=extra_data)

        db.session.add(record)
        db.session.commit()

        with force_locale('en'):
            message = f'Audit: [{ip}] [{user.id}] ' + cls.get_message(log_type, user=user.username)

            logger.info(message)


class UserFollow(db.Model):  # pylint: disable=too-few-public-methods
    """Database model for user follows
    """

    #: The ID of the profile that is following the other
    follower_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), primary_key=True)

    follower = db.relationship('Profile', foreign_keys=[follower_id])

    #: The ID of the profile being followed
    followed_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), primary_key=True)

    followed = db.relationship('Profile', foreign_keys=[followed_id])

    #: The timestamp when the follow was initiated
    initiated_at = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)

    #: The timestamp when the follow was accepted
    accepted_at = db.Column(db.DateTime(), nullable=True)

    def accept(self):
        """Accept this follow request

        This only sets the acceptance timestamp; saving the change is up to the caller.
        """

        self.accepted_at = datetime.utcnow()


class Notification(db.Model):
    """Database model for notifications
    """

    __tablename__ = 'notifications'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The recipient of the notification
    profile_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), nullable=False)

    profile = db.relationship('Profile',
                              backref=db.backref('notifications', lazy='dynamic'),
                              foreign_keys=[profile_id])

    #: The profile that generated the notification
    actor_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), nullable=True)

    actor = db.relationship('Profile', foreign_keys=[actor_id])

    #: The item (e.g. event) that generated the notification
    item_id = db.Column(db.Integer(), nullable=True)

    #: The type of the item that generated the notification
    item_type = db.Column(db.String(length=40))

    #: The type of action
    action = db.Column(db.Enum(NotificationAction))

    #: The timestamp when the notification was created
    created_at = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)

    #: The timestamp when the notification was read
    read_at = db.Column(db.DateTime(), nullable=True)

    @property
    def item(self):
        """The subject of the notification

        The model class is looked up by the name stored in `item_type`.  If there is no such
        model, a warning is issued and the value is ``None``.
        """

        item_class = self._decl_class_registry.get(self.item_type)

        if item_class is None:
            warn(f'Unknown item type {self.item_type}')

            return None

        return item_class.query.get(self.item_id)

    @item.setter
    def item(self, value):
        # Store the class name and the ID of the object, so the getter can load it again
        self.item_type = value.__class__.__name__
        self.item_id = value.id

    def _message_template(self):
        """Get the untranslated message template of this notification

        The first template of the action is used if the item is the profile of the current user,
        the second one otherwise.
        """

        from flask_security import current_user

        messages = NOTIFICATION_ACTION_MESSAGES.get(self.action)

        return messages[0 if self.item == current_user.profile else 1]

    @property
    def message(self):
        """Get the translated message of this notification
        """

        return lazy_gettext(self._message_template(), actor=self.actor, item=self.item)

    @property
    def html(self):
        """Get the translated message of this notification in HTML format
        """

        from flask import Markup

        # Both the actor and the item contain user supplied text (display names, event titles).
        # They must be escaped before the whole message gets marked as safe HTML.
        actor = Markup.escape(self.actor)
        item = Markup.escape(self.item)

        return Markup(lazy_gettext(self._message_template(), actor=actor, item=item))


class Invitation(db.Model):  # pylint: disable=too-few-public-methods
    """Database model for event invitations
    """

    __tablename__ = 'invitations'

    # pylint: disable=invalid-name
    id = db.Column(db.Integer(), primary_key=True)

    #: The ID of the sender’s profile
    sender_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), index=True)

    sender = db.relationship('Profile', foreign_keys=[sender_id])

    #: The ID of the invitee’s profile
    invitee_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), index=True)

    invitee = db.relationship('Profile', foreign_keys=[invitee_id])

    #: The ID of the event
    event_id = db.Column(db.Integer(), db.ForeignKey('events.id'), index=True)

    event = db.relationship('Event', backref=db.backref('invitations', lazy='dynamic'))

    #: The timestamp of the invitation
    timestamp = db.Column(db.DateTime(), default=datetime.utcnow)


class Response(db.Model):  # pylint: disable=too-few-public-methods
    """Database model for RSVPs.
    """

    __tablename__ = 'responses'

    #: The profile that’s sending the RSVP
    profile_id = db.Column(db.Integer(), db.ForeignKey('profiles.id'), primary_key=True)

    profile = db.relationship('Profile')

    #: The ID of the event
    event_id = db.Column(db.Integer(), db.ForeignKey('events.id'), primary_key=True)

    event = db.relationship('Event')

    #: The ID of the invitation, if the user is responding to an invitation
    invitation_id = db.Column(db.Integer(), db.ForeignKey('invitations.id'), nullable=True)

    invitation = db.relationship('Invitation')

    #: The timestamp of the response
    timestamp = db.Column(db.DateTime(), default=datetime.utcnow)

    #: The response itself
    response = db.Column(db.Enum(ResponseType), nullable=False)

    #: The visibility of the response
    visibility = db.Column(db.Enum(ResponseVisibility), nullable=False)

    def visible_to(self, profile):
        """Checks if the response can be visible to ``profile``.

        :param profile: the profile looking at the response.  If None, it is viewed as anonymous
        :type profile: Profile, None
        :returns: ``True`` if the response should be visible, ``False`` otherwise
        :rtype: bool
        """

        # Responses are always visible to their owners
        if self.profile == profile:
            return True

        if self.visibility == ResponseVisibility.private:
            return False

        if self.visibility == ResponseVisibility.organisers:
            return profile == self.event.profile

        if self.visibility == ResponseVisibility.attendees:
            return profile is not None and \
                (profile.is_attending(self.event) or
                 profile == self.event.profile)

        # From this point on, if the event is not public, only attendees can see responses
        if self.event.visibility != EventVisibility.public:
            return profile is not None and \
                (profile.is_attending(self.event) or
                 profile == self.event.profile)

        if self.visibility == ResponseVisibility.friends:
            return profile is not None and \
                (profile.is_friend_of(self.profile) or
                 profile.is_attending(self.event) or
                 profile == self.event.profile or
                 profile == self.profile)

        if self.visibility == ResponseVisibility.followers:
            return profile is not None and \
                (profile.is_following(self.profile) or
                 profile.is_attending(self.event) or
                 profile == self.event.profile or
                 profile == self.profile)

        return self.visibility == ResponseVisibility.public


class AppState(app_state_base(db.Model)):  # pylint: disable=too-few-public-methods
    """Database model for application state values

    Values are stored per environment (see ``current_app.env``).
    """

    __tablename__ = 'app_state'

    #: The environment that set this key
    env = db.Column(db.String(length=40), nullable=False, primary_key=True)

    #: The key
    key = db.Column(db.String(length=80), nullable=False, primary_key=True)

    #: The value of the key
    value = db.Column(db.Unicode(length=200), nullable=True)

    @classmethod
    def __get_state__(cls, key):
        # Get the value of ``key`` in the current environment, or ``None`` if it is not set
        try:
            record = cls.query \
                        .filter(cls.env == current_app.env) \
                        .filter(cls.key == key) \
                        .one()
        except NoResultFound:
            return None

        return record.value

    @classmethod
    def __set_state__(cls, key, value):
        # Create or update ``key`` in the current environment, and commit the change
        try:
            record = cls.query \
                        .filter(cls.env == current_app.env) \
                        .filter(cls.key == key) \
                        .one()
        except NoResultFound:
            record = cls(env=current_app.env, key=key)

        record.value = value

        db.session.add(record)
        db.session.commit()

    @classmethod
    def __set_state_default__(cls, key, value):
        # Set ``key`` in the current environment only if it doesn’t have a record yet
        try:
            record = cls.query \
                        .filter(cls.env == current_app.env) \
                        .filter(cls.key == key) \
                        .one()
        except NoResultFound:
            pass
        else:
            return

        record = cls(env=current_app.env, key=key, value=value)

        db.session.add(record)
        db.session.commit()

    def __repr__(self):
        return f'<AppState {self.env}:{self.key}="{self.value}">'