forked from gergely/calendar-social
499 lines
14 KiB
Python
499 lines
14 KiB
Python
# Calendar.social
|
||
# Copyright (C) 2018 Gergely Polonkai
|
||
#
|
||
# This program is free software: you can redistribute it and/or modify
|
||
# it under the terms of the GNU Affero General Public License as published by
|
||
# the Free Software Foundation, either version 3 of the License, or
|
||
# (at your option) any later version.
|
||
#
|
||
# This program is distributed in the hope that it will be useful,
|
||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
# GNU Affero General Public License for more details.
|
||
#
|
||
# You should have received a copy of the GNU Affero General Public License
|
||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||
|
||
"""Main module for the Calendar.social app
|
||
"""
|
||
|
||
from datetime import datetime
|
||
import os
|
||
|
||
from flask import Flask, abort, current_app, redirect, render_template, request, url_for
|
||
from flask_babelex import Babel, get_locale as babel_get_locale
|
||
from flask_security import SQLAlchemyUserDatastore, current_user, login_required
|
||
from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound
|
||
|
||
|
||
def get_locale():
|
||
"""Locale selector
|
||
|
||
Selects the best locale based on values sent by the browser.
|
||
"""
|
||
|
||
supported_languages = ['en', 'hu']
|
||
|
||
if 'l' in request.args and request.args['l'].lower() in supported_languages:
|
||
return request.args['l'].lower()
|
||
|
||
return request.accept_languages.best_match(supported_languages)
|
||
|
||
|
||
def template_vars():
|
||
"""Function to inject global template variables
|
||
"""
|
||
|
||
now = datetime.utcnow()
|
||
|
||
return {
|
||
'lang': babel_get_locale().language,
|
||
'now': now,
|
||
'now_ts': now.timestamp(),
|
||
}
|
||
|
||
|
||
def route(*args, **kwargs):
|
||
"""Mark a function as a future route
|
||
|
||
Such functions will be iterated over when the application is initialised. ``*args`` and
|
||
``**kwargs`` will be passed verbatim to `Flask.route()`.
|
||
"""
|
||
|
||
def decorator(func): # pylint: disable=missing-docstring
|
||
setattr(func, 'routing', (args, kwargs))
|
||
|
||
return func
|
||
|
||
return decorator
|
||
|
||
|
||
class CalendarSocialApp(Flask):
|
||
"""The Calendar.social app
|
||
"""
|
||
|
||
def __init__(self, name, config=None): # pylint: disable=too-many-locals
|
||
from .forms import LoginForm
|
||
from .models import db, User, Role
|
||
from .security import security, AnonymousUser
|
||
|
||
Flask.__init__(self, name)
|
||
|
||
self._timezone = None
|
||
|
||
config_name = os.environ.get('ENV', config or 'dev')
|
||
self.config.from_pyfile(f'config_{config_name}.py', True)
|
||
# Make sure we look up users both by their usernames and email addresses
|
||
self.config['SECURITY_USER_IDENTITY_ATTRIBUTES'] = ('username', 'email')
|
||
self.config['SECURITY_LOGIN_USER_TEMPLATE'] = 'login.html'
|
||
|
||
self.jinja_env.policies['ext.i18n.trimmed'] = True # pylint: disable=no-member
|
||
|
||
db.init_app(self)
|
||
|
||
babel = Babel(app=self)
|
||
babel.localeselector(get_locale)
|
||
|
||
user_store = SQLAlchemyUserDatastore(db, User, Role)
|
||
security.init_app(self, datastore=user_store,
|
||
anonymous_user=AnonymousUser,
|
||
login_form=LoginForm)
|
||
|
||
self.context_processor(template_vars)
|
||
|
||
for attr_name in self.__dir__():
|
||
attr = getattr(self, attr_name)
|
||
|
||
if not callable(attr):
|
||
continue
|
||
|
||
args, kwargs = getattr(attr, 'routing', (None, None))
|
||
|
||
if args is None:
|
||
continue
|
||
|
||
self.route(*args, **kwargs)(attr)
|
||
|
||
self.before_request(self.goto_first_steps)
|
||
|
||
@staticmethod
|
||
def goto_first_steps():
|
||
"""Check if the current user has a profile and if not, redirect to the first steps page
|
||
"""
|
||
|
||
if current_user.is_authenticated and \
|
||
not current_user.profile and \
|
||
request.endpoint != 'first_steps':
|
||
return redirect(url_for('first_steps'))
|
||
|
||
return None
|
||
|
||
@property
|
||
def timezone(self):
|
||
"""The default time zone of the app
|
||
"""
|
||
|
||
from warnings import warn
|
||
|
||
from flask import has_app_context
|
||
from pytz import timezone, utc
|
||
from pytz.exceptions import UnknownTimeZoneError
|
||
|
||
if not has_app_context():
|
||
return utc
|
||
|
||
if not self._timezone:
|
||
timezone_str = current_app.config.get('DEFAULT_TIMEZONE', 'UTC')
|
||
|
||
try:
|
||
self._timezone = timezone(timezone_str)
|
||
except UnknownTimeZoneError:
|
||
warn(f'Timezone of {self} (or the default timezone) "{timezone_str}" is invalid')
|
||
|
||
self._timezone = utc
|
||
|
||
return self._timezone
|
||
|
||
@staticmethod
|
||
def _current_calendar():
|
||
from .calendar_system.gregorian import GregorianCalendar
|
||
|
||
try:
|
||
timestamp = datetime.fromtimestamp(float(request.args.get('date')))
|
||
except TypeError:
|
||
timestamp = datetime.utcnow()
|
||
|
||
return GregorianCalendar(timestamp.timestamp())
|
||
|
||
@route('/about')
|
||
def about(self):
|
||
"""View for the about page
|
||
"""
|
||
|
||
from .models import User, Event
|
||
|
||
calendar = self._current_calendar()
|
||
|
||
if not current_user.is_authenticated:
|
||
login_form_class = current_app.extensions['security'].login_form
|
||
login_form = login_form_class()
|
||
else:
|
||
login_form = None
|
||
|
||
user_count = User.query.count()
|
||
event_count = Event.query.count()
|
||
|
||
return render_template('welcome.html',
|
||
calendar=calendar,
|
||
user_only=False,
|
||
login_form=login_form,
|
||
user_count=user_count,
|
||
event_count=event_count)
|
||
|
||
@route('/')
|
||
def hello(self):
|
||
"""View for the main page
|
||
|
||
This will display a welcome message for users not logged in; for others, their main
|
||
calendar view is displayed.
|
||
"""
|
||
|
||
calendar = self._current_calendar()
|
||
|
||
if not current_user.is_authenticated:
|
||
return self.about()
|
||
|
||
return render_template('index.html', calendar=calendar, user_only=True)
|
||
|
||
@staticmethod
|
||
@route('/register', methods=['POST', 'GET'])
|
||
def register():
|
||
"""View for user registration
|
||
|
||
If the ``REGISTRATION_FAILED`` configuration value is set to ``True`` it displays the
|
||
registration disabled template. Otherwise, it performs user registration.
|
||
"""
|
||
|
||
if not current_app.config['REGISTRATION_ENABLED']:
|
||
return render_template('registration-disabled.html')
|
||
|
||
from .forms import RegistrationForm
|
||
from .models import db, User
|
||
|
||
form = RegistrationForm()
|
||
|
||
if form.validate_on_submit():
|
||
# TODO: This might become False later, if we want registrations to be confirmed via
|
||
# e-mail
|
||
user = User(active=True)
|
||
|
||
form.populate_obj(user)
|
||
db.session.add(user)
|
||
db.session.commit()
|
||
|
||
return redirect(url_for('hello'))
|
||
|
||
return render_template('registration.html', form=form)
|
||
|
||
@staticmethod
|
||
@route('/new-event', methods=['GET', 'POST'])
|
||
@login_required
|
||
def new_event():
|
||
"""View for creating a new event
|
||
|
||
This presents a form to the user that allows entering event details.
|
||
"""
|
||
|
||
from .forms import EventForm
|
||
from .models import db, Event
|
||
|
||
form = EventForm()
|
||
|
||
if form.validate_on_submit():
|
||
event = Event(profile=current_user.profile)
|
||
form.populate_obj(event)
|
||
|
||
db.session.add(event)
|
||
db.session.commit()
|
||
|
||
return redirect(url_for('hello'))
|
||
|
||
return render_template('event-edit.html', form=form)
|
||
|
||
@staticmethod
|
||
@route('/settings', methods=['GET', 'POST'])
|
||
@login_required
|
||
def settings():
|
||
"""View for user settings
|
||
"""
|
||
|
||
from .forms import SettingsForm
|
||
from .models import db
|
||
|
||
form = SettingsForm(current_user)
|
||
|
||
if form.validate_on_submit():
|
||
form.populate_obj(current_user)
|
||
|
||
db.session.commit()
|
||
|
||
return redirect(url_for('hello'))
|
||
|
||
return render_template('user-settings.html', form=form)
|
||
|
||
@staticmethod
|
||
@route('/event/<string:event_uuid>', methods=['GET', 'POST'])
|
||
def event_details(event_uuid):
|
||
"""View to display event details
|
||
"""
|
||
|
||
from .forms import InviteForm
|
||
from .models import db, Event
|
||
|
||
try:
|
||
event = Event.query.filter(Event.event_uuid == event_uuid).one()
|
||
except NoResultFound:
|
||
abort(404)
|
||
except MultipleResultsFound:
|
||
abort(500)
|
||
|
||
form = InviteForm(event)
|
||
|
||
if form.validate_on_submit():
|
||
event.invite(current_user.profile, invitee=form.invitee.data)
|
||
|
||
db.session.commit()
|
||
|
||
return redirect(url_for('event_details', event_uuid=event.event_uuid))
|
||
|
||
return render_template('event-details.html', event=event, form=form)
|
||
|
||
@staticmethod
|
||
@route('/profile/@<string:username>')
|
||
def display_profile(username):
|
||
"""View to display profile details
|
||
"""
|
||
|
||
from .models import Profile, User
|
||
|
||
try:
|
||
profile = Profile.query.join(User).filter(User.username == username).one()
|
||
except NoResultFound:
|
||
abort(404)
|
||
|
||
return render_template('profile-details.html', profile=profile)
|
||
|
||
@staticmethod
|
||
@route('/profile/@<string:username>/follow')
|
||
@login_required
|
||
def follow_user(username):
|
||
"""View for following a user
|
||
"""
|
||
|
||
from .models import db, Profile, User
|
||
|
||
try:
|
||
profile = Profile.query.join(User).filter(User.username == username).one()
|
||
except NoResultFound:
|
||
abort(404)
|
||
|
||
if profile.user != current_user:
|
||
profile.follow(follower=current_user.profile)
|
||
|
||
db.session.commit()
|
||
|
||
return redirect(url_for('display_profile', username=username))
|
||
|
||
@staticmethod
|
||
@route('/notifications')
|
||
def notifications():
|
||
"""View to list the notifications for the current user
|
||
"""
|
||
|
||
from .models import Notification
|
||
|
||
if current_user.is_authenticated:
|
||
notifs = Notification.query.filter(Notification.profile == current_user.profile)
|
||
else:
|
||
notifs = []
|
||
|
||
return render_template('notifications.html', notifs=notifs)
|
||
|
||
@staticmethod
|
||
@route('/accept/<int:invite_id>')
|
||
def accept_invite(invite_id):
|
||
"""View to accept an invitation
|
||
"""
|
||
|
||
from .models import db, Invitation, Response, ResponseType
|
||
|
||
invitation = Invitation.query.get_or_404(invite_id)
|
||
|
||
if invitation.invitee != current_user.profile:
|
||
abort(403)
|
||
|
||
try:
|
||
Response.query \
|
||
.filter(Response.event == invitation.event) \
|
||
.filter(Response.profile == current_user.profile) \
|
||
.one()
|
||
except NoResultFound:
|
||
response = Response(profile=current_user.profile,
|
||
event=invitation.event,
|
||
invitation=invitation,
|
||
response=ResponseType.going)
|
||
db.session.add(response)
|
||
db.session.commit()
|
||
except MultipleResultsFound:
|
||
pass
|
||
|
||
return redirect(url_for('event_details', event_uuid=invitation.event.event_uuid))
|
||
|
||
@staticmethod
|
||
@route('/first-steps', methods=['GET', 'POST'])
|
||
@login_required
|
||
def first_steps():
|
||
"""View to set up a new registrant’s profile
|
||
"""
|
||
|
||
from .forms import FirstStepsForm
|
||
from .models import db, Profile
|
||
|
||
if current_user.profile:
|
||
return redirect(url_for('hello'))
|
||
|
||
form = FirstStepsForm()
|
||
|
||
if form.validate_on_submit():
|
||
profile = Profile(user=current_user, display_name=form.display_name.data)
|
||
db.session.add(profile)
|
||
|
||
current_user.settings['timezone'] = str(form.time_zone.data)
|
||
|
||
db.session.commit()
|
||
|
||
return redirect(url_for('hello'))
|
||
|
||
return render_template('first-steps.html', form=form)
|
||
|
||
@staticmethod
|
||
@route('/edit-profile', methods=['GET', 'POST'])
|
||
@login_required
|
||
def edit_profile():
|
||
"""View for editing one’s profile
|
||
"""
|
||
|
||
from .forms import ProfileForm
|
||
from .models import db
|
||
|
||
form = ProfileForm(current_user.profile)
|
||
|
||
if form.validate_on_submit():
|
||
form.populate_obj(current_user.profile)
|
||
db.session.add(current_user.profile)
|
||
db.session.commit()
|
||
|
||
return redirect(url_for('edit_profile'))
|
||
|
||
return render_template('profile-edit.html', form=form)
|
||
|
||
@staticmethod
|
||
@route('/all-events')
|
||
def all_events():
|
||
"""View for listing all available events
|
||
"""
|
||
|
||
from .calendar_system.gregorian import GregorianCalendar
|
||
|
||
try:
|
||
timestamp = datetime.fromtimestamp(float(request.args.get('date')))
|
||
except TypeError:
|
||
timestamp = datetime.utcnow()
|
||
|
||
calendar = GregorianCalendar(timestamp.timestamp())
|
||
|
||
return render_template('index.html', calendar=calendar, user_only=False)
|
||
|
||
@staticmethod
|
||
@route('/follow-requests')
|
||
@login_required
|
||
def follow_requests():
|
||
"""View for listing follow requests
|
||
"""
|
||
|
||
from .models import UserFollow
|
||
|
||
requests = UserFollow.query \
|
||
.filter(UserFollow.followed == current_user.profile) \
|
||
.filter(UserFollow.accepted_at.is_(None))
|
||
|
||
return render_template('follow-requests.html', requests=requests)
|
||
|
||
@staticmethod
|
||
@route('/follow-request/<int:follower_id>/accept')
|
||
@login_required
|
||
def accept_follow(follower_id):
|
||
"""View for accepting a follow request
|
||
"""
|
||
|
||
from .models import db, UserFollow
|
||
|
||
try:
|
||
req = UserFollow.query \
|
||
.filter(UserFollow.followed == current_user.profile) \
|
||
.filter(UserFollow.follower_id == follower_id) \
|
||
.one()
|
||
except NoResultFound:
|
||
abort(404)
|
||
|
||
if req.accepted_at is None:
|
||
req.accept()
|
||
|
||
db.session.add(req)
|
||
db.session.commit()
|
||
|
||
return redirect(url_for('follow_requests'))
|
||
|
||
|
||
app = CalendarSocialApp(__name__)
|