2018-07-03 06:22:58 +00:00
|
|
|
# Calendar.social
|
|
|
|
# Copyright (C) 2018 Gergely Polonkai
|
|
|
|
#
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
|
|
# (at your option) any later version.
|
|
|
|
#
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU Affero General Public License for more details.
|
|
|
|
#
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
|
|
|
|
"""Database models for Calendar.social
|
|
|
|
"""
|
|
|
|
|
2018-06-28 12:41:14 +00:00
|
|
|
from datetime import datetime
|
2018-07-03 10:06:22 +00:00
|
|
|
from warnings import warn
|
2018-06-28 12:41:14 +00:00
|
|
|
|
2018-07-04 04:30:59 +00:00
|
|
|
from flask_babelex import lazy_gettext
|
2018-06-28 12:41:14 +00:00
|
|
|
from flask_security import UserMixin, RoleMixin
|
|
|
|
from flask_sqlalchemy import SQLAlchemy
|
2018-07-03 08:51:43 +00:00
|
|
|
from sqlalchemy.orm.exc import NoResultFound
|
2018-07-04 04:30:59 +00:00
|
|
|
from sqlalchemy_utils.types.choice import ChoiceType
|
|
|
|
|
|
|
|
from .utils import force_locale
|
2018-06-28 12:41:14 +00:00
|
|
|
|
|
|
|
db = SQLAlchemy()
|
|
|
|
users_roles = db.Table(
|
|
|
|
'users_roles',
|
|
|
|
db.Column('user_id', db.Integer(), db.ForeignKey('users.id')),
|
|
|
|
db.Column('role_id', db.Integer(), db.ForeignKey('roles.id')))
|
|
|
|
|
|
|
|
|
2018-07-03 08:51:43 +00:00
|
|
|
class SettingsProxy:
|
|
|
|
"""Proxy object to get settings for a user
|
|
|
|
"""
|
|
|
|
|
|
|
|
def __init__(self, user):
|
|
|
|
self.user = user
|
|
|
|
|
|
|
|
def __getitem__(self, key):
|
|
|
|
setting = UserSetting.query \
|
|
|
|
.filter(UserSetting.user == self.user) \
|
|
|
|
.filter(UserSetting.key == key) \
|
|
|
|
.first()
|
|
|
|
|
|
|
|
if setting is None:
|
|
|
|
return None
|
|
|
|
|
|
|
|
return setting.value
|
|
|
|
|
|
|
|
def __setitem__(self, key, value):
|
|
|
|
try:
|
|
|
|
setting = UserSetting.query \
|
|
|
|
.filter(UserSetting.user == self.user) \
|
|
|
|
.filter(UserSetting.key == key) \
|
|
|
|
.one()
|
|
|
|
except NoResultFound:
|
|
|
|
setting = UserSetting(user=self.user, key=key)
|
|
|
|
|
|
|
|
setting.value = str(value)
|
|
|
|
db.session.add(setting)
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
return f'<SettingsProxy for {self.user}>'
|
|
|
|
|
|
|
|
|
2018-06-28 12:41:14 +00:00
|
|
|
class User(db.Model, UserMixin):
|
2018-07-03 06:22:58 +00:00
|
|
|
"""Database model for users
|
|
|
|
"""
|
|
|
|
|
2018-06-28 12:41:14 +00:00
|
|
|
__tablename__ = 'users'
|
|
|
|
id = db.Column(db.Integer(), primary_key=True)
|
|
|
|
|
|
|
|
#: The username of the user. This is also the display name and thus is immutable
|
|
|
|
username = db.Column(db.String(length=50), unique=True, nullable=False)
|
|
|
|
|
|
|
|
#: The email address of the user
|
|
|
|
email = db.Column(db.String(length=255), unique=True, nullable=True)
|
|
|
|
|
|
|
|
#: The (hashed) password of the user
|
|
|
|
password = db.Column(db.String(length=255))
|
|
|
|
|
|
|
|
#: A flag to show whether the user is enabled (active) or not
|
|
|
|
active = db.Column(db.Boolean(), default=False)
|
|
|
|
|
|
|
|
#: The timestamp when this user was created
|
|
|
|
created_at = db.Column(db.DateTime(), default=datetime.utcnow)
|
|
|
|
|
|
|
|
#: The timestamp when the user was activated
|
|
|
|
confirmed_at = db.Column(db.DateTime())
|
|
|
|
|
|
|
|
#: The roles of the user
|
|
|
|
roles = db.relationship('Role',
|
|
|
|
secondary=users_roles,
|
|
|
|
backref=db.backref('users', lazy='dynamic'))
|
|
|
|
|
2018-07-03 08:51:43 +00:00
|
|
|
@property
|
|
|
|
def settings(self):
|
|
|
|
"""Get a settings proxy for the user
|
|
|
|
"""
|
|
|
|
|
|
|
|
proxy = getattr(self, '_settings', None)
|
|
|
|
|
|
|
|
if proxy is None:
|
|
|
|
proxy = SettingsProxy(self)
|
|
|
|
setattr(self, '_settings', proxy)
|
|
|
|
|
|
|
|
return proxy
|
|
|
|
|
2018-07-03 10:06:22 +00:00
|
|
|
@property
|
|
|
|
def timezone(self):
|
|
|
|
from flask import current_app
|
2018-07-08 20:28:29 +00:00
|
|
|
from pytz import timezone
|
2018-07-03 10:06:22 +00:00
|
|
|
from pytz.exceptions import UnknownTimeZoneError
|
|
|
|
|
|
|
|
timezone_str = self.settings['timezone']
|
|
|
|
|
2018-07-08 20:28:29 +00:00
|
|
|
if timezone_str:
|
|
|
|
try:
|
|
|
|
return timezone(timezone_str)
|
|
|
|
except UnknownTimeZoneError:
|
|
|
|
warn(f'Timezone of {self} (or the default timezone) "{timezone_str}" is invalid')
|
2018-07-03 10:06:22 +00:00
|
|
|
|
2018-07-08 20:28:29 +00:00
|
|
|
return current_app.timezone
|
2018-07-03 10:06:22 +00:00
|
|
|
|
2018-06-29 11:24:54 +00:00
|
|
|
def __repr__(self):
|
2018-06-28 12:41:14 +00:00
|
|
|
return f'<User {self.id}({self.username})>'
|
|
|
|
|
|
|
|
|
|
|
|
class Role(db.Model, RoleMixin):
|
2018-07-03 06:22:58 +00:00
|
|
|
"""Database model for roles
|
|
|
|
"""
|
|
|
|
|
2018-06-28 12:41:14 +00:00
|
|
|
__tablename__ = 'roles'
|
|
|
|
id = db.Column(db.Integer(), primary_key=True)
|
|
|
|
|
|
|
|
#: The name of the role
|
|
|
|
name = db.Column(db.Unicode(length=80), unique=True)
|
|
|
|
|
|
|
|
#: A description of the role
|
|
|
|
description = db.Column(db.UnicodeText)
|
2018-06-29 11:24:54 +00:00
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
return f'<Role {self.id}({self.name})>'
|
2018-06-29 11:25:09 +00:00
|
|
|
|
|
|
|
|
|
|
|
class Event(db.Model):
|
2018-07-03 06:22:58 +00:00
|
|
|
"""Database model for events
|
|
|
|
"""
|
|
|
|
|
2018-06-29 11:25:09 +00:00
|
|
|
__tablename__ = 'events'
|
|
|
|
id = db.Column(db.Integer(), primary_key=True)
|
|
|
|
|
|
|
|
#: The ID of the user who created the event
|
|
|
|
user_id = db.Column(db.Integer(), db.ForeignKey('users.id'), nullable=False)
|
|
|
|
|
|
|
|
user = db.relationship('User', backref=db.backref('events', lazy='dynamic'))
|
|
|
|
|
|
|
|
#: The title of the event
|
|
|
|
title = db.Column(db.Unicode(length=200), nullable=False)
|
|
|
|
|
|
|
|
#: The time zone to be used for `start_time` and `end_time`
|
|
|
|
time_zone = db.Column(db.String(length=80), nullable=False)
|
|
|
|
|
2018-07-03 06:22:58 +00:00
|
|
|
#: The starting timestamp of the event. It is in the UTC time zone
|
2018-06-29 11:25:09 +00:00
|
|
|
start_time = db.Column(db.DateTime(), nullable=False)
|
|
|
|
|
2018-07-03 06:22:58 +00:00
|
|
|
#: The ending timestamp of the event. It is in the UTC time zone
|
2018-06-29 11:25:09 +00:00
|
|
|
end_time = db.Column(db.DateTime(), nullable=False)
|
|
|
|
|
|
|
|
#: If `True`, the event is a whole-day event
|
|
|
|
all_day = db.Column(db.Boolean(), default=False)
|
|
|
|
|
|
|
|
#: The description of the event
|
|
|
|
description = db.Column(db.UnicodeText())
|
|
|
|
|
2018-07-03 10:03:54 +00:00
|
|
|
def __as_tz(self, timestamp, as_timezone=None):
|
|
|
|
from pytz import timezone, UTC
|
2018-07-02 13:07:53 +00:00
|
|
|
|
2018-07-03 10:03:54 +00:00
|
|
|
utc_timestamp = UTC.localize(timestamp)
|
|
|
|
|
|
|
|
return utc_timestamp.astimezone(as_timezone or timezone(self.time_zone))
|
2018-07-02 13:07:53 +00:00
|
|
|
|
|
|
|
@property
|
|
|
|
def start_time_tz(self):
|
2018-07-03 06:22:58 +00:00
|
|
|
"""The same timestamp as `start_time`, but in the time zone specified by `time_zone`.
|
|
|
|
"""
|
|
|
|
|
2018-07-02 13:07:53 +00:00
|
|
|
return self.__as_tz(self.start_time)
|
|
|
|
|
|
|
|
@property
|
|
|
|
def end_time_tz(self):
|
2018-07-03 06:22:58 +00:00
|
|
|
"""The same timestamp as `end_time`, but in the time zone specified by `time_zone`.
|
|
|
|
"""
|
|
|
|
|
2018-07-02 13:07:53 +00:00
|
|
|
return self.__as_tz(self.end_time)
|
|
|
|
|
2018-07-03 10:09:15 +00:00
|
|
|
def start_time_for_user(self, user):
|
|
|
|
"""The same timestamp as `start_time`, but in the time zone of `user`
|
|
|
|
"""
|
|
|
|
|
|
|
|
return self.__as_tz(self.start_time, as_timezone=user.timezone)
|
|
|
|
|
|
|
|
def end_time_for_user(self, user):
|
|
|
|
"""The same timestamp as `end_time`, but in the time zone of `user`
|
|
|
|
"""
|
|
|
|
|
|
|
|
return self.__as_tz(self.end_time, as_timezone=user.timezone)
|
|
|
|
|
2018-06-29 11:25:09 +00:00
|
|
|
def __repr__(self):
|
|
|
|
return f'<Event {self.id} ({self.title}) of {self.user}>'
|
2018-07-03 08:51:43 +00:00
|
|
|
|
|
|
|
|
|
|
|
class UserSetting(db.Model):
|
|
|
|
"""Database model for user settings
|
|
|
|
"""
|
|
|
|
|
|
|
|
__tablename__ = 'user_settings'
|
|
|
|
|
|
|
|
#: The ID of the user this setting belongs to
|
|
|
|
user_id = db.Column(db.Integer(), db.ForeignKey('users.id'), primary_key=True)
|
|
|
|
|
|
|
|
user = db.relationship('User')
|
|
|
|
|
|
|
|
#: The settings key
|
|
|
|
key = db.Column(db.String(length=40), primary_key=True)
|
|
|
|
|
|
|
|
#: The settings value
|
|
|
|
value = db.Column(db.UnicodeText())
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
return f'<UserSetting of {self.user}, {self.key}="{self.value}">'
|
2018-07-04 04:30:59 +00:00
|
|
|
|
|
|
|
|
|
|
|
def _(translate): # pylint: disable=invalid-name
|
|
|
|
"""Function to mark strings as translatable
|
|
|
|
|
|
|
|
The actual translation will be fetched later in `:meth:AuditLog.get_message`.
|
|
|
|
"""
|
|
|
|
|
|
|
|
return translate
|
|
|
|
|
|
|
|
|
|
|
|
class AuditLog(db.Model):
|
|
|
|
"""Database model for audit log records
|
|
|
|
"""
|
|
|
|
|
|
|
|
__tablename__ = 'audit_log'
|
|
|
|
# pylint: disable=invalid-name
|
|
|
|
id = db.Column(db.Integer(), primary_key=True)
|
|
|
|
|
|
|
|
TYPE_LOGIN_SUCCESS = 'login'
|
|
|
|
TYPE_LOGIN_FAIL = 'failed_login'
|
|
|
|
TYPE_LOGOUT = 'logout'
|
|
|
|
|
|
|
|
TYPES = (
|
|
|
|
(TYPE_LOGIN_SUCCESS, _('%(user)s logged in')),
|
|
|
|
(TYPE_LOGIN_FAIL, _('%(user)s failed to log in')),
|
|
|
|
(TYPE_LOGOUT, _('%(user)s logged out')),
|
|
|
|
)
|
|
|
|
|
|
|
|
user_id = db.Column(db.Integer(), db.ForeignKey('users.id'))
|
|
|
|
|
|
|
|
user = db.relationship('User')
|
|
|
|
|
|
|
|
ip_address = db.Column(db.String(length=40), nullable=False)
|
|
|
|
|
|
|
|
timestamp = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)
|
|
|
|
|
|
|
|
log_type = db.Column(ChoiceType(TYPES), nullable=False, index=True)
|
|
|
|
|
|
|
|
extra_data = db.Column(db.UnicodeText())
|
|
|
|
|
|
|
|
def __str__(self):
|
|
|
|
format_string = dict(self.TYPES).get(self.log_type,
|
|
|
|
_('UNKNOWN RECORD "%(log_type)s" for %(user)s'))
|
|
|
|
message = str(self.timestamp) + \
|
|
|
|
(format_string % {
|
|
|
|
'user': self.user.username,
|
|
|
|
'log_type': self.log_type
|
|
|
|
})
|
|
|
|
|
|
|
|
if self.extra_data:
|
|
|
|
message += f' {self.extra_data}'
|
|
|
|
|
|
|
|
return message
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
return f'<AuditLog {self.log_type} for {self.user} at {self.timestamp}>'
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def get_message(cls, key, *args, **kwargs):
|
|
|
|
"""Get the translated message for ``key``
|
|
|
|
"""
|
|
|
|
|
|
|
|
return lazy_gettext(dict(cls.TYPES).get(key), *args, **kwargs)
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def log(cls, user, log_type, extra_data=None, logger=None):
|
|
|
|
"""Create a new audit log record
|
|
|
|
|
|
|
|
:param user: the user the new record corresponds to
|
|
|
|
:type user: User
|
|
|
|
:param log_type: the type of the record. Must be present in `TYPES`
|
|
|
|
:type log_type: str
|
|
|
|
:param extra_data: extra text to be added to the record
|
|
|
|
:type extra_data: str, None
|
|
|
|
:param logger: if set, logs will go to this logger instead of the default (calsocial)
|
|
|
|
:type logger: Logger
|
|
|
|
:raises TypeError: if ``user`` is not an instance of `User`
|
|
|
|
:raises ValueError: if ``log_type`` is not a valid log type
|
|
|
|
"""
|
|
|
|
|
|
|
|
from logging import getLogger
|
|
|
|
|
|
|
|
from flask import has_request_context, request
|
|
|
|
|
|
|
|
logger = logger or getLogger('calsocial')
|
|
|
|
|
|
|
|
if not isinstance(user, User):
|
|
|
|
raise TypeError('user must be a User instance')
|
|
|
|
|
|
|
|
if log_type not in dict(cls.TYPES):
|
|
|
|
raise ValueError('log_type must be a valid log type')
|
|
|
|
|
|
|
|
if has_request_context():
|
|
|
|
ip = request.remote_addr or 'UNKNOWN'
|
|
|
|
else:
|
|
|
|
ip = 'NON-REQUEST'
|
|
|
|
|
|
|
|
record = cls(user=user,
|
|
|
|
timestamp=datetime.utcnow(),
|
|
|
|
log_type=log_type,
|
|
|
|
ip_address=ip,
|
|
|
|
extra_data=extra_data)
|
|
|
|
db.session.add(record)
|
|
|
|
db.session.commit()
|
|
|
|
|
|
|
|
with force_locale('en'):
|
|
|
|
message = f'Audit: [{ip}] [{user.id}] ' + cls.get_message(log_type, user=user.username)
|
|
|
|
|
|
|
|
logger.info(message)
|