Compare commits

..

No commits in common. "168b2b288a40f44fad35089b3533971615a5da77" and "b8fd65d8561516a3a534233458ce2ccefd519d2c" have entirely different histories.

18 changed files with 117 additions and 260 deletions

View File

@ -3,18 +3,9 @@ FLASK_DEBUG=False
TZ=Europe/London # Time Zone
## App Configuration
## Flask Configuration
SECRET_KEY= # Long, secure, secret string.
DATA=./data/
DATABASE_TYPE=SQLite # SQLite or MySQL, defaults to SQLite
DATABASE_HOST= # Required if MySQL. Must match name of Docker service, or provide host if database is on an external server. Defaults to localhost.
DATABASE_PORT= # Required if MySQL. Defaults to 3306
## MySQL Database Configuration (Required if configured to MySQL Database.)
# Note that if using the Docker service, these configuration values will also be used when creating the database in the mysql container.
MYSQL_DATABASE= # Required if MySQL.
MYSQL_USER= # Required if MySQL
MYSQL_PASSWORD= # Required if MySQL. Create secure password string. Note '@' character cannot be used.
## Flask Mail Configuration
MAIL_SERVER=postfix # Must match name of the Docker service

View File

@ -1,13 +1,11 @@
from .config import Production as Config
from .models import User
from .extensions import bootstrap, csrf, db, login_manager, mail
from .tools.logs import write
from flask import flash, Flask, render_template, request
from flask.helpers import abort, url_for
from flask.helpers import url_for
from flask.json import jsonify
from flask_wtf.csrf import CSRFError
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.middleware.proxy_fix import ProxyFix
from datetime import datetime
@ -26,10 +24,7 @@ def create_app():
login_manager.login_view = 'admin._login'
@login_manager.user_loader
def _load_user(id):
try: return User.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when loading user fo login manager: {exception}')
return abort(500)
return User.query.filter_by(id=id).first()
@app.before_request
def _check_cookie_consent():

View File

