Compare commits

..

4 Commits

Author SHA1 Message Date
168b2b288a Added mysql-related database variables
Added options for different database engines
2022-08-20 12:01:08 +01:00
9a5f69f889 Added database-related env variables 2022-08-20 11:59:33 +01:00
7d6f256392 Added PyMySQL driver dependency 2022-08-20 11:59:02 +01:00
866c9b10cf Exception handling for database queries 2022-08-20 10:56:43 +01:00
18 changed files with 260 additions and 117 deletions

View File

@ -3,9 +3,18 @@ FLASK_DEBUG=False
TZ=Europe/London # Time Zone TZ=Europe/London # Time Zone
## Flask Configuration ## App Configuration
SECRET_KEY= # Long, secure, secret string. SECRET_KEY= # Long, secure, secret string.
DATA=./data/ DATA=./data/
DATABASE_TYPE=SQLite # SQLite or MySQL, defaults to SQLite
DATABASE_HOST= # Required if MySQL. Must match name of Docker service, or provide host if database is on an external server. Defaults to localhost.
DATABASE_PORT= # Required if MySQL. Defaults to 3306
## MySQL Database Configuration (Required if configured to MySQL Database.)
# Note that if using the Docker service, these configuration values will also be used when creating the database in the mysql container.
MYSQL_DATABASE= # Required if MySQL.
MYSQL_USER= # Required if MySQL
MYSQL_PASSWORD= # Required if MySQL. Create secure password string. Note '@' character cannot be used.
## Flask Mail Configuration ## Flask Mail Configuration
MAIL_SERVER=postfix # Must match name of the Docker service MAIL_SERVER=postfix # Must match name of the Docker service

View File

@ -1,11 +1,13 @@
from .config import Production as Config from .config import Production as Config
from .models import User from .models import User
from .extensions import bootstrap, csrf, db, login_manager, mail from .extensions import bootstrap, csrf, db, login_manager, mail
from .tools.logs import write
from flask import flash, Flask, render_template, request from flask import flash, Flask, render_template, request
from flask.helpers import url_for from flask.helpers import abort, url_for
from flask.json import jsonify from flask.json import jsonify
from flask_wtf.csrf import CSRFError from flask_wtf.csrf import CSRFError
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.middleware.proxy_fix import ProxyFix from werkzeug.middleware.proxy_fix import ProxyFix
from datetime import datetime from datetime import datetime
@ -24,7 +26,10 @@ def create_app():
login_manager.login_view = 'admin._login' login_manager.login_view = 'admin._login'
@login_manager.user_loader @login_manager.user_loader
def _load_user(id): def _load_user(id):
return User.query.filter_by(id=id).first() try: return User.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when loading user fo login manager: {exception}')
return abort(500)
@app.before_request @app.before_request
def _check_cookie_consent(): def _check_cookie_consent():

View File

