AuditLog model
This commit is contained in:
		| @@ -20,9 +20,13 @@ | ||||
| from datetime import datetime | ||||
| from warnings import warn | ||||
|  | ||||
| from flask_babelex import lazy_gettext | ||||
| from flask_security import UserMixin, RoleMixin | ||||
| from flask_sqlalchemy import SQLAlchemy | ||||
| from sqlalchemy.orm.exc import NoResultFound | ||||
| from sqlalchemy_utils.types.choice import ChoiceType | ||||
|  | ||||
| from .utils import force_locale | ||||
|  | ||||
| db = SQLAlchemy() | ||||
| users_roles = db.Table( | ||||
| @@ -231,3 +235,113 @@ class UserSetting(db.Model): | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return f'<UserSetting of {self.user}, {self.key}="{self.value}">' | ||||
|  | ||||
|  | ||||
| def _(translate):  # pylint: disable=invalid-name | ||||
|     """Function to mark strings as translatable | ||||
|  | ||||
|     The actual translation will be fetched later in `:meth:AuditLog.get_message`. | ||||
|     """ | ||||
|  | ||||
|     return translate | ||||
|  | ||||
|  | ||||
| class AuditLog(db.Model): | ||||
|     """Database model for audit log records | ||||
|     """ | ||||
|  | ||||
|     __tablename__ = 'audit_log' | ||||
|     # pylint: disable=invalid-name | ||||
|     id = db.Column(db.Integer(), primary_key=True) | ||||
|  | ||||
|     TYPE_LOGIN_SUCCESS = 'login' | ||||
|     TYPE_LOGIN_FAIL = 'failed_login' | ||||
|     TYPE_LOGOUT = 'logout' | ||||
|  | ||||
|     TYPES = ( | ||||
|         (TYPE_LOGIN_SUCCESS, _('%(user)s logged in')), | ||||
|         (TYPE_LOGIN_FAIL, _('%(user)s failed to log in')), | ||||
|         (TYPE_LOGOUT, _('%(user)s logged out')), | ||||
|     ) | ||||
|  | ||||
|     user_id = db.Column(db.Integer(), db.ForeignKey('users.id')) | ||||
|  | ||||
|     user = db.relationship('User') | ||||
|  | ||||
|     ip_address = db.Column(db.String(length=40), nullable=False) | ||||
|  | ||||
|     timestamp = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False) | ||||
|  | ||||
|     log_type = db.Column(ChoiceType(TYPES), nullable=False, index=True) | ||||
|  | ||||
|     extra_data = db.Column(db.UnicodeText()) | ||||
|  | ||||
|     def __str__(self): | ||||
|         format_string = dict(self.TYPES).get(self.log_type, | ||||
|                                              _('UNKNOWN RECORD "%(log_type)s" for %(user)s')) | ||||
|         message = str(self.timestamp) + \ | ||||
|                   (format_string % { | ||||
|                       'user': self.user.username, | ||||
|                       'log_type': self.log_type | ||||
|                   }) | ||||
|  | ||||
|         if self.extra_data: | ||||
|             message += f' {self.extra_data}' | ||||
|  | ||||
|         return message | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return f'<AuditLog {self.log_type} for {self.user} at {self.timestamp}>' | ||||
|  | ||||
|     @classmethod | ||||
|     def get_message(cls, key, *args, **kwargs): | ||||
|         """Get the translated message for ``key`` | ||||
|         """ | ||||
|  | ||||
|         return lazy_gettext(dict(cls.TYPES).get(key), *args, **kwargs) | ||||
|  | ||||
|     @classmethod | ||||
|     def log(cls, user, log_type, extra_data=None, logger=None): | ||||
|         """Create a new audit log record | ||||
|  | ||||
|         :param user: the user the new record corresponds to | ||||
|         :type user: User | ||||
|         :param log_type: the type of the record.  Must be present in `TYPES` | ||||
|         :type log_type: str | ||||
|         :param extra_data: extra text to be added to the record | ||||
|         :type extra_data: str, None | ||||
|         :param logger: if set, logs will go to this logger instead of the default (calsocial) | ||||
|         :type logger: Logger | ||||
|         :raises TypeError: if ``user`` is not an instance of `User` | ||||
|         :raises ValueError: if ``log_type`` is not a valid log type | ||||
|         """ | ||||
|  | ||||
|         from logging import getLogger | ||||
|  | ||||
|         from flask import has_request_context, request | ||||
|  | ||||
|         logger = logger or getLogger('calsocial') | ||||
|  | ||||
|         if not isinstance(user, User): | ||||
|             raise TypeError('user must be a User instance') | ||||
|  | ||||
|         if log_type not in dict(cls.TYPES): | ||||
|             raise ValueError('log_type must be a valid log type') | ||||
|  | ||||
|         if has_request_context(): | ||||
|             ip = request.remote_addr or 'UNKNOWN' | ||||
|         else: | ||||
|             ip = 'NON-REQUEST' | ||||
|  | ||||
|         record = cls(user=user, | ||||
|                      timestamp=datetime.utcnow(), | ||||
|                      log_type=log_type, | ||||
|                      ip_address=ip, | ||||
|                      extra_data=extra_data) | ||||
|         db.session.add(record) | ||||
|         db.session.commit() | ||||
|  | ||||
|         with force_locale('en'): | ||||
|             message = f'Audit: [{ip}] [{user.id}] ' + cls.get_message(log_type, user=user.username) | ||||
|  | ||||
|         logger.info(message) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user