@ -1,15 +1,13 @@
from ..forms.admin import AddTimeAdjustment, CreateTest, CreateUser, DeleteUser, Login, Register, ResetPassword, UpdatePassword, UpdateUser, UploadData
from ..models import Dataset, Entry, Test, User
from ..tools.auth import disable_if_logged_in, require_account_creation
from ..tools.data import check_dataset_exists, check_is_json, validate_json
from ..tools.forms import get_dataset_choices, get_time_options, send_errors_to_client
from ..tools.logs import write
from ..tools.data import check_dataset_exists, check_is_json, validate_json
from ..tools.test import answer_options, get_correct_answers
from flask import abort, Blueprint, jsonify, render_template, request, send_file, session
from flask.helpers import abort, flash, redirect, url_for
from flask import abort, Blueprint, jsonify, render_template, redirect, request, send_file, session
from flask.helpers import flash, url_for
from flask_login import current_user, login_required
from sqlalchemy.exc import SQLAlchemyError
from datetime import date, datetime, timedelta
from json import loads
@ -28,12 +26,8 @@ admin = Blueprint(
@admin.route('/dashboard/')
@login_required
def _home():
try:
tests = Test.query.all()
results = Entry.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
current_tests = [ test for test in tests if test.end_date >= datetime.now() and test.start_date.date() <= date.today() ]
current_tests.sort(key= lambda x: x.end_date, reverse=True)
upcoming_tests = [ test for test in tests if test.start_date.date() > datetime.now().date()]
@ -45,12 +39,8 @@ def _home():
@admin.route('/settings/')
@login_required
def _settings():
try:
users = User.query.all()
datasets = Dataset.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
return render_template('/admin/settings/index.html', users=users, datasets=datasets)
@admin.route('/login/', methods=['GET','POST'])
@ -60,10 +50,7 @@ def _login():
form = Login()
if request.method == 'POST':
if form.validate_on_submit():
try: users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
users = User.query.all()
user = None
for _user in users:
if _user.get_username() == request.form.get('username').lower():
@ -112,10 +99,7 @@ def _reset():
if request.method == 'POST':
if form.validate_on_submit():
user = None
try: users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
users = User.query.all()
for _user in users:
if _user.get_username() == request.form.get('username'):
user = _user
@ -127,10 +111,7 @@ def _reset():
token = request.args.get('token')
if token:
try: user = User.query.filter_by(reset_token=token).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
user = User.query.filter_by(reset_token=token).first()
if not user: return redirect(url_for('admin._reset'))
verification_token = user.verification_token
user.clear_reset_tokens()
@ -147,10 +128,7 @@ def _update_password():
form = UpdatePassword()
if form.validate_on_submit():
user = session.pop('user')
try: user = User.query.filter_by(id=user).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
user = User.query.filter_by(id=user).first()
user.update(password=request.form.get('password'))
session['remembered_username'] = user.get_username()
flash('Your password has been reset.', 'success')
@ -161,10 +139,7 @@ def _update_password():
@login_required
def _users():
form = CreateUser()
try: users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
users = User.query.all()
if request.method == 'POST':
if form.validate_on_submit():
password = request.form.get('password')
@ -181,10 +156,7 @@ def _users():
@admin.route('/settings/users/delete/<string:id>', methods=['GET', 'POST'])
@login_required
def _delete_user(id:str):
try: user = User.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
user = User.query.filter_by(id=id).first()
form = DeleteUser()
if request.method == 'POST':
if not user: return jsonify({'error': 'User does not exist.'}), 400
@ -208,10 +180,7 @@ def _delete_user(id:str):
@admin.route('/settings/users/update/<string:id>', methods=['GET', 'POST'])
@login_required
def _update_user(id:str):
try: user = User.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
user = User.query.filter_by(id=id).first()
form = UpdateUser()
if request.method == 'POST':
if not user: return jsonify({'error': 'User does not exist.'}), 400
@ -253,10 +222,7 @@ def _questions():
return jsonify({'error': message}), 400
return send_errors_to_client(form=form)
try: data = Dataset.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
data = Dataset.query.all()
return render_template('/admin/settings/questions.html', form=form, data=data)
@admin.route('/settings/questions/delete/', methods=['POST'])
@ -265,10 +231,7 @@ def _edit_questions():
id = request.get_json()['id']
action = request.get_json()['action']
if not action == 'delete': return jsonify({'error': 'Invalid action.'}), 400
try: dataset = Dataset.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
dataset = Dataset.query.filter_by(id=id).first()
if action == 'delete': success, message = dataset.delete()
if success: return jsonify({'success': message}), 200
return jsonify({'error': message}), 400
@ -276,10 +239,7 @@ def _edit_questions():
@admin.route('/settings/questions/download/<string:id>/')
@login_required
def _download(id:str):
try: dataset = Dataset.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
dataset = Dataset.query.filter_by(id=id).first()
if not dataset: return abort(404)
data_path = path.abspath(dataset.get_file())
return send_file(data_path, as_attachment=True, attachment_filename=f'{dataset.get_name()}.json')
@ -290,10 +250,7 @@ def _download(id:str):
@check_dataset_exists
def _tests(filter:str=None):
tests = None
try: _tests = Test.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
_tests = Test.query.all()
form = None
now = datetime.now()
if filter not in ['create','active','scheduled','expired','all']: return redirect(url_for('admin._tests', filter='active'))
@ -339,10 +296,7 @@ def _create_test():
new_test.end_date = datetime.strptime(new_test.end_date, '%Y-%m-%dT%H:%M')
new_test.time_limit = None if request.form.get('time_limit') == 'none' else int(request.form.get('time_limit'))
dataset = request.form.get('dataset')
try: new_test.dataset = Dataset.query.filter_by(id=dataset).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
new_test.dataset = Dataset.query.filter_by(id=dataset).first()
success, message = new_test.create()
if success:
flash(message=message, category='success')
@ -356,10 +310,7 @@ def _edit_test():
id = request.get_json()['id']
action = request.get_json()['action']
if action not in ['start', 'delete', 'end']: return jsonify({'error': 'Invalid action.'}), 400
try: test = Test.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
test = Test.query.filter_by(id=id).first()
if not test: return jsonify({'error': 'Could not find the corresponding test to delete.'}), 404
if action == 'delete': success, message = test.delete()
if action == 'start': success, message = test.start()
@ -373,10 +324,7 @@ def _edit_test():
@login_required
def _view_test(id:str=None):
form = AddTimeAdjustment()
try: test = Test.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
test = Test.query.filter_by(id=id).first()
if request.method == 'POST':
if not test: return jsonify({'error': 'Invalid test ID.'}), 404
if form.validate_on_submit():
@ -393,10 +341,7 @@ def _view_test(id:str=None):
@admin.route('/test/<string:id>/delete-adjustment/', methods=['POST'])
@login_required
def _delete_adjustment(id:str=None):
try: test = Test.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
test = Test.query.filter_by(id=id).first()
if not test: return jsonify({'error': 'Invalid test ID.'}), 404
user_code = request.get_json()['user_code'].lower()
success, message = test.remove_adjustment(user_code)
@ -406,19 +351,13 @@ def _delete_adjustment(id:str=None):
@admin.route('/results/')
@login_required
def _view_entries():
try: entries = Entry.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
entries = Entry.query.all()
return render_template('/admin/results.html', entries = entries)
@admin.route('/results/<string:id>/', methods = ['GET', 'POST'])
@login_required
def _view_entry(id:str=None):
try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
entry = Entry.query.filter_by(id=id).first()
if request.method == 'POST':
if not entry: return jsonify({'error': 'Invalid entry ID.'}), 404
action = request.get_json()['action']
@ -449,9 +388,6 @@ def _view_entry(id:str=None):
def _generate_certificate():
from ..extensions import db
id = request.get_json()['id']
try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
entry = Entry.query.filter_by(id=id).first()
if not entry: return jsonify({'error': 'Invalid entry ID.'}), 404
return render_template('/admin/components/certificate.html', entry = entry)

View File

@ -1,12 +1,9 @@
from ..models import Dataset, Entry, User
from ..tools.data import validate_json
from ..tools.logs import write
from ..tools.test import evaluate_answers, generate_questions
from flask import Blueprint, jsonify, request
from flask.helpers import abort, flash, url_for
from flask import Blueprint, flash, jsonify, request, url_for
from flask_login import login_required
from sqlalchemy.exc import SQLAlchemyError
from datetime import datetime, timedelta
from json import loads
@ -19,10 +16,7 @@ api = Blueprint(
@api.route('/questions/', methods=['POST'])
def _fetch_questions():
id = request.get_json()['id']
try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
entry = Entry.query.filter_by(id=id).first()
if not entry: return jsonify({'error': 'Invalid entry ID.'}), 400
test = entry.test
user_code = entry.user_code
@ -56,10 +50,7 @@ def _fetch_questions():
def _submit_quiz():
id = request.get_json()['id']
answers = request.get_json()['answers']
try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
entry = Entry.query.filter_by(id=id).first()
if not entry: return jsonify({'error': 'Unrecognised Entry.'}), 400
test = entry.test
dataset = test.dataset
@ -80,12 +71,7 @@ def _submit_quiz():
def _editor(id:str=None):
request_data = request.get_json()
id = request_data['id']
try:
dataset = Dataset.query.filter_by(id=id).first()
user = User.query.filter_by(id=creator).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset: return jsonify({'error': 'Invalid request. Dataset not found.'}), 404
data_path = dataset.get_file()
if request_data['action'] == 'fetch':
@ -97,6 +83,7 @@ def _editor(id:str=None):
name = request_data['name']
data = request_data['data']
if not validate_json(data): return jsonify({'error': 'The data you submitted was invalid.'}), 400
user = User.query.filter_by(id=creator).first()
dataset.set_name(name)
dataset.creator = user
success, message = dataset.update(data=data, default=default)

View File

@ -4,7 +4,6 @@ from dotenv import load_dotenv
load_dotenv('../.env')
class Config(object):
"""Basic App Configuration"""
APP_HOST = '0.0.0.0'
DATA = './data/'
DEBUG = False
@ -12,8 +11,9 @@ class Config(object):
SECRET_KEY = os.getenv('SECRET_KEY')
SERVER_NAME = os.getenv('SERVER_NAME')
SESSION_COOKIE_SECURE = True
SQLALCHEMY_DATABASE_URI = f'sqlite:///{Path(os.path.abspath(f"{DATA}/database.db"))}'
SQLALCHEMY_TRACK_MODIFICATIONS = False
"""Email Engine Configuration"""
MAIL_SERVER = os.getenv('MAIL_SERVER')
MAIL_PORT = int(os.getenv('MAIL_PORT') or 25)
MAIL_USE_TLS = False
@ -26,19 +26,6 @@ class Config(object):
MAIL_SUPPRESS_SEND = False
MAIL_ASCII_ATTACHMENTS = bool(os.getenv('MAIL_ASCII_ATTACHMENTS') or True)
"""Database Driver Configuration"""
DATABASE_TYPE = os.getenv('DATABASE_TYPE') or 'SQLite'
SQLALCHEMY_TRACK_MODIFICATIONS = False
if DATABASE_TYPE.lower() == 'mysql' and os.getenv('MYSQL_DATABASE') and os.getenv('MYSQL_USER') and os.getenv('MYSQL_PASSWORD'):
DATABASE_HOST = os.getenv('DATABASE_HOST') or 'localhost'
DATABASE_PORT = int(os.getenv('DATABASE_PORT') or 3306)
MYSQL_DATABASE = os.getenv('MYSQL_DATABASE')
MYSQL_USER = os.getenv('MYSQL_USER')
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD')
SQLALCHEMY_DATABASE_URI = f'mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{DATABASE_HOST}:{DATABASE_PORT}/{MYSQL_DATABASE}'
else: SQLALCHEMY_DATABASE_URI = f'sqlite:///{Path(os.path.abspath(f"{DATA}/database.db"))}'
class Production(Config):
pass

View File

@ -1,13 +1,11 @@
from ..forms.admin import EditDataset
from ..models import Dataset, User
from ..tools.data import check_dataset_exists
from ..tools.forms import get_dataset_choices, send_errors_to_client
from ..tools.logs import write
from ..tools.data import check_dataset_exists
from flask import Blueprint, jsonify, render_template
from flask.helpers import abort, flash, redirect, request, url_for
from flask import Blueprint, flash, jsonify, redirect, render_template, request
from flask.helpers import url_for
from flask_login import login_required
from sqlalchemy.exc import SQLAlchemyError
editor = Blueprint(
name='editor',
@ -33,13 +31,9 @@ def _editor():
@check_dataset_exists
@login_required
def _editor_console(id:str=None):
try:
dataset = Dataset.query.filter_by(id=id).first()
datasets = Dataset.query.count()
users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset:
flash('Invalid dataset ID.', 'error')
return redirect(url_for('admin._questions'))

View File

@ -2,8 +2,8 @@ from ..extensions import db
from ..tools.encryption import decrypt, encrypt
from ..tools.logs import write
from flask import flash
from flask import current_app as app
from flask.helpers import flash
from flask_login import current_user
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.utils import secure_filename
@ -43,13 +43,11 @@ class Dataset(db.Model):
def get_name(self): return decrypt(self.name)
def make_default(self):
try:
for dataset in Dataset.query.all(): dataset.default = False
except SQLAlchemyError as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
for dataset in Dataset.query.all():
dataset.default = False
self.default = True
try: db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
@ -63,14 +61,10 @@ class Dataset(db.Model):
message = 'Cannot delete the default dataset.'
flash(message, 'error')
return False, message
try:
if Dataset.query.count() == 1:
message = 'Cannot delete the only dataset.'
flash(message, 'error')
return False, message
except SQLAlchemyError as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
write('system.log', f'Dataset {self.id} deleted by {current_user.get_username()}.')
filename = secure_filename('.'.join([self.id,'json']))
data = Path(app.config.get('DATA'))

View File

@ -84,7 +84,8 @@ class Entry(db.Model):
def start(self):
self.start_time = datetime.now()
self.status = 'started'
try: db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when starting test for {self.get_surname()}, {self.get_first_name()}: {exception}')
@ -103,7 +104,8 @@ class Entry(db.Model):
else:
self.status = 'late'
self.valid = False
try: db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when submitting entry for {self.get_surname()}, {self.get_first_name()}: {exception}')
@ -116,7 +118,8 @@ class Entry(db.Model):
if self.status == 'started': return False, 'The entry is still pending.'
self.valid = True
self.status = 'completed'
try: db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when validating entry {self.id}: {exception}')
@ -198,6 +201,7 @@ class Entry(db.Model):
<p>Best wishes, <br/> SKA Refereeing</p>
"""
)
try: mail.send(email)
try:
mail.send(email)
except SMTPException as exception:
write('system.log', f'SMTP Error when trying to notify results to {self.get_surname()}, {self.get_first_name()} with error: {exception}')

View File

@ -66,7 +66,8 @@ class Test(db.Model):
def delete(self):
if self.entries: return False, f'Cannot delete a test with submitted entries.'
db.session.delete(self)
try: db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when deleting test {self.get_code()}: {exception}')
@ -78,7 +79,8 @@ class Test(db.Model):
now = datetime.now()
if self.start_date.date() > now.date():
self.start_date = now
try: db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when launching test {self.get_code()}: {exception}')
@ -91,7 +93,8 @@ class Test(db.Model):
now = datetime.now()
if self.end_date >= now:
self.end_date = now
try: db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when closing test {self.get_code()}: {exception}')
@ -105,7 +108,8 @@ class Test(db.Model):
code = secrets.token_hex(3).lower()
adjustments[code] = time
self.adjustments = adjustments
try: db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when adding adjustment to test {self.get_code()}: {exception}')
@ -117,7 +121,8 @@ class Test(db.Model):
if not self.adjustments: return False, f'There are no adjustments configured for test {self.get_code()}.'
self.adjustments.pop(code)
if not self.adjustments: self.adjustments = None
try: db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when deleting adjustment from test {self.get_code()}: {exception}')
@ -130,7 +135,8 @@ class Test(db.Model):
if start_date: self.start_date = start_date
if end_date: self.end_date = end_date
if time_limit is not None: self.time_limit = time_limit
try: db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when updating test {self.get_code()}: {exception}')

View File

@ -2,8 +2,8 @@ from ..extensions import db, mail
from ..tools.encryption import decrypt, encrypt
from ..tools.logs import write
from flask import jsonify, session
from flask.helpers import flash, url_for
from flask import flash, jsonify, session
from flask.helpers import url_for
from flask_login import current_user, login_user, logout_user, UserMixin
from flask_mail import Message
from smtplib import SMTPException
@ -57,10 +57,7 @@ class User(UserMixin, db.Model):
def register(self, notify:bool=False, password:str=None):
self.generate_id()
try: users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
users = User.query.all()
for user in users:
if user.get_username() == self.get_username(): return False, f'Username {self.get_username()} already in use.'
if user.get_email() == self.get_email(): return False, f'Email address {self.get_email()} already in use.'
@ -100,7 +97,8 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p>
"""
)
try: mail.send(email)
try:
mail.send(email)
except SMTPException as exception:
write('system.log', f'SMTP Error while trying to notify new user account creation to {self.get_username()} with error: {exception}')
return True, f'User {self.get_username()} was created successfully.'
@ -153,12 +151,14 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p>
"""
)
try: mail.send(email)
try:
mail.send(email)
except SMTPException as exception:
write('system.log', f'SMTP Error while trying to reset password for {self.get_username()} with error: {exception}')
db.session.rollback()
return jsonify({'error': f'SMTP Error: {exception}'}), 500
try: db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when resetting password for user {self.get_username()}: {exception}')
@ -167,7 +167,8 @@ class User(UserMixin, db.Model):
def clear_reset_tokens(self):
self.reset_token = self.verification_token = None
try: db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when resetting clearing reset tokens for user {self.get_username()}: {exception}')
@ -207,7 +208,8 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p>
"""
)
try: mail.send(email)
try:
mail.send(email)
except SMTPException as exception:
write('system.log', f'SMTP Error when trying to delete account {username} with error: {exception}')
return True, message
@ -217,14 +219,11 @@ class User(UserMixin, db.Model):
if password: self.set_password(password)
old_email = self.get_email()
if email:
try:
for entry in User.query.all():
if entry.get_email() == email and not entry == self: return False, f'The email address {email} is already in use.'
except SQLAlchemyError as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
self.set_email(email)
try: db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when updating user {self.get_username()}: {exception}')
@ -258,7 +257,8 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p>
"""
)
try: mail.send(message)
try:
mail.send(message)
except SMTPException as exception:
write('system.log', f'SMTP Error when trying to update account {self.get_username()} with error: {exception}')
return True, f'Account {self.get_username()} has been updated.'

View File

@ -1,12 +1,10 @@
from ..forms.quiz import StartQuiz
from ..models import Entry, Test
from ..tools.forms import send_errors_to_client
from ..tools.logs import write
from ..tools.test import redirect_if_started
from flask import Blueprint, jsonify, render_template, request, session
from flask.helpers import abort, flash, redirect, url_for
from sqlalchemy.exc import SQLAlchemyError
from flask import abort, Blueprint, jsonify, redirect, render_template, request, session
from flask.helpers import flash, url_for
from datetime import datetime
@ -39,10 +37,7 @@ def _start():
entry.set_club(request.form.get('club'))
entry.set_email(request.form.get('email'))
code = request.form.get('test_code').replace('', '').lower()
try: test = Test.query.filter_by(code=code).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
test = Test.query.filter_by(code=code).first()
entry.test = test
entry.user_code = request.form.get('user_code')
entry.user_code = None if entry.user_code == '' else entry.user_code.lower()
@ -64,23 +59,16 @@ def _start():
@quiz.route('/quiz/')
def _quiz():
id = session.get('id')
try:
if not id or not Entry.query.filter_by(id=id).first():
flash('Your session was not recognised. Please sign in to the quiz again.', 'error')
session.pop('id', None)
return redirect(url_for('quiz._start'))
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
return render_template('/quiz/client.html')
@quiz.route('/result/')
def _result():
id = session.get('id')
try: entry = Entry.query.filter_by(id=id).first()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
entry = Entry.query.filter_by(id=id).first()
if not entry: return abort(404)
session.pop('id',None)
score = round(100*entry.result['score']/entry.result['max'])

View File

@ -1,23 +1,18 @@
from .data import load
from ..models import User
from ..tools.logs import write
from flask.helpers import abort, flash, redirect, url_for
from flask import abort, redirect
from flask.helpers import flash, url_for
from flask_login import current_user
from sqlalchemy.exc import SQLAlchemyError
from functools import wraps
def require_account_creation(function):
@wraps(function)
def wrapper(*args, **kwargs):
try:
if User.query.count() == 0:
flash('Please register a user account.', 'alert')
return redirect(url_for('admin._register'))
except SQLAlchemyError as exception:
write('system.log', f'Database error when checking for existing accounts: {exception}')
return abort(500)
return function(*args, **kwargs)
return wrapper

View File

@ -1,9 +1,8 @@
from ..models import Dataset
from ..tools.logs import write
from flask import current_app as app
from flask.helpers import abort, flash, redirect, url_for
from sqlalchemy.exc import SQLAlchemyError
from flask import flash, redirect
from flask.helpers import url_for
import json
from pathlib import Path
@ -77,10 +76,7 @@ def get_tag_list(dataset:list):
def check_dataset_exists(function):
@wraps(function)
def wrapper(*args, **kwargs):
try: datasets = Dataset.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when checking existing datasets: {exception}')
return abort(500)
datasets = Dataset.query.all()
if not datasets:
flash('There are no available question datasets. Please upload a question dataset first, or use the question editor to create a new dataset.', 'error')
return redirect(url_for('admin._questions'))

View File

@ -1,9 +1,7 @@
from ..extensions import db
from ..tools.logs import write
from flask import jsonify
from sqlalchemy.exc import SQLAlchemyError
from wtforms.validators import ValidationError
import json
@ -49,10 +47,7 @@ def get_time_options():
def get_dataset_choices():
from ..models import Dataset
try: datasets = Dataset.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when fetching dataset lists: {exception}')
return []
datasets = Dataset.query.all()
dataset_choices = []
for dataset in datasets:
label = dataset.get_name()

View File

@ -1,10 +1,8 @@
from .data import randomise_list
from ..models import Entry
from ..tools.logs import write
from flask import request, session
from flask.helpers import abort, redirect, url_for
from sqlalchemy.exc import SQLAlchemyError
from flask import redirect, request, session
from flask.helpers import url_for
from functools import wraps
@ -131,11 +129,8 @@ def redirect_if_started(function):
@wraps(function)
def wrapper(*args, **kwargs):
id = session.get('id')
try:
if request.method == 'GET' and id and Entry.query.filter_by(id=id).first(): return redirect(url_for('quiz._quiz'))
except SQLAlchemyError as exception:
write('system.log', f'Database error when checking if test has been started: {exception}')
return abort(500)
if request.method == 'GET' and id and Entry.query.filter_by(id=id).first():
return redirect(url_for('quiz._quiz'))
return function(*args, **kwargs)
return wrapper

View File

@ -2,12 +2,10 @@ from ..forms.admin import EditDataset
from ..models import Dataset, User
from ..tools.forms import get_dataset_choices, send_errors_to_client
from ..tools.data import check_dataset_exists
from ..tools.logs import write
from flask import Blueprint, jsonify, render_template, request
from flask.helpers import abort, flash, redirect, url_for
from flask import Blueprint, flash, jsonify, redirect, render_template, request
from flask.helpers import url_for
from flask_login import login_required
from sqlalchemy.exc import SQLAlchemyError
view = Blueprint(
name='view',
@ -34,13 +32,9 @@ def _view():
@login_required
@check_dataset_exists
def _view_console(id:str=None):
try:
dataset = Dataset.query.filter_by(id=id).first()
datasets = Dataset.query.count()
users = User.query.all()
except SQLAlchemyError as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset:
flash('Invalid dataset ID.', 'error')
return redirect(url_for('admin._questions'))

View File

@ -18,7 +18,6 @@ itsdangerous==2.1.2
Jinja2==3.1.2
MarkupSafe==2.1.1
pycparser==2.21
PyMySQL==1.0.2
python-dotenv==0.20.0
six==1.16.0
SQLAlchemy==1.4.40

View File

@ -8,7 +8,8 @@ import sys
from getpass import getpass
with app.app_context():
try: users = User.query.all()
try:
users = User.query.all()
except SQLAlchemyError as exception:
sys.exit('Database error:', exception)
print('')