@ -1,13 +1,15 @@
from ..forms.admin import AddTimeAdjustment, CreateTest, CreateUser, DeleteUser, Login, Register, ResetPassword, UpdatePassword, UpdateUser, UploadData from ..forms.admin import AddTimeAdjustment, CreateTest, CreateUser, DeleteUser, Login, Register, ResetPassword, UpdatePassword, UpdateUser, UploadData
from ..models import Dataset, Entry, Test, User from ..models import Dataset, Entry, Test, User
from ..tools.auth import disable_if_logged_in, require_account_creation from ..tools.auth import disable_if_logged_in, require_account_creation
from ..tools.forms import get_dataset_choices, get_time_options, send_errors_to_client
from ..tools.data import check_dataset_exists, check_is_json, validate_json from ..tools.data import check_dataset_exists, check_is_json, validate_json
from ..tools.forms import get_dataset_choices, get_time_options, send_errors_to_client
from ..tools.logs import write
from ..tools.test import answer_options, get_correct_answers from ..tools.test import answer_options, get_correct_answers
from flask import abort, Blueprint, jsonify, render_template, redirect, request, send_file, session from flask import abort, Blueprint, jsonify, render_template, request, send_file, session
from flask.helpers import flash, url_for from flask.helpers import abort, flash, redirect, url_for
from flask_login import current_user, login_required from flask_login import current_user, login_required
from sqlalchemy.exc import SQLAlchemyError
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
from json import loads from json import loads
@ -26,8 +28,12 @@ admin = Blueprint(
@admin.route('/dashboard/') @admin.route('/dashboard/')
@login_required @login_required
def _home(): def _home():
tests = Test.query.all() try:
results = Entry.query.all() tests = Test.query.all()
results = Entry.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
current_tests = [ test for test in tests if test.end_date >= datetime.now() and test.start_date.date() <= date.today() ] current_tests = [ test for test in tests if test.end_date >= datetime.now() and test.start_date.date() <= date.today() ]
current_tests.sort(key= lambda x: x.end_date, reverse=True) current_tests.sort(key= lambda x: x.end_date, reverse=True)
upcoming_tests = [ test for test in tests if test.start_date.date() > datetime.now().date()] upcoming_tests = [ test for test in tests if test.start_date.date() > datetime.now().date()]
@ -39,8 +45,12 @@ def _home():
@admin.route('/settings/') @admin.route('/settings/')
@login_required @login_required
def _settings(): def _settings():
users = User.query.all() try:
datasets = Dataset.query.all() users = User.query.all()
datasets = Dataset.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
return render_template('/admin/settings/index.html', users=users, datasets=datasets) return render_template('/admin/settings/index.html', users=users, datasets=datasets)
@admin.route('/login/', methods=['GET','POST']) @admin.route('/login/', methods=['GET','POST'])
@ -50,7 +60,10 @@ def _login():
form = Login() form = Login()
if request.method == 'POST': if request.method == 'POST':
if form.validate_on_submit(): if form.validate_on_submit():
users = User.query.all() try: users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
user = None user = None
for _user in users: for _user in users:
if _user.get_username() == request.form.get('username').lower(): if _user.get_username() == request.form.get('username').lower():
@ -99,7 +112,10 @@ def _reset():
if request.method == 'POST': if request.method == 'POST':
if form.validate_on_submit(): if form.validate_on_submit():
user = None user = None
users = User.query.all() try: users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
for _user in users: for _user in users:
if _user.get_username() == request.form.get('username'): if _user.get_username() == request.form.get('username'):
user = _user user = _user
@ -111,7 +127,10 @@ def _reset():
token = request.args.get('token') token = request.args.get('token')
if token: if token:
user = User.query.filter_by(reset_token=token).first() try: user = User.query.filter_by(reset_token=token).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not user: return redirect(url_for('admin._reset')) if not user: return redirect(url_for('admin._reset'))
verification_token = user.verification_token verification_token = user.verification_token
user.clear_reset_tokens() user.clear_reset_tokens()
@ -128,7 +147,10 @@ def _update_password():
form = UpdatePassword() form = UpdatePassword()
if form.validate_on_submit(): if form.validate_on_submit():
user = session.pop('user') user = session.pop('user')
user = User.query.filter_by(id=user).first() try: user = User.query.filter_by(id=user).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
user.update(password=request.form.get('password')) user.update(password=request.form.get('password'))
session['remembered_username'] = user.get_username() session['remembered_username'] = user.get_username()
flash('Your password has been reset.', 'success') flash('Your password has been reset.', 'success')
@ -139,7 +161,10 @@ def _update_password():
@login_required @login_required
def _users(): def _users():
form = CreateUser() form = CreateUser()
users = User.query.all() try: users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if request.method == 'POST': if request.method == 'POST':
if form.validate_on_submit(): if form.validate_on_submit():
password = request.form.get('password') password = request.form.get('password')
@ -156,7 +181,10 @@ def _users():
@admin.route('/settings/users/delete/<string:id>', methods=['GET', 'POST']) @admin.route('/settings/users/delete/<string:id>', methods=['GET', 'POST'])
@login_required @login_required
def _delete_user(id:str): def _delete_user(id:str):
user = User.query.filter_by(id=id).first() try: user = User.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
form = DeleteUser() form = DeleteUser()
if request.method == 'POST': if request.method == 'POST':
if not user: return jsonify({'error': 'User does not exist.'}), 400 if not user: return jsonify({'error': 'User does not exist.'}), 400
@ -180,7 +208,10 @@ def _delete_user(id:str):
@admin.route('/settings/users/update/<string:id>', methods=['GET', 'POST']) @admin.route('/settings/users/update/<string:id>', methods=['GET', 'POST'])
@login_required @login_required
def _update_user(id:str): def _update_user(id:str):
user = User.query.filter_by(id=id).first() try: user = User.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
form = UpdateUser() form = UpdateUser()
if request.method == 'POST': if request.method == 'POST':
if not user: return jsonify({'error': 'User does not exist.'}), 400 if not user: return jsonify({'error': 'User does not exist.'}), 400
@ -222,7 +253,10 @@ def _questions():
return jsonify({'error': message}), 400 return jsonify({'error': message}), 400
return send_errors_to_client(form=form) return send_errors_to_client(form=form)
data = Dataset.query.all() try: data = Dataset.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
return render_template('/admin/settings/questions.html', form=form, data=data) return render_template('/admin/settings/questions.html', form=form, data=data)
@admin.route('/settings/questions/delete/', methods=['POST']) @admin.route('/settings/questions/delete/', methods=['POST'])
@ -231,7 +265,10 @@ def _edit_questions():
id = request.get_json()['id'] id = request.get_json()['id']
action = request.get_json()['action'] action = request.get_json()['action']
if not action == 'delete': return jsonify({'error': 'Invalid action.'}), 400 if not action == 'delete': return jsonify({'error': 'Invalid action.'}), 400
dataset = Dataset.query.filter_by(id=id).first() try: dataset = Dataset.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if action == 'delete': success, message = dataset.delete() if action == 'delete': success, message = dataset.delete()
if success: return jsonify({'success': message}), 200 if success: return jsonify({'success': message}), 200
return jsonify({'error': message}), 400 return jsonify({'error': message}), 400
@ -239,7 +276,10 @@ def _edit_questions():
@admin.route('/settings/questions/download/<string:id>/') @admin.route('/settings/questions/download/<string:id>/')
@login_required @login_required
def _download(id:str): def _download(id:str):
dataset = Dataset.query.filter_by(id=id).first() try: dataset = Dataset.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset: return abort(404) if not dataset: return abort(404)
data_path = path.abspath(dataset.get_file()) data_path = path.abspath(dataset.get_file())
return send_file(data_path, as_attachment=True, attachment_filename=f'{dataset.get_name()}.json') return send_file(data_path, as_attachment=True, attachment_filename=f'{dataset.get_name()}.json')
@ -250,7 +290,10 @@ def _download(id:str):
@check_dataset_exists @check_dataset_exists
def _tests(filter:str=None): def _tests(filter:str=None):
tests = None tests = None
_tests = Test.query.all() try: _tests = Test.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
form = None form = None
now = datetime.now() now = datetime.now()
if filter not in ['create','active','scheduled','expired','all']: return redirect(url_for('admin._tests', filter='active')) if filter not in ['create','active','scheduled','expired','all']: return redirect(url_for('admin._tests', filter='active'))
@ -296,7 +339,10 @@ def _create_test():
new_test.end_date = datetime.strptime(new_test.end_date, '%Y-%m-%dT%H:%M') new_test.end_date = datetime.strptime(new_test.end_date, '%Y-%m-%dT%H:%M')
new_test.time_limit = None if request.form.get('time_limit') == 'none' else int(request.form.get('time_limit')) new_test.time_limit = None if request.form.get('time_limit') == 'none' else int(request.form.get('time_limit'))
dataset = request.form.get('dataset') dataset = request.form.get('dataset')
new_test.dataset = Dataset.query.filter_by(id=dataset).first() try: new_test.dataset = Dataset.query.filter_by(id=dataset).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
success, message = new_test.create() success, message = new_test.create()
if success: if success:
flash(message=message, category='success') flash(message=message, category='success')
@ -310,7 +356,10 @@ def _edit_test():
id = request.get_json()['id'] id = request.get_json()['id']
action = request.get_json()['action'] action = request.get_json()['action']
if action not in ['start', 'delete', 'end']: return jsonify({'error': 'Invalid action.'}), 400 if action not in ['start', 'delete', 'end']: return jsonify({'error': 'Invalid action.'}), 400
test = Test.query.filter_by(id=id).first() try: test = Test.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not test: return jsonify({'error': 'Could not find the corresponding test to delete.'}), 404 if not test: return jsonify({'error': 'Could not find the corresponding test to delete.'}), 404
if action == 'delete': success, message = test.delete() if action == 'delete': success, message = test.delete()
if action == 'start': success, message = test.start() if action == 'start': success, message = test.start()
@ -324,7 +373,10 @@ def _edit_test():
@login_required @login_required
def _view_test(id:str=None): def _view_test(id:str=None):
form = AddTimeAdjustment() form = AddTimeAdjustment()
test = Test.query.filter_by(id=id).first() try: test = Test.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if request.method == 'POST': if request.method == 'POST':
if not test: return jsonify({'error': 'Invalid test ID.'}), 404 if not test: return jsonify({'error': 'Invalid test ID.'}), 404
if form.validate_on_submit(): if form.validate_on_submit():
@ -341,7 +393,10 @@ def _view_test(id:str=None):
@admin.route('/test/<string:id>/delete-adjustment/', methods=['POST']) @admin.route('/test/<string:id>/delete-adjustment/', methods=['POST'])
@login_required @login_required
def _delete_adjustment(id:str=None): def _delete_adjustment(id:str=None):
test = Test.query.filter_by(id=id).first() try: test = Test.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not test: return jsonify({'error': 'Invalid test ID.'}), 404 if not test: return jsonify({'error': 'Invalid test ID.'}), 404
user_code = request.get_json()['user_code'].lower() user_code = request.get_json()['user_code'].lower()
success, message = test.remove_adjustment(user_code) success, message = test.remove_adjustment(user_code)
@ -351,13 +406,19 @@ def _delete_adjustment(id:str=None):
@admin.route('/results/') @admin.route('/results/')
@login_required @login_required
def _view_entries(): def _view_entries():
entries = Entry.query.all() try: entries = Entry.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
return render_template('/admin/results.html', entries = entries) return render_template('/admin/results.html', entries = entries)
@admin.route('/results/<string:id>/', methods = ['GET', 'POST']) @admin.route('/results/<string:id>/', methods = ['GET', 'POST'])
@login_required @login_required
def _view_entry(id:str=None): def _view_entry(id:str=None):
entry = Entry.query.filter_by(id=id).first() try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if request.method == 'POST': if request.method == 'POST':
if not entry: return jsonify({'error': 'Invalid entry ID.'}), 404 if not entry: return jsonify({'error': 'Invalid entry ID.'}), 404
action = request.get_json()['action'] action = request.get_json()['action']
@ -388,6 +449,9 @@ def _view_entry(id:str=None):
def _generate_certificate(): def _generate_certificate():
from ..extensions import db from ..extensions import db
id = request.get_json()['id'] id = request.get_json()['id']
entry = Entry.query.filter_by(id=id).first() try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not entry: return jsonify({'error': 'Invalid entry ID.'}), 404 if not entry: return jsonify({'error': 'Invalid entry ID.'}), 404
return render_template('/admin/components/certificate.html', entry = entry) return render_template('/admin/components/certificate.html', entry = entry)

View File

@ -1,9 +1,12 @@
from ..models import Dataset, Entry, User from ..models import Dataset, Entry, User
from ..tools.data import validate_json from ..tools.data import validate_json
from ..tools.logs import write
from ..tools.test import evaluate_answers, generate_questions from ..tools.test import evaluate_answers, generate_questions
from flask import Blueprint, flash, jsonify, request, url_for from flask import Blueprint, jsonify, request
from flask.helpers import abort, flash, url_for
from flask_login import login_required from flask_login import login_required
from sqlalchemy.exc import SQLAlchemyError
from datetime import datetime, timedelta from datetime import datetime, timedelta
from json import loads from json import loads
@ -16,7 +19,10 @@ api = Blueprint(
@api.route('/questions/', methods=['POST']) @api.route('/questions/', methods=['POST'])
def _fetch_questions(): def _fetch_questions():
id = request.get_json()['id'] id = request.get_json()['id']
entry = Entry.query.filter_by(id=id).first() try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not entry: return jsonify({'error': 'Invalid entry ID.'}), 400 if not entry: return jsonify({'error': 'Invalid entry ID.'}), 400
test = entry.test test = entry.test
user_code = entry.user_code user_code = entry.user_code
@ -50,7 +56,10 @@ def _fetch_questions():
def _submit_quiz(): def _submit_quiz():
id = request.get_json()['id'] id = request.get_json()['id']
answers = request.get_json()['answers'] answers = request.get_json()['answers']
entry = Entry.query.filter_by(id=id).first() try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not entry: return jsonify({'error': 'Unrecognised Entry.'}), 400 if not entry: return jsonify({'error': 'Unrecognised Entry.'}), 400
test = entry.test test = entry.test
dataset = test.dataset dataset = test.dataset
@ -71,7 +80,12 @@ def _submit_quiz():
def _editor(id:str=None): def _editor(id:str=None):
request_data = request.get_json() request_data = request.get_json()
id = request_data['id'] id = request_data['id']
dataset = Dataset.query.filter_by(id=id).first() try:
dataset = Dataset.query.filter_by(id=id).first()
user = User.query.filter_by(id=creator).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset: return jsonify({'error': 'Invalid request. Dataset not found.'}), 404 if not dataset: return jsonify({'error': 'Invalid request. Dataset not found.'}), 404
data_path = dataset.get_file() data_path = dataset.get_file()
if request_data['action'] == 'fetch': if request_data['action'] == 'fetch':
@ -83,7 +97,6 @@ def _editor(id:str=None):
name = request_data['name'] name = request_data['name']
data = request_data['data'] data = request_data['data']
if not validate_json(data): return jsonify({'error': 'The data you submitted was invalid.'}), 400 if not validate_json(data): return jsonify({'error': 'The data you submitted was invalid.'}), 400
user = User.query.filter_by(id=creator).first()
dataset.set_name(name) dataset.set_name(name)
dataset.creator = user dataset.creator = user
success, message = dataset.update(data=data, default=default) success, message = dataset.update(data=data, default=default)

View File

@ -4,6 +4,7 @@ from dotenv import load_dotenv
load_dotenv('../.env') load_dotenv('../.env')
class Config(object): class Config(object):
"""Basic App Configuration"""
APP_HOST = '0.0.0.0' APP_HOST = '0.0.0.0'
DATA = './data/' DATA = './data/'
DEBUG = False DEBUG = False
@ -11,9 +12,8 @@ class Config(object):
SECRET_KEY = os.getenv('SECRET_KEY') SECRET_KEY = os.getenv('SECRET_KEY')
SERVER_NAME = os.getenv('SERVER_NAME') SERVER_NAME = os.getenv('SERVER_NAME')
SESSION_COOKIE_SECURE = True SESSION_COOKIE_SECURE = True
SQLALCHEMY_DATABASE_URI = f'sqlite:///{Path(os.path.abspath(f"{DATA}/database.db"))}'
SQLALCHEMY_TRACK_MODIFICATIONS = False
"""Email Engine Configuration"""
MAIL_SERVER = os.getenv('MAIL_SERVER') MAIL_SERVER = os.getenv('MAIL_SERVER')
MAIL_PORT = int(os.getenv('MAIL_PORT') or 25) MAIL_PORT = int(os.getenv('MAIL_PORT') or 25)
MAIL_USE_TLS = False MAIL_USE_TLS = False
@ -26,6 +26,19 @@ class Config(object):
MAIL_SUPPRESS_SEND = False MAIL_SUPPRESS_SEND = False
MAIL_ASCII_ATTACHMENTS = bool(os.getenv('MAIL_ASCII_ATTACHMENTS') or True) MAIL_ASCII_ATTACHMENTS = bool(os.getenv('MAIL_ASCII_ATTACHMENTS') or True)
"""Database Driver Configuration"""
DATABASE_TYPE = os.getenv('DATABASE_TYPE') or 'SQLite'
SQLALCHEMY_TRACK_MODIFICATIONS = False
if DATABASE_TYPE.lower() == 'mysql' and os.getenv('MYSQL_DATABASE') and os.getenv('MYSQL_USER') and os.getenv('MYSQL_PASSWORD'):
DATABASE_HOST = os.getenv('DATABASE_HOST') or 'localhost'
DATABASE_PORT = int(os.getenv('DATABASE_PORT') or 3306)
MYSQL_DATABASE = os.getenv('MYSQL_DATABASE')
MYSQL_USER = os.getenv('MYSQL_USER')
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD')
SQLALCHEMY_DATABASE_URI = f'mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{DATABASE_HOST}:{DATABASE_PORT}/{MYSQL_DATABASE}'
else: SQLALCHEMY_DATABASE_URI = f'sqlite:///{Path(os.path.abspath(f"{DATA}/database.db"))}'
class Production(Config): class Production(Config):
pass pass

View File

@ -1,11 +1,13 @@
from ..forms.admin import EditDataset from ..forms.admin import EditDataset
from ..models import Dataset, User from ..models import Dataset, User
from ..tools.forms import get_dataset_choices, send_errors_to_client
from ..tools.data import check_dataset_exists from ..tools.data import check_dataset_exists
from ..tools.forms import get_dataset_choices, send_errors_to_client
from ..tools.logs import write
from flask import Blueprint, flash, jsonify, redirect, render_template, request from flask import Blueprint, jsonify, render_template
from flask.helpers import url_for from flask.helpers import abort, flash, redirect, request, url_for
from flask_login import login_required from flask_login import login_required
from sqlalchemy.exc import SQLAlchemyError
editor = Blueprint( editor = Blueprint(
name='editor', name='editor',
@ -31,9 +33,13 @@ def _editor():
@check_dataset_exists @check_dataset_exists
@login_required @login_required
def _editor_console(id:str=None): def _editor_console(id:str=None):
dataset = Dataset.query.filter_by(id=id).first() try:
datasets = Dataset.query.count() dataset = Dataset.query.filter_by(id=id).first()
users = User.query.all() datasets = Dataset.query.count()
users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset: if not dataset:
flash('Invalid dataset ID.', 'error') flash('Invalid dataset ID.', 'error')
return redirect(url_for('admin._questions')) return redirect(url_for('admin._questions'))

View File

@ -2,8 +2,8 @@ from ..extensions import db
from ..tools.encryption import decrypt, encrypt from ..tools.encryption import decrypt, encrypt
from ..tools.logs import write from ..tools.logs import write
from flask import flash
from flask import current_app as app from flask import current_app as app
from flask.helpers import flash
from flask_login import current_user from flask_login import current_user
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
@ -43,11 +43,13 @@ class Dataset(db.Model):
def get_name(self): return decrypt(self.name) def get_name(self): return decrypt(self.name)
def make_default(self): def make_default(self):
for dataset in Dataset.query.all():
dataset.default = False
self.default = True
try: try:
db.session.commit() for dataset in Dataset.query.all(): dataset.default = False
except SQLAlchemyError as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
self.default = True
try: db.session.commit()
except SQLAlchemyError as exception: except SQLAlchemyError as exception:
db.session.rollback() db.session.rollback()
write('system.log', f'Database error when setting default dataset {self.id}: {exception}') write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
@ -61,10 +63,14 @@ class Dataset(db.Model):
message = 'Cannot delete the default dataset.' message = 'Cannot delete the default dataset.'
flash(message, 'error') flash(message, 'error')
return False, message return False, message
if Dataset.query.count() == 1: try:
message = 'Cannot delete the only dataset.' if Dataset.query.count() == 1:
flash(message, 'error') message = 'Cannot delete the only dataset.'
return False, message flash(message, 'error')
return False, message
except SQLAlchemyError as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
write('system.log', f'Dataset {self.id} deleted by {current_user.get_username()}.') write('system.log', f'Dataset {self.id} deleted by {current_user.get_username()}.')
filename = secure_filename('.'.join([self.id,'json'])) filename = secure_filename('.'.join([self.id,'json']))
data = Path(app.config.get('DATA')) data = Path(app.config.get('DATA'))

View File

@ -84,8 +84,7 @@ class Entry(db.Model):
def start(self): def start(self):
self.start_time = datetime.now() self.start_time = datetime.now()
self.status = 'started' self.status = 'started'
try: try: db.session.commit()
db.session.commit()
except SQLAlchemyError as exception: except SQLAlchemyError as exception:
db.session.rollback() db.session.rollback()
write('system.log', f'Database error when starting test for {self.get_surname()}, {self.get_first_name()}: {exception}') write('system.log', f'Database error when starting test for {self.get_surname()}, {self.get_first_name()}: {exception}')
@ -104,8 +103,7 @@ class Entry(db.Model):
else: else:
self.status = 'late' self.status = 'late'
self.valid = False self.valid = False
try: try: db.session.commit()
db.session.commit()
except SQLAlchemyError as exception: except SQLAlchemyError as exception:
db.session.rollback() db.session.rollback()
write('system.log', f'Database error when submitting entry for {self.get_surname()}, {self.get_first_name()}: {exception}') write('system.log', f'Database error when submitting entry for {self.get_surname()}, {self.get_first_name()}: {exception}')
@ -118,8 +116,7 @@ class Entry(db.Model):
if self.status == 'started': return False, 'The entry is still pending.' if self.status == 'started': return False, 'The entry is still pending.'
self.valid = True self.valid = True
self.status = 'completed' self.status = 'completed'
try: try: db.session.commit()
db.session.commit()
except SQLAlchemyError as exception: except SQLAlchemyError as exception:
db.session.rollback() db.session.rollback()
write('system.log', f'Database error when validating entry {self.id}: {exception}') write('system.log', f'Database error when validating entry {self.id}: {exception}')
@ -201,7 +198,6 @@ class Entry(db.Model):
<p>Best wishes, <br/> SKA Refereeing</p> <p>Best wishes, <br/> SKA Refereeing</p>
""" """
) )
try: try: mail.send(email)
mail.send(email)
except SMTPException as exception: except SMTPException as exception:
write('system.log', f'SMTP Error when trying to notify results to {self.get_surname()}, {self.get_first_name()} with error: {exception}') write('system.log', f'SMTP Error when trying to notify results to {self.get_surname()}, {self.get_first_name()} with error: {exception}')

View File

@ -66,8 +66,7 @@ class Test(db.Model):
def delete(self): def delete(self):
if self.entries: return False, f'Cannot delete a test with submitted entries.' if self.entries: return False, f'Cannot delete a test with submitted entries.'
db.session.delete(self) db.session.delete(self)
try: try: db.session.commit()
db.session.commit()
except SQLAlchemyError as exception: except SQLAlchemyError as exception:
db.session.rollback() db.session.rollback()
write('system.log', f'Database error when deleting test {self.get_code()}: {exception}') write('system.log', f'Database error when deleting test {self.get_code()}: {exception}')
@ -79,8 +78,7 @@ class Test(db.Model):
now = datetime.now() now = datetime.now()
if self.start_date.date() > now.date(): if self.start_date.date() > now.date():
self.start_date = now self.start_date = now
try: try: db.session.commit()
db.session.commit()
except SQLAlchemyError as exception: except SQLAlchemyError as exception:
db.session.rollback() db.session.rollback()
write('system.log', f'Database error when launching test {self.get_code()}: {exception}') write('system.log', f'Database error when launching test {self.get_code()}: {exception}')
@ -93,8 +91,7 @@ class Test(db.Model):
now = datetime.now() now = datetime.now()
if self.end_date >= now: if self.end_date >= now:
self.end_date = now self.end_date = now
try: try: db.session.commit()
db.session.commit()
except SQLAlchemyError as exception: except SQLAlchemyError as exception:
db.session.rollback() db.session.rollback()
write('system.log', f'Database error when closing test {self.get_code()}: {exception}') write('system.log', f'Database error when closing test {self.get_code()}: {exception}')
@ -108,8 +105,7 @@ class Test(db.Model):
code = secrets.token_hex(3).lower() code = secrets.token_hex(3).lower()
adjustments[code] = time adjustments[code] = time
self.adjustments = adjustments self.adjustments = adjustments
try: try: db.session.commit()
db.session.commit()
except SQLAlchemyError as exception: except SQLAlchemyError as exception:
db.session.rollback() db.session.rollback()
write('system.log', f'Database error when adding adjustment to test {self.get_code()}: {exception}') write('system.log', f'Database error when adding adjustment to test {self.get_code()}: {exception}')
@ -121,8 +117,7 @@ class Test(db.Model):
if not self.adjustments: return False, f'There are no adjustments configured for test {self.get_code()}.' if not self.adjustments: return False, f'There are no adjustments configured for test {self.get_code()}.'
self.adjustments.pop(code) self.adjustments.pop(code)
if not self.adjustments: self.adjustments = None if not self.adjustments: self.adjustments = None
try: try: db.session.commit()
db.session.commit()
except SQLAlchemyError as exception: except SQLAlchemyError as exception:
db.session.rollback() db.session.rollback()
write('system.log', f'Database error when deleting adjustment from test {self.get_code()}: {exception}') write('system.log', f'Database error when deleting adjustment from test {self.get_code()}: {exception}')
@ -135,8 +130,7 @@ class Test(db.Model):
if start_date: self.start_date = start_date if start_date: self.start_date = start_date
if end_date: self.end_date = end_date if end_date: self.end_date = end_date
if time_limit is not None: self.time_limit = time_limit if time_limit is not None: self.time_limit = time_limit
try: try: db.session.commit()
db.session.commit()
except SQLAlchemyError as exception: except SQLAlchemyError as exception:
db.session.rollback() db.session.rollback()
write('system.log', f'Database error when updating test {self.get_code()}: {exception}') write('system.log', f'Database error when updating test {self.get_code()}: {exception}')

View File

@ -2,8 +2,8 @@ from ..extensions import db, mail
from ..tools.encryption import decrypt, encrypt from ..tools.encryption import decrypt, encrypt
from ..tools.logs import write from ..tools.logs import write
from flask import flash, jsonify, session from flask import jsonify, session
from flask.helpers import url_for from flask.helpers import flash, url_for
from flask_login import current_user, login_user, logout_user, UserMixin from flask_login import current_user, login_user, logout_user, UserMixin
from flask_mail import Message from flask_mail import Message
from smtplib import SMTPException from smtplib import SMTPException
@ -57,7 +57,10 @@ class User(UserMixin, db.Model):
def register(self, notify:bool=False, password:str=None): def register(self, notify:bool=False, password:str=None):
self.generate_id() self.generate_id()
users = User.query.all() try: users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
for user in users: for user in users:
if user.get_username() == self.get_username(): return False, f'Username {self.get_username()} already in use.' if user.get_username() == self.get_username(): return False, f'Username {self.get_username()} already in use.'
if user.get_email() == self.get_email(): return False, f'Email address {self.get_email()} already in use.' if user.get_email() == self.get_email(): return False, f'Email address {self.get_email()} already in use.'
@ -97,8 +100,7 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p> <p>SKA Refereeing</p>
""" """
) )
try: try: mail.send(email)
mail.send(email)
except SMTPException as exception: except SMTPException as exception:
write('system.log', f'SMTP Error while trying to notify new user account creation to {self.get_username()} with error: {exception}') write('system.log', f'SMTP Error while trying to notify new user account creation to {self.get_username()} with error: {exception}')
return True, f'User {self.get_username()} was created successfully.' return True, f'User {self.get_username()} was created successfully.'
@ -151,14 +153,12 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p> <p>SKA Refereeing</p>
""" """
) )
try: try: mail.send(email)
mail.send(email)
except SMTPException as exception: except SMTPException as exception:
write('system.log', f'SMTP Error while trying to reset password for {self.get_username()} with error: {exception}') write('system.log', f'SMTP Error while trying to reset password for {self.get_username()} with error: {exception}')
db.session.rollback() db.session.rollback()
return jsonify({'error': f'SMTP Error: {exception}'}), 500 return jsonify({'error': f'SMTP Error: {exception}'}), 500
try: try: db.session.commit()
db.session.commit()
except SQLAlchemyError as exception: except SQLAlchemyError as exception:
db.session.rollback() db.session.rollback()
write('system.log', f'Database error when resetting password for user {self.get_username()}: {exception}') write('system.log', f'Database error when resetting password for user {self.get_username()}: {exception}')
@ -167,8 +167,7 @@ class User(UserMixin, db.Model):
def clear_reset_tokens(self): def clear_reset_tokens(self):
self.reset_token = self.verification_token = None self.reset_token = self.verification_token = None
try: try: db.session.commit()
db.session.commit()
except SQLAlchemyError as exception: except SQLAlchemyError as exception:
db.session.rollback() db.session.rollback()
write('system.log', f'Database error when resetting clearing reset tokens for user {self.get_username()}: {exception}') write('system.log', f'Database error when resetting clearing reset tokens for user {self.get_username()}: {exception}')
@ -208,8 +207,7 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p> <p>SKA Refereeing</p>
""" """
) )
try: try: mail.send(email)
mail.send(email)
except SMTPException as exception: except SMTPException as exception:
write('system.log', f'SMTP Error when trying to delete account {username} with error: {exception}') write('system.log', f'SMTP Error when trying to delete account {username} with error: {exception}')
return True, message return True, message
@ -219,11 +217,14 @@ class User(UserMixin, db.Model):
if password: self.set_password(password) if password: self.set_password(password)
old_email = self.get_email() old_email = self.get_email()
if email: if email:
for entry in User.query.all(): try:
if entry.get_email() == email and not entry == self: return False, f'The email address {email} is already in use.' for entry in User.query.all():
if entry.get_email() == email and not entry == self: return False, f'The email address {email} is already in use.'
except SQLAlchemyError as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
self.set_email(email) self.set_email(email)
try: try: db.session.commit()
db.session.commit()
except SQLAlchemyError as exception: except SQLAlchemyError as exception:
db.session.rollback() db.session.rollback()
write('system.log', f'Database error when updating user {self.get_username()}: {exception}') write('system.log', f'Database error when updating user {self.get_username()}: {exception}')
@ -257,8 +258,7 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p> <p>SKA Refereeing</p>
""" """
) )
try: try: mail.send(message)
mail.send(message)
except SMTPException as exception: except SMTPException as exception:
write('system.log', f'SMTP Error when trying to update account {self.get_username()} with error: {exception}') write('system.log', f'SMTP Error when trying to update account {self.get_username()} with error: {exception}')
return True, f'Account {self.get_username()} has been updated.' return True, f'Account {self.get_username()} has been updated.'

