diff --git a/calsocial/models.py b/calsocial/models.py index e1f3be8..63c8d5e 100644 --- a/calsocial/models.py +++ b/calsocial/models.py @@ -20,9 +20,13 @@ from datetime import datetime from warnings import warn +from flask_babelex import lazy_gettext from flask_security import UserMixin, RoleMixin from flask_sqlalchemy import SQLAlchemy from sqlalchemy.orm.exc import NoResultFound +from sqlalchemy_utils.types.choice import ChoiceType + +from .utils import force_locale db = SQLAlchemy() users_roles = db.Table( @@ -231,3 +235,113 @@ class UserSetting(db.Model): def __repr__(self): return f'' + + +def _(translate): # pylint: disable=invalid-name + """Function to mark strings as translatable + + The actual translation will be fetched later in `:meth:AuditLog.get_message`. + """ + + return translate + + +class AuditLog(db.Model): + """Database model for audit log records + """ + + __tablename__ = 'audit_log' + # pylint: disable=invalid-name + id = db.Column(db.Integer(), primary_key=True) + + TYPE_LOGIN_SUCCESS = 'login' + TYPE_LOGIN_FAIL = 'failed_login' + TYPE_LOGOUT = 'logout' + + TYPES = ( + (TYPE_LOGIN_SUCCESS, _('%(user)s logged in')), + (TYPE_LOGIN_FAIL, _('%(user)s failed to log in')), + (TYPE_LOGOUT, _('%(user)s logged out')), + ) + + user_id = db.Column(db.Integer(), db.ForeignKey('users.id')) + + user = db.relationship('User') + + ip_address = db.Column(db.String(length=40), nullable=False) + + timestamp = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False) + + log_type = db.Column(ChoiceType(TYPES), nullable=False, index=True) + + extra_data = db.Column(db.UnicodeText()) + + def __str__(self): + format_string = dict(self.TYPES).get(self.log_type, + _('UNKNOWN RECORD "%(log_type)s" for %(user)s')) + message = str(self.timestamp) + \ + (format_string % { + 'user': self.user.username, + 'log_type': self.log_type + }) + + if self.extra_data: + message += f' {self.extra_data}' + + return message + + def __repr__(self): + return f'' + + @classmethod + def get_message(cls, key, *args, **kwargs): + """Get the translated message for ``key`` + """ + + return lazy_gettext(dict(cls.TYPES).get(key), *args, **kwargs) + + @classmethod + def log(cls, user, log_type, extra_data=None, logger=None): + """Create a new audit log record + + :param user: the user the new record corresponds to + :type user: User + :param log_type: the type of the record. Must be present in `TYPES` + :type log_type: str + :param extra_data: extra text to be added to the record + :type extra_data: str, None + :param logger: if set, logs will go to this logger instead of the default (calsocial) + :type logger: Logger + :raises TypeError: if ``user`` is not an instance of `User` + :raises ValueError: if ``log_type`` is not a valid log type + """ + + from logging import getLogger + + from flask import has_request_context, request + + logger = logger or getLogger('calsocial') + + if not isinstance(user, User): + raise TypeError('user must be a User instance') + + if log_type not in dict(cls.TYPES): + raise ValueError('log_type must be a valid log type') + + if has_request_context(): + ip = request.remote_addr or 'UNKNOWN' + else: + ip = 'NON-REQUEST' + + record = cls(user=user, + timestamp=datetime.utcnow(), + log_type=log_type, + ip_address=ip, + extra_data=extra_data) + db.session.add(record) + db.session.commit() + + with force_locale('en'): + message = f'Audit: [{ip}] [{user.id}] ' + cls.get_message(log_type, user=user.username) + + logger.info(message)