Compare commits

..

4 Commits

Author SHA1 Message Date
168b2b288a Added mysql-related database variables
Added options for different database engines
2022-08-20 12:01:08 +01:00
9a5f69f889 Added database-related env variables 2022-08-20 11:59:33 +01:00
7d6f256392 Added PyMySQL driver dependency 2022-08-20 11:59:02 +01:00
866c9b10cf Exception handling for database queries 2022-08-20 10:56:43 +01:00
18 changed files with 260 additions and 117 deletions

View File

@ -3,9 +3,18 @@ FLASK_DEBUG=False
TZ=Europe/London # Time Zone
## Flask Configuration
## App Configuration
SECRET_KEY= # Long, secure, secret string.
DATA=./data/
DATABASE_TYPE=SQLite # SQLite or MySQL, defaults to SQLite
DATABASE_HOST= # Required if MySQL. Must match name of Docker service, or provide host if database is on an external server. Defaults to localhost.
DATABASE_PORT= # Required if MySQL. Defaults to 3306
## MySQL Database Configuration (Required if configured to MySQL Database.)
# Note that if using the Docker service, these configuration values will also be used when creating the database in the mysql container.
MYSQL_DATABASE= # Required if MySQL.
MYSQL_USER= # Required if MySQL
MYSQL_PASSWORD= # Required if MySQL. Create secure password string. Note '@' character cannot be used.
## Flask Mail Configuration
MAIL_SERVER=postfix # Must match name of the Docker service

View File

@ -1,11 +1,13 @@
from .config import Production as Config
from .models import User
from .extensions import bootstrap, csrf, db, login_manager, mail
from .tools.logs import write
from flask import flash, Flask, render_template, request
from flask.helpers import url_for
from flask.helpers import abort, url_for
from flask.json import jsonify
from flask_wtf.csrf import CSRFError
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.middleware.proxy_fix import ProxyFix
from datetime import datetime
@ -24,7 +26,10 @@ def create_app():
login_manager.login_view = 'admin._login'
@login_manager.user_loader
def _load_user(id):
return User.query.filter_by(id=id).first()
try: return User.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when loading user fo login manager: {exception}')
return abort(500)
@app.before_request
def _check_cookie_consent():

View File

