Allow audit logging #32
@ -20,9 +20,13 @@
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from warnings import warn
|
from warnings import warn
|
||||||
|
|
||||||
|
from flask_babelex import lazy_gettext
|
||||||
from flask_security import UserMixin, RoleMixin
|
from flask_security import UserMixin, RoleMixin
|
||||||
from flask_sqlalchemy import SQLAlchemy
|
from flask_sqlalchemy import SQLAlchemy
|
||||||
from sqlalchemy.orm.exc import NoResultFound
|
from sqlalchemy.orm.exc import NoResultFound
|
||||||
|
from sqlalchemy_utils.types.choice import ChoiceType
|
||||||
|
|
||||||
|
from .utils import force_locale
|
||||||
|
|
||||||
db = SQLAlchemy()
|
db = SQLAlchemy()
|
||||||
users_roles = db.Table(
|
users_roles = db.Table(
|
||||||
@ -231,3 +235,113 @@ class UserSetting(db.Model):
|
|||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return f'<UserSetting of {self.user}, {self.key}="{self.value}">'
|
return f'<UserSetting of {self.user}, {self.key}="{self.value}">'
|
||||||
|
|
||||||
|
|
||||||
|
def _(translate): # pylint: disable=invalid-name
|
||||||
|
"""Function to mark strings as translatable
|
||||||
|
|
||||||
|
The actual translation will be fetched later in `:meth:AuditLog.get_message`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
return translate
|
||||||
|
|
||||||
|
|
||||||
|
class AuditLog(db.Model):
|
||||||
|
"""Database model for audit log records
|
||||||
|
"""
|
||||||
|
|
||||||
|
__tablename__ = 'audit_log'
|
||||||
|
# pylint: disable=invalid-name
|
||||||
|
id = db.Column(db.Integer(), primary_key=True)
|
||||||
|
|
||||||
|
TYPE_LOGIN_SUCCESS = 'login'
|
||||||
|
TYPE_LOGIN_FAIL = 'failed_login'
|
||||||
|
TYPE_LOGOUT = 'logout'
|
||||||
|
|
||||||
|
TYPES = (
|
||||||
|
(TYPE_LOGIN_SUCCESS, _('%(user)s logged in')),
|
||||||
|
(TYPE_LOGIN_FAIL, _('%(user)s failed to log in')),
|
||||||
|
(TYPE_LOGOUT, _('%(user)s logged out')),
|
||||||
|
)
|
||||||
|
|
||||||
|
user_id = db.Column(db.Integer(), db.ForeignKey('users.id'))
|
||||||
|
|
||||||
|
user = db.relationship('User')
|
||||||
|
|
||||||
|
ip_address = db.Column(db.String(length=40), nullable=False)
|
||||||
|
|
||||||
|
timestamp = db.Column(db.DateTime(), default=datetime.utcnow, nullable=False)
|
||||||
|
|
||||||
|
log_type = db.Column(ChoiceType(TYPES), nullable=False, index=True)
|
||||||
|
|
||||||
|
extra_data = db.Column(db.UnicodeText())
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
format_string = dict(self.TYPES).get(self.log_type,
|
||||||
|
_('UNKNOWN RECORD "%(log_type)s" for %(user)s'))
|
||||||
|
message = str(self.timestamp) + \
|
||||||
|
(format_string % {
|
||||||
|
'user': self.user.username,
|
||||||
|
'log_type': self.log_type
|
||||||
|
})
|
||||||
|
|
||||||
|
if self.extra_data:
|
||||||
|
message += f' {self.extra_data}'
|
||||||
|
|
||||||
|
return message
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f'<AuditLog {self.log_type} for {self.user} at {self.timestamp}>'
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_message(cls, key, *args, **kwargs):
|
||||||
|
"""Get the translated message for ``key``
|
||||||
|
"""
|
||||||
|
|
||||||
|
return lazy_gettext(dict(cls.TYPES).get(key), *args, **kwargs)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def log(cls, user, log_type, extra_data=None, logger=None):
|
||||||
|
"""Create a new audit log record
|
||||||
|
|
||||||
|
:param user: the user the new record corresponds to
|
||||||
|
:type user: User
|
||||||
|
:param log_type: the type of the record. Must be present in `TYPES`
|
||||||
|
:type log_type: str
|
||||||
|
:param extra_data: extra text to be added to the record
|
||||||
|
:type extra_data: str, None
|
||||||
|
:param logger: if set, logs will go to this logger instead of the default (calsocial)
|
||||||
|
:type logger: Logger
|
||||||
|
:raises TypeError: if ``user`` is not an instance of `User`
|
||||||
|
:raises ValueError: if ``log_type`` is not a valid log type
|
||||||
|
"""
|
||||||
|
|
||||||
|
from logging import getLogger
|
||||||
|
|
||||||
|
from flask import has_request_context, request
|
||||||
|
|
||||||
|
logger = logger or getLogger('calsocial')
|
||||||
|
|
||||||
|
if not isinstance(user, User):
|
||||||
|
raise TypeError('user must be a User instance')
|
||||||
|
|
||||||
|
if log_type not in dict(cls.TYPES):
|
||||||
|
raise ValueError('log_type must be a valid log type')
|
||||||
|
|
||||||
|
if has_request_context():
|
||||||
|
ip = request.remote_addr or 'UNKNOWN'
|
||||||
|
else:
|
||||||
|
ip = 'NON-REQUEST'
|
||||||
|
|
||||||
|
record = cls(user=user,
|
||||||
|
timestamp=datetime.utcnow(),
|
||||||
|
log_type=log_type,
|
||||||
|
ip_address=ip,
|
||||||
|
extra_data=extra_data)
|
||||||
|
db.session.add(record)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
with force_locale('en'):
|
||||||
|
message = f'Audit: [{ip}] [{user.id}] ' + cls.get_message(log_type, user=user.username)
|
||||||
|
|
||||||
|
logger.info(message)
|
||||||
|
Loading…
Reference in New Issue
Block a user