View File

@ -1,10 +1,12 @@
from ..forms.quiz import StartQuiz from ..forms.quiz import StartQuiz
from ..models import Entry, Test from ..models import Entry, Test
from ..tools.forms import send_errors_to_client from ..tools.forms import send_errors_to_client
from ..tools.logs import write
from ..tools.test import redirect_if_started from ..tools.test import redirect_if_started
from flask import abort, Blueprint, jsonify, redirect, render_template, request, session from flask import Blueprint, jsonify, render_template, request, session
from flask.helpers import flash, url_for from flask.helpers import abort, flash, redirect, url_for
from sqlalchemy.exc import SQLAlchemyError
from datetime import datetime from datetime import datetime
@ -37,7 +39,10 @@ def _start():
entry.set_club(request.form.get('club')) entry.set_club(request.form.get('club'))
entry.set_email(request.form.get('email')) entry.set_email(request.form.get('email'))
code = request.form.get('test_code').replace('', '').lower() code = request.form.get('test_code').replace('', '').lower()
test = Test.query.filter_by(code=code).first() try: test = Test.query.filter_by(code=code).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
entry.test = test entry.test = test
entry.user_code = request.form.get('user_code') entry.user_code = request.form.get('user_code')
entry.user_code = None if entry.user_code == '' else entry.user_code.lower() entry.user_code = None if entry.user_code == '' else entry.user_code.lower()
@ -59,16 +64,23 @@ def _start():
@quiz.route('/quiz/') @quiz.route('/quiz/')
def _quiz(): def _quiz():
id = session.get('id') id = session.get('id')
if not id or not Entry.query.filter_by(id=id).first(): try:
flash('Your session was not recognised. Please sign in to the quiz again.', 'error') if not id or not Entry.query.filter_by(id=id).first():
session.pop('id', None) flash('Your session was not recognised. Please sign in to the quiz again.', 'error')
return redirect(url_for('quiz._start')) session.pop('id', None)
return redirect(url_for('quiz._start'))
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
return render_template('/quiz/client.html') return render_template('/quiz/client.html')
@quiz.route('/result/') @quiz.route('/result/')
def _result(): def _result():
id = session.get('id') id = session.get('id')
entry = Entry.query.filter_by(id=id).first() try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not entry: return abort(404) if not entry: return abort(404)
session.pop('id',None) session.pop('id',None)
score = round(100*entry.result['score']/entry.result['max']) score = round(100*entry.result['score']/entry.result['max'])

View File

@ -1,18 +1,23 @@
from .data import load from .data import load
from ..models import User from ..models import User
from ..tools.logs import write
from flask import abort, redirect from flask.helpers import abort, flash, redirect, url_for
from flask.helpers import flash, url_for
from flask_login import current_user from flask_login import current_user
from sqlalchemy.exc import SQLAlchemyError
from functools import wraps from functools import wraps
def require_account_creation(function): def require_account_creation(function):
@wraps(function) @wraps(function)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
if User.query.count() == 0: try:
flash('Please register a user account.', 'alert') if User.query.count() == 0:
return redirect(url_for('admin._register')) flash('Please register a user account.', 'alert')
return redirect(url_for('admin._register'))
except SQLAlchemyError as exception:
write('system.log', f'Database error when checking for existing accounts: {exception}')
return abort(500)
return function(*args, **kwargs) return function(*args, **kwargs)
return wrapper return wrapper

View File

@ -1,8 +1,9 @@
from ..models import Dataset from ..models import Dataset
from ..tools.logs import write
from flask import current_app as app from flask import current_app as app
from flask import flash, redirect from flask.helpers import abort, flash, redirect, url_for
from flask.helpers import url_for from sqlalchemy.exc import SQLAlchemyError
import json import json
from pathlib import Path from pathlib import Path
@ -76,7 +77,10 @@ def get_tag_list(dataset:list):
def check_dataset_exists(function): def check_dataset_exists(function):
@wraps(function) @wraps(function)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
datasets = Dataset.query.all() try: datasets = Dataset.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when checking existing datasets: {exception}')
return abort(500)
if not datasets: if not datasets:
flash('There are no available question datasets. Please upload a question dataset first, or use the question editor to create a new dataset.', 'error') flash('There are no available question datasets. Please upload a question dataset first, or use the question editor to create a new dataset.', 'error')
return redirect(url_for('admin._questions')) return redirect(url_for('admin._questions'))

View File

@ -1,7 +1,9 @@
from ..extensions import db from ..extensions import db
from ..tools.logs import write
from flask import jsonify from flask import jsonify
from sqlalchemy.exc import SQLAlchemyError
from wtforms.validators import ValidationError from wtforms.validators import ValidationError
import json import json
@ -47,7 +49,10 @@ def get_time_options():
def get_dataset_choices(): def get_dataset_choices():
from ..models import Dataset from ..models import Dataset
datasets = Dataset.query.all() try: datasets = Dataset.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when fetching dataset lists: {exception}')
return []
dataset_choices = [] dataset_choices = []
for dataset in datasets: for dataset in datasets:
label = dataset.get_name() label = dataset.get_name()

View File

@ -1,8 +1,10 @@
from .data import randomise_list from .data import randomise_list
from ..models import Entry from ..models import Entry
from ..tools.logs import write
from flask import redirect, request, session from flask import request, session
from flask.helpers import url_for from flask.helpers import abort, redirect, url_for
from sqlalchemy.exc import SQLAlchemyError
from functools import wraps from functools import wraps
@ -129,8 +131,11 @@ def redirect_if_started(function):
@wraps(function) @wraps(function)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
id = session.get('id') id = session.get('id')
if request.method == 'GET' and id and Entry.query.filter_by(id=id).first(): try:
return redirect(url_for('quiz._quiz')) if request.method == 'GET' and id and Entry.query.filter_by(id=id).first(): return redirect(url_for('quiz._quiz'))
except SQLAlchemyError as exception:
write('system.log', f'Database error when checking if test has been started: {exception}')
return abort(500)
return function(*args, **kwargs) return function(*args, **kwargs)
return wrapper return wrapper

View File

@ -2,10 +2,12 @@ from ..forms.admin import EditDataset
from ..models import Dataset, User from ..models import Dataset, User
from ..tools.forms import get_dataset_choices, send_errors_to_client from ..tools.forms import get_dataset_choices, send_errors_to_client
from ..tools.data import check_dataset_exists from ..tools.data import check_dataset_exists
from ..tools.logs import write
from flask import Blueprint, flash, jsonify, redirect, render_template, request from flask import Blueprint, jsonify, render_template, request
from flask.helpers import url_for from flask.helpers import abort, flash, redirect, url_for
from flask_login import login_required from flask_login import login_required
from sqlalchemy.exc import SQLAlchemyError
view = Blueprint( view = Blueprint(
name='view', name='view',
@ -32,9 +34,13 @@ def _view():
@login_required @login_required
@check_dataset_exists @check_dataset_exists
def _view_console(id:str=None): def _view_console(id:str=None):
dataset = Dataset.query.filter_by(id=id).first() try:
datasets = Dataset.query.count() dataset = Dataset.query.filter_by(id=id).first()
users = User.query.all() datasets = Dataset.query.count()
users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset: if not dataset:
flash('Invalid dataset ID.', 'error') flash('Invalid dataset ID.', 'error')
return redirect(url_for('admin._questions')) return redirect(url_for('admin._questions'))

View File

@ -18,6 +18,7 @@ itsdangerous==2.1.2
Jinja2==3.1.2 Jinja2==3.1.2
MarkupSafe==2.1.1 MarkupSafe==2.1.1
pycparser==2.21 pycparser==2.21
PyMySQL==1.0.2
python-dotenv==0.20.0 python-dotenv==0.20.0
six==1.16.0 six==1.16.0
SQLAlchemy==1.4.40 SQLAlchemy==1.4.40

View File

@ -8,8 +8,7 @@ import sys
from getpass import getpass from getpass import getpass
with app.app_context(): with app.app_context():
try: try: users = User.query.all()
users = User.query.all()
except SQLAlchemyError as exception: except SQLAlchemyError as exception:
sys.exit('Database error:', exception) sys.exit('Database error:', exception)
print('') print('')