@ -1,13 +1,15 @@
from ..forms.admin import AddTimeAdjustment, CreateTest, CreateUser, DeleteUser, Login, Register, ResetPassword, UpdatePassword, UpdateUser, UploadData
from ..models import Dataset, Entry, Test, User
from ..tools.auth import disable_if_logged_in, require_account_creation
from ..tools.forms import get_dataset_choices, get_time_options, send_errors_to_client
from ..tools.data import check_dataset_exists, check_is_json, validate_json
from ..tools.forms import get_dataset_choices, get_time_options, send_errors_to_client
from ..tools.logs import write
from ..tools.test import answer_options, get_correct_answers
from flask import abort, Blueprint, jsonify, render_template, redirect, request, send_file, session
from flask.helpers import flash, url_for
from flask import abort, Blueprint, jsonify, render_template, request, send_file, session
from flask.helpers import abort, flash, redirect, url_for
from flask_login import current_user, login_required
from sqlalchemy.exc import SQLAlchemyError
from datetime import date, datetime, timedelta
from json import loads
@ -26,8 +28,12 @@ admin = Blueprint(
@admin.route('/dashboard/')
@login_required
def _home():
try:
tests = Test.query.all()
results = Entry.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
current_tests = [ test for test in tests if test.end_date >= datetime.now() and test.start_date.date() <= date.today() ]
current_tests.sort(key= lambda x: x.end_date, reverse=True)
upcoming_tests = [ test for test in tests if test.start_date.date() > datetime.now().date()]
@ -39,8 +45,12 @@ def _home():
@admin.route('/settings/')
@login_required
def _settings():
try:
users = User.query.all()
datasets = Dataset.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
return render_template('/admin/settings/index.html', users=users, datasets=datasets)
@admin.route('/login/', methods=['GET','POST'])
@ -50,7 +60,10 @@ def _login():
form = Login()
if request.method == 'POST':
if form.validate_on_submit():
users = User.query.all()
try: users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
user = None
for _user in users:
if _user.get_username() == request.form.get('username').lower():
@ -99,7 +112,10 @@ def _reset():
if request.method == 'POST':
if form.validate_on_submit():
user = None
users = User.query.all()
try: users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
for _user in users:
if _user.get_username() == request.form.get('username'):
user = _user
@ -111,7 +127,10 @@ def _reset():
token = request.args.get('token')
if token:
user = User.query.filter_by(reset_token=token).first()
try: user = User.query.filter_by(reset_token=token).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not user: return redirect(url_for('admin._reset'))
verification_token = user.verification_token
user.clear_reset_tokens()
@ -128,7 +147,10 @@ def _update_password():
form = UpdatePassword()
if form.validate_on_submit():
user = session.pop('user')
user = User.query.filter_by(id=user).first()
try: user = User.query.filter_by(id=user).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
user.update(password=request.form.get('password'))
session['remembered_username'] = user.get_username()
flash('Your password has been reset.', 'success')
@ -139,7 +161,10 @@ def _update_password():
@login_required
def _users():
form = CreateUser()
users = User.query.all()
try: users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if request.method == 'POST':
if form.validate_on_submit():
password = request.form.get('password')
@ -156,7 +181,10 @@ def _users():
@admin.route('/settings/users/delete/<string:id>', methods=['GET', 'POST'])
@login_required
def _delete_user(id:str):
user = User.query.filter_by(id=id).first()
try: user = User.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
form = DeleteUser()
if request.method == 'POST':
if not user: return jsonify({'error': 'User does not exist.'}), 400
@ -180,7 +208,10 @@ def _delete_user(id:str):
@admin.route('/settings/users/update/<string:id>', methods=['GET', 'POST'])
@login_required
def _update_user(id:str):
user = User.query.filter_by(id=id).first()
try: user = User.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
form = UpdateUser()
if request.method == 'POST':
if not user: return jsonify({'error': 'User does not exist.'}), 400
@ -222,7 +253,10 @@ def _questions():
return jsonify({'error': message}), 400
return send_errors_to_client(form=form)
data = Dataset.query.all()
try: data = Dataset.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
return render_template('/admin/settings/questions.html', form=form, data=data)
@admin.route('/settings/questions/delete/', methods=['POST'])
@ -231,7 +265,10 @@ def _edit_questions():
id = request.get_json()['id']
action = request.get_json()['action']
if not action == 'delete': return jsonify({'error': 'Invalid action.'}), 400
dataset = Dataset.query.filter_by(id=id).first()
try: dataset = Dataset.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if action == 'delete': success, message = dataset.delete()
if success: return jsonify({'success': message}), 200
return jsonify({'error': message}), 400
@ -239,7 +276,10 @@ def _edit_questions():
@admin.route('/settings/questions/download/<string:id>/')
@login_required
def _download(id:str):
dataset = Dataset.query.filter_by(id=id).first()
try: dataset = Dataset.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset: return abort(404)
data_path = path.abspath(dataset.get_file())
return send_file(data_path, as_attachment=True, attachment_filename=f'{dataset.get_name()}.json')
@ -250,7 +290,10 @@ def _download(id:str):
@check_dataset_exists
def _tests(filter:str=None):
tests = None
_tests = Test.query.all()
try: _tests = Test.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
form = None
now = datetime.now()
if filter not in ['create','active','scheduled','expired','all']: return redirect(url_for('admin._tests', filter='active'))
@ -296,7 +339,10 @@ def _create_test():
new_test.end_date = datetime.strptime(new_test.end_date, '%Y-%m-%dT%H:%M')
new_test.time_limit = None if request.form.get('time_limit') == 'none' else int(request.form.get('time_limit'))
dataset = request.form.get('dataset')
new_test.dataset = Dataset.query.filter_by(id=dataset).first()
try: new_test.dataset = Dataset.query.filter_by(id=dataset).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
success, message = new_test.create()
if success:
flash(message=message, category='success')
@ -310,7 +356,10 @@ def _edit_test():
id = request.get_json()['id']
action = request.get_json()['action']
if action not in ['start', 'delete', 'end']: return jsonify({'error': 'Invalid action.'}), 400
test = Test.query.filter_by(id=id).first()
try: test = Test.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not test: return jsonify({'error': 'Could not find the corresponding test to delete.'}), 404
if action == 'delete': success, message = test.delete()
if action == 'start': success, message = test.start()
@ -324,7 +373,10 @@ def _edit_test():
@login_required
def _view_test(id:str=None):
form = AddTimeAdjustment()
test = Test.query.filter_by(id=id).first()
try: test = Test.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if request.method == 'POST':
if not test: return jsonify({'error': 'Invalid test ID.'}), 404
if form.validate_on_submit():
@ -341,7 +393,10 @@ def _view_test(id:str=None):
@admin.route('/test/<string:id>/delete-adjustment/', methods=['POST'])
@login_required
def _delete_adjustment(id:str=None):
test = Test.query.filter_by(id=id).first()
try: test = Test.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not test: return jsonify({'error': 'Invalid test ID.'}), 404
user_code = request.get_json()['user_code'].lower()
success, message = test.remove_adjustment(user_code)
@ -351,13 +406,19 @@ def _delete_adjustment(id:str=None):
@admin.route('/results/')
@login_required
def _view_entries():
entries = Entry.query.all()
try: entries = Entry.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
return render_template('/admin/results.html', entries = entries)
@admin.route('/results/<string:id>/', methods = ['GET', 'POST'])
@login_required
def _view_entry(id:str=None):
entry = Entry.query.filter_by(id=id).first()
try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if request.method == 'POST':
if not entry: return jsonify({'error': 'Invalid entry ID.'}), 404
action = request.get_json()['action']
@ -388,6 +449,9 @@ def _view_entry(id:str=None):
def _generate_certificate():
from ..extensions import db
id = request.get_json()['id']
entry = Entry.query.filter_by(id=id).first()
try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not entry: return jsonify({'error': 'Invalid entry ID.'}), 404
return render_template('/admin/components/certificate.html', entry = entry)

View File

@ -1,9 +1,12 @@
from ..models import Dataset, Entry, User
from ..tools.data import validate_json
from ..tools.logs import write
from ..tools.test import evaluate_answers, generate_questions
from flask import Blueprint, flash, jsonify, request, url_for
from flask import Blueprint, jsonify, request
from flask.helpers import abort, flash, url_for
from flask_login import login_required
from sqlalchemy.exc import SQLAlchemyError
from datetime import datetime, timedelta
from json import loads
@ -16,7 +19,10 @@ api = Blueprint(
@api.route('/questions/', methods=['POST'])
def _fetch_questions():
id = request.get_json()['id']
entry = Entry.query.filter_by(id=id).first()
try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not entry: return jsonify({'error': 'Invalid entry ID.'}), 400
test = entry.test
user_code = entry.user_code
@ -50,7 +56,10 @@ def _fetch_questions():
def _submit_quiz():
id = request.get_json()['id']
answers = request.get_json()['answers']
entry = Entry.query.filter_by(id=id).first()
try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not entry: return jsonify({'error': 'Unrecognised Entry.'}), 400
test = entry.test
dataset = test.dataset
@ -71,7 +80,12 @@ def _submit_quiz():
def _editor(id:str=None):
request_data = request.get_json()
id = request_data['id']
try:
dataset = Dataset.query.filter_by(id=id).first()
user = User.query.filter_by(id=creator).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset: return jsonify({'error': 'Invalid request. Dataset not found.'}), 404
data_path = dataset.get_file()
if request_data['action'] == 'fetch':
@ -83,7 +97,6 @@ def _editor(id:str=None):
name = request_data['name']
data = request_data['data']
if not validate_json(data): return jsonify({'error': 'The data you submitted was invalid.'}), 400
user = User.query.filter_by(id=creator).first()
dataset.set_name(name)
dataset.creator = user
success, message = dataset.update(data=data, default=default)

View File

@ -4,6 +4,7 @@ from dotenv import load_dotenv
load_dotenv('../.env')
class Config(object):
"""Basic App Configuration"""
APP_HOST = '0.0.0.0'
DATA = './data/'
DEBUG = False
@ -11,9 +12,8 @@ class Config(object):
SECRET_KEY = os.getenv('SECRET_KEY')
SERVER_NAME = os.getenv('SERVER_NAME')
SESSION_COOKIE_SECURE = True
SQLALCHEMY_DATABASE_URI = f'sqlite:///{Path(os.path.abspath(f"{DATA}/database.db"))}'
SQLALCHEMY_TRACK_MODIFICATIONS = False
"""Email Engine Configuration"""
MAIL_SERVER = os.getenv('MAIL_SERVER')
MAIL_PORT = int(os.getenv('MAIL_PORT') or 25)
MAIL_USE_TLS = False
@ -26,6 +26,19 @@ class Config(object):
MAIL_SUPPRESS_SEND = False
MAIL_ASCII_ATTACHMENTS = bool(os.getenv('MAIL_ASCII_ATTACHMENTS') or True)
"""Database Driver Configuration"""
DATABASE_TYPE = os.getenv('DATABASE_TYPE') or 'SQLite'
SQLALCHEMY_TRACK_MODIFICATIONS = False
if DATABASE_TYPE.lower() == 'mysql' and os.getenv('MYSQL_DATABASE') and os.getenv('MYSQL_USER') and os.getenv('MYSQL_PASSWORD'):
DATABASE_HOST = os.getenv('DATABASE_HOST') or 'localhost'
DATABASE_PORT = int(os.getenv('DATABASE_PORT') or 3306)
MYSQL_DATABASE = os.getenv('MYSQL_DATABASE')
MYSQL_USER = os.getenv('MYSQL_USER')
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD')
SQLALCHEMY_DATABASE_URI = f'mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{DATABASE_HOST}:{DATABASE_PORT}/{MYSQL_DATABASE}'
else: SQLALCHEMY_DATABASE_URI = f'sqlite:///{Path(os.path.abspath(f"{DATA}/database.db"))}'
class Production(Config):
pass

View File

@ -1,11 +1,13 @@
from ..forms.admin import EditDataset
from ..models import Dataset, User
from ..tools.forms import get_dataset_choices, send_errors_to_client
from ..tools.data import check_dataset_exists
from ..tools.forms import get_dataset_choices, send_errors_to_client
from ..tools.logs import write
from flask import Blueprint, flash, jsonify, redirect, render_template, request
from flask.helpers import url_for
from flask import Blueprint, jsonify, render_template
from flask.helpers import abort, flash, redirect, request, url_for
from flask_login import login_required
from sqlalchemy.exc import SQLAlchemyError
editor = Blueprint(
name='editor',
@ -31,9 +33,13 @@ def _editor():
@check_dataset_exists
@login_required
def _editor_console(id:str=None):
try:
dataset = Dataset.query.filter_by(id=id).first()
datasets = Dataset.query.count()
users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset:
flash('Invalid dataset ID.', 'error')
return redirect(url_for('admin._questions'))

View File

@ -2,8 +2,8 @@ from ..extensions import db
from ..tools.encryption import decrypt, encrypt
from ..tools.logs import write
from flask import flash
from flask import current_app as app
from flask.helpers import flash
from flask_login import current_user
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.utils import secure_filename
@ -43,11 +43,13 @@ class Dataset(db.Model):
def get_name(self): return decrypt(self.name)
def make_default(self):
for dataset in Dataset.query.all():
dataset.default = False
self.default = True
try:
db.session.commit()
for dataset in Dataset.query.all(): dataset.default = False
except SQLAlchemyError as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
self.default = True
try: db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
@ -61,10 +63,14 @@ class Dataset(db.Model):
message = 'Cannot delete the default dataset.'
flash(message, 'error')
return False, message
try:
if Dataset.query.count() == 1:
message = 'Cannot delete the only dataset.'
flash(message, 'error')
return False, message
except SQLAlchemyError as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
write('system.log', f'Dataset {self.id} deleted by {current_user.get_username()}.')
filename = secure_filename('.'.join([self.id,'json']))
data = Path(app.config.get('DATA'))

View File

@ -84,8 +84,7 @@ class Entry(db.Model):
def start(self):
self.start_time = datetime.now()
self.status = 'started'
try:
db.session.commit()
try: db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when starting test for {self.get_surname()}, {self.get_first_name()}: {exception}')
@ -104,8 +103,7 @@ class Entry(db.Model):
else:
self.status = 'late'
self.valid = False
try:
db.session.commit()
try: db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when submitting entry for {self.get_surname()}, {self.get_first_name()}: {exception}')
@ -118,8 +116,7 @@ class Entry(db.Model):
if self.status == 'started': return False, 'The entry is still pending.'
self.valid = True
self.status = 'completed'
try:
db.session.commit()
try: db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when validating entry {self.id}: {exception}')
@ -201,7 +198,6 @@ class Entry(db.Model):
<p>Best wishes, <br/> SKA Refereeing</p>
"""
)
try:
mail.send(email)
try: mail.send(email)
except SMTPException as exception:
write('system.log', f'SMTP Error when trying to notify results to {self.get_surname()}, {self.get_first_name()} with error: {exception}')

View File

@ -66,8 +66,7 @@ class Test(db.Model):
def delete(self):
if self.entries: return False, f'Cannot delete a test with submitted entries.'
db.session.delete(self)
try:
db.session.commit()
try: db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when deleting test {self.get_code()}: {exception}')
@ -79,8 +78,7 @@ class Test(db.Model):
now = datetime.now()
if self.start_date.date() > now.date():
self.start_date = now
try:
db.session.commit()
try: db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when launching test {self.get_code()}: {exception}')
@ -93,8 +91,7 @@ class Test(db.Model):
now = datetime.now()
if self.end_date >= now:
self.end_date = now
try:
db.session.commit()
try: db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when closing test {self.get_code()}: {exception}')
@ -108,8 +105,7 @@ class Test(db.Model):
code = secrets.token_hex(3).lower()
adjustments[code] = time
self.adjustments = adjustments
try:
db.session.commit()
try: db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when adding adjustment to test {self.get_code()}: {exception}')
@ -121,8 +117,7 @@ class Test(db.Model):
if not self.adjustments: return False, f'There are no adjustments configured for test {self.get_code()}.'
self.adjustments.pop(code)
if not self.adjustments: self.adjustments = None
try:
db.session.commit()
try: db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when deleting adjustment from test {self.get_code()}: {exception}')
@ -135,8 +130,7 @@ class Test(db.Model):
if start_date: self.start_date = start_date
if end_date: self.end_date = end_date
if time_limit is not None: self.time_limit = time_limit
try:
db.session.commit()
try: db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when updating test {self.get_code()}: {exception}')

View File

@ -2,8 +2,8 @@ from ..extensions import db, mail
from ..tools.encryption import decrypt, encrypt
from ..tools.logs import write
from flask import flash, jsonify, session
from flask.helpers import url_for
from flask import jsonify, session
from flask.helpers import flash, url_for
from flask_login import current_user, login_user, logout_user, UserMixin
from flask_mail import Message
from smtplib import SMTPException
@ -57,7 +57,10 @@ class User(UserMixin, db.Model):
def register(self, notify:bool=False, password:str=None):
self.generate_id()
users = User.query.all()
try: users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
for user in users:
if user.get_username() == self.get_username(): return False, f'Username {self.get_username()} already in use.'
if user.get_email() == self.get_email(): return False, f'Email address {self.get_email()} already in use.'
@ -97,8 +100,7 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p>
"""
)
try:
mail.send(email)
try: mail.send(email)
except SMTPException as exception:
write('system.log', f'SMTP Error while trying to notify new user account creation to {self.get_username()} with error: {exception}')
return True, f'User {self.get_username()} was created successfully.'
@ -151,14 +153,12 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p>
"""
)
try:
mail.send(email)
try: mail.send(email)
except SMTPException as exception:
write('system.log', f'SMTP Error while trying to reset password for {self.get_username()} with error: {exception}')
db.session.rollback()
return jsonify({'error': f'SMTP Error: {exception}'}), 500
try:
db.session.commit()
try: db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when resetting password for user {self.get_username()}: {exception}')
@ -167,8 +167,7 @@ class User(UserMixin, db.Model):
def clear_reset_tokens(self):
self.reset_token = self.verification_token = None
try:
db.session.commit()
try: db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when resetting clearing reset tokens for user {self.get_username()}: {exception}')
@ -208,8 +207,7 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p>
"""
)
try:
mail.send(email)
try: mail.send(email)
except SMTPException as exception:
write('system.log', f'SMTP Error when trying to delete account {username} with error: {exception}')
return True, message
@ -219,11 +217,14 @@ class User(UserMixin, db.Model):
if password: self.set_password(password)
old_email = self.get_email()
if email:
try:
for entry in User.query.all():
if entry.get_email() == email and not entry == self: return False, f'The email address {email} is already in use.'
except SQLAlchemyError as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
self.set_email(email)
try:
db.session.commit()
try: db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when updating user {self.get_username()}: {exception}')
@ -257,8 +258,7 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p>
"""
)
try:
mail.send(message)
try: mail.send(message)
except SMTPException as exception:
write('system.log', f'SMTP Error when trying to update account {self.get_username()} with error: {exception}')
return True, f'Account {self.get_username()} has been updated.'

View File

@ -1,10 +1,12 @@
from ..forms.quiz import StartQuiz
from ..models import Entry, Test
from ..tools.forms import send_errors_to_client
from ..tools.logs import write
from ..tools.test import redirect_if_started
from flask import abort, Blueprint, jsonify, redirect, render_template, request, session
from flask.helpers import flash, url_for
from flask import Blueprint, jsonify, render_template, request, session
from flask.helpers import abort, flash, redirect, url_for
from sqlalchemy.exc import SQLAlchemyError
from datetime import datetime
@ -37,7 +39,10 @@ def _start():
entry.set_club(request.form.get('club'))
entry.set_email(request.form.get('email'))
code = request.form.get('test_code').replace('', '').lower()
test = Test.query.filter_by(code=code).first()
try: test = Test.query.filter_by(code=code).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
entry.test = test
entry.user_code = request.form.get('user_code')
entry.user_code = None if entry.user_code == '' else entry.user_code.lower()
@ -59,16 +64,23 @@ def _start():
@quiz.route('/quiz/')
def _quiz():
id = session.get('id')
try:
if not id or not Entry.query.filter_by(id=id).first():
flash('Your session was not recognised. Please sign in to the quiz again.', 'error')
session.pop('id', None)
return redirect(url_for('quiz._start'))
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
return render_template('/quiz/client.html')
@quiz.route('/result/')
def _result():
id = session.get('id')
entry = Entry.query.filter_by(id=id).first()
try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not entry: return abort(404)
session.pop('id',None)
score = round(100*entry.result['score']/entry.result['max'])

View File

@ -1,18 +1,23 @@
from .data import load
from ..models import User
from ..tools.logs import write
from flask import abort, redirect
from flask.helpers import flash, url_for
from flask.helpers import abort, flash, redirect, url_for
from flask_login import current_user
from sqlalchemy.exc import SQLAlchemyError
from functools import wraps
def require_account_creation(function):
@wraps(function)
def wrapper(*args, **kwargs):
try:
if User.query.count() == 0:
flash('Please register a user account.', 'alert')
return redirect(url_for('admin._register'))
except SQLAlchemyError as exception:
write('system.log', f'Database error when checking for existing accounts: {exception}')
return abort(500)
return function(*args, **kwargs)
return wrapper

View File

@ -1,8 +1,9 @@
from ..models import Dataset
from ..tools.logs import write
from flask import current_app as app
from flask import flash, redirect
from flask.helpers import url_for
from flask.helpers import abort, flash, redirect, url_for
from sqlalchemy.exc import SQLAlchemyError
import json
from pathlib import Path
@ -76,7 +77,10 @@ def get_tag_list(dataset:list):
def check_dataset_exists(function):
@wraps(function)
def wrapper(*args, **kwargs):
datasets = Dataset.query.all()
try: datasets = Dataset.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when checking existing datasets: {exception}')
return abort(500)
if not datasets:
flash('There are no available question datasets. Please upload a question dataset first, or use the question editor to create a new dataset.', 'error')
return redirect(url_for('admin._questions'))

View File

@ -1,7 +1,9 @@
from ..extensions import db
from ..tools.logs import write
from flask import jsonify
from sqlalchemy.exc import SQLAlchemyError
from wtforms.validators import ValidationError
import json
@ -47,7 +49,10 @@ def get_time_options():
def get_dataset_choices():
from ..models import Dataset
datasets = Dataset.query.all()
try: datasets = Dataset.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when fetching dataset lists: {exception}')
return []
dataset_choices = []
for dataset in datasets:
label = dataset.get_name()

View File

@ -1,8 +1,10 @@
from .data import randomise_list
from ..models import Entry
from ..tools.logs import write
from flask import redirect, request, session
from flask.helpers import url_for
from flask import request, session
from flask.helpers import abort, redirect, url_for
from sqlalchemy.exc import SQLAlchemyError
from functools import wraps
@ -129,8 +131,11 @@ def redirect_if_started(function):
@wraps(function)
def wrapper(*args, **kwargs):
id = session.get('id')
if request.method == 'GET' and id and Entry.query.filter_by(id=id).first():
return redirect(url_for('quiz._quiz'))
try:
if request.method == 'GET' and id and Entry.query.filter_by(id=id).first(): return redirect(url_for('quiz._quiz'))
except SQLAlchemyError as exception:
write('system.log', f'Database error when checking if test has been started: {exception}')
return abort(500)
return function(*args, **kwargs)
return wrapper

View File

@ -2,10 +2,12 @@ from ..forms.admin import EditDataset
from ..models import Dataset, User
from ..tools.forms import get_dataset_choices, send_errors_to_client
from ..tools.data import check_dataset_exists
from ..tools.logs import write
from flask import Blueprint, flash, jsonify, redirect, render_template, request
from flask.helpers import url_for
from flask import Blueprint, jsonify, render_template, request
from flask.helpers import abort, flash, redirect, url_for
from flask_login import login_required
from sqlalchemy.exc import SQLAlchemyError
view = Blueprint(
name='view',
@ -32,9 +34,13 @@ def _view():
@login_required
@check_dataset_exists
def _view_console(id:str=None):
try:
dataset = Dataset.query.filter_by(id=id).first()
datasets = Dataset.query.count()
users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset:
flash('Invalid dataset ID.', 'error')
return redirect(url_for('admin._questions'))

View File

@ -18,6 +18,7 @@ itsdangerous==2.1.2
Jinja2==3.1.2
MarkupSafe==2.1.1
pycparser==2.21
PyMySQL==1.0.2
python-dotenv==0.20.0
six==1.16.0
SQLAlchemy==1.4.40

View File

@ -8,8 +8,7 @@ import sys
from getpass import getpass
with app.app_context():
try:
users = User.query.all()
try: users = User.query.all()
except SQLAlchemyError as exception:
sys.exit('Database error:', exception)
print('')