
29 Commits

Author SHA1 Message Date
ddfd75c1f8 Added selecting database to Readme 2022-08-20 17:46:45 +01:00
f4642767ac Tweaking formatting of docker-compose file 2022-08-20 17:28:45 +01:00
2f729de40b mysql compose 2022-08-20 17:25:07 +01:00
d68beb938f Tweaking docker-compose 2022-08-20 17:21:21 +01:00
ca667f7896 Create database before first request 2022-08-20 16:51:13 +01:00
0cc00ef911 Updated install script to only create SQLite file 2022-08-20 16:50:34 +01:00
5ec2a86d08 Added certbot directory for nginx to serve renewal 2022-08-20 15:46:19 +01:00
cd57eca7d3 Restructure install script 2022-08-20 15:40:41 +01:00
a46338fdcb Update gitignore and dockerignore 2022-08-20 15:39:50 +01:00
40f1cebb7b Unsaved files 2022-08-20 14:58:31 +01:00
2a6478f3cf Clean up unnecessary exception imports 2022-08-20 14:53:49 +01:00
b6e250a7cd Generate random root password for MySQL 2022-08-20 14:48:56 +01:00
bcee2eedd0 Generalise exception handling 2022-08-20 14:47:46 +01:00
d9837246de Updated SQL Json support 2022-08-20 13:01:32 +01:00
62fac48904 Making logs accessible from install root 2022-08-20 13:00:09 +01:00
2bf0eeb33d Bugfix: variable definition for different actions 2022-08-20 12:59:26 +01:00
72f2af1df8 Include connection errors in exception handling 2022-08-20 12:58:47 +01:00
168b2b288a Added mysql-related database variables
Added options for different database engines
2022-08-20 12:01:08 +01:00
9a5f69f889 Added database-related env variables 2022-08-20 11:59:33 +01:00
7d6f256392 Added PyMySQL driver dependency 2022-08-20 11:59:02 +01:00
866c9b10cf Exception handling for database queries 2022-08-20 10:56:43 +01:00
b8fd65d856 Added command line password reset tool. 2022-08-19 15:29:27 +01:00
5490bd083f Make reset script executable during image creation 2022-08-19 15:28:27 +01:00
3cb78055ff Added check for password reset from command line 2022-08-19 15:28:05 +01:00
f9d85a8028 Updated .env variable for future Flask versions
FLASK_ENV has been deprecated
2022-08-19 15:27:25 +01:00
4f193e7fa5 Corrected password length prompt 2022-08-19 15:26:51 +01:00
df3149abba Exception to cookie consent check for view/static 2022-08-19 13:29:29 +01:00
7ab87c2966 Exception handling for database commit operations 2022-08-19 13:25:20 +01:00
f4f501def5 Deleted redundant line 2022-08-19 13:24:54 +01:00
28 changed files with 576 additions and 192 deletions


@ -1,10 +1,21 @@
SERVER_NAME= # URL where this will be hosted.
FLASK_DEBUG=False
TZ=Europe/London # Time Zone
## Flask Configuration
## App Configuration
SECRET_KEY= # Long, secure, secret string.
DATA=./data/
DATABASE_TYPE=SQLite # SQLite or MySQL, defaults to SQLite
DATABASE_HOST= # Required if MySQL. Must match name of Docker service, or provide host if database is on an external server. Defaults to localhost.
DATABASE_PORT= # Required if MySQL. Defaults to 3306
## MySQL Database Configuration (Required if configured to MySQL Database.)
# Note that if using the Docker service, these configuration values will also be used when creating the database in the mysql container.
MYSQL_RANDOM_ROOT_PASSWORD=True
MYSQL_DATABASE= # Required if MySQL.
MYSQL_USER= # Required if MySQL
MYSQL_PASSWORD= # Required if MySQL. Create secure password string. Note '@' character cannot be used.
## Flask Mail Configuration
MAIL_SERVER=postfix # Must match name of the Docker service
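Review bot: The new `DATABASE_HOST=`, `DATABASE_PORT=` and `MYSQL_*=` entries leave the value empty and put the explanation after it on the same line, so the result depends on how each consumer parses inline comments. If the Compose `env_file` loader or the dotenv library in use keeps the `# …` text as the value, `DATABASE_PORT` is no longer empty and the `int(os.getenv('DATABASE_PORT') or 3306)` call in the config class would raise; `SECRET_KEY` would likewise be set to its placeholder sentence. Moving each explanation to its own comment line above the variable, e.g. `# Required if MySQL. Defaults to 3306` followed by a bare `DATABASE_PORT=`, removes the ambiguity. With `MYSQL_RANDOM_ROOT_PASSWORD=True` the mysql image presumably prints the generated root password to the container log on first start, which is worth stating in the comment above that line.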

10
.gitignore vendored

@ -153,3 +153,13 @@ database/data/
# Ignore Data Dir
**/data/*
# Ignore Logs Dir
logs/*
# Ignore Certbot Dir
certbot/*
# Ignore src dir (exception for robots.txt)
src/html/*
src/html/robots.txt
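Review bot: The last pattern, `src/html/robots.txt`, has no leading `!`, so it ignores the file a second time instead of exempting it as the comment above it promises. It should read `!src/html/robots.txt`. Both compose files bind-mount `./src/html/robots.txt` into nginx, so a fresh checkout without that file would presumably start nginx with a missing or wrongly typed mount source.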


@ -57,6 +57,16 @@ Once in the destination folder, clone all the relevant files you will need for t
(Remember to include the trailing dot at the end, as that indicates to Git to download the files in the current directory.)
#### Choose What Database Engine You Will Use
This app is designed to use an SQLite database by default.
You can set it up to use a MySQL database by configuring the environment variables accordingly.
If your database is being hosted remotely, make sure the MySQL database has the proper authentication for the user from a remote server.
Alternatively, you can also use the second `docker-compose-mysql.yml` file which provides a MySQL database as part of the cluster.
To use the second `docker-compose-mysql.yml` file, use the following command at the last step of the installation:
```sudo docker compose -f docker-compose-mysql.yml up```
#### Populate Environment Variables
Configuration values for the app are stored in the environment variables file.

2
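--- a/certbot/.gitignore
+++ b/certbot/.gitignore
@@ -1,2 +0,0 @@
-*
-!.gitignore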

90
docker-compose-mysql.yml Normal file

@ -0,0 +1,90 @@
version: '3.9'
volumes:
app:
mysql:
services:
nginx:
container_name: reftest_server
image: nginx:alpine
volumes:
- ./certbot:/etc/letsencrypt:ro
- ./nginx:/etc/nginx
- ./src/html/certbot:/usr/share/nginx/html/certbot:ro
- ./src/html/robots.txt:/usr/share/nginx/html/robots.txt:ro
- ./ref-test/app/root:/usr/share/nginx/html/root:ro
- ./ref-test/app/admin/static:/usr/share/nginx/html/admin/static:ro
- ./ref-test/app/editor/static:/usr/share/nginx/html/editor/static:ro
- ./ref-test/app/quiz/static:/usr/share/nginx/html/quiz/static:ro
- ./ref-test/app/view/static:/usr/share/nginx/html/view/static:ro
ports:
- 80:80
- 443:443
restart: unless-stopped
networks:
- frontend
depends_on:
app:
command: "/bin/sh -c 'while :; do sleep 6h & wait $${!}; nginx -s reload; done & nginx -g \"daemon off;\"'"
app:
container_name: reftest_app
image: reftest
build: ./ref-test
env_file:
- ./.env
ports:
- 5000
volumes:
- app:/ref-test/data
- ./logs:/ref-test/data/logs
restart: unless-stopped
networks:
- frontend
- backend
depends_on:
postfix:
mysql:
condition: service_healthy
postfix:
container_name: reftest_postfix
image: catatnight/postfix:latest
restart: unless-stopped
env_file:
- ./.env
ports:
- 25
networks:
- backend
certbot:
container_name: reftest_certbot
image: certbot/certbot
volumes:
- ./certbot:/etc/letsencrypt
- ./src/html/certbot:/var/www/html
entrypoint: "/bin/sh -c 'trap exit TERM; while :; do certbot renew; sleep 12h & wait $${!}; done;'"
mysql:
container_name: reftest_db
image: mysql:8.0
env_file:
- ./.env
volumes:
- mysql:/var/lib/mysql
ports:
- 3306
networks:
- backend
healthcheck:
test: ["CMD", "mysqladmin" ,"ping", "-h", "localhost"]
timeout: 10s
retries: 10
networks:
frontend:
external: false
backend:
external: false
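Review bot: The `ports:` entries `- 3306` under `mysql`, `- 5000` under `app` and `- 25` under `postfix` publish each container port on an ephemeral host port, although the services in this file share the `frontend`/`backend` networks and presumably reach each other there. The database, the gunicorn listener and the mail relay therefore become reachable from outside the Compose project unless a host firewall happens to block them; `expose: ['3306']` (or no entry at all) keeps them internal. The same `./.env` is loaded into `app`, `postfix` and `mysql`, so `SECRET_KEY` lands in the database and mail containers and the MySQL credentials in the mail container; a per-service env file would limit each container to the variables it needs. `catatnight/postfix:latest`, `nginx:alpine` and `certbot/certbot` are unpinned, and pinning them to a fixed tag or digest would make rebuilds reproducible.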


@ -1,7 +1,7 @@
version: '3.9'
volumes:
data:
app:
services:
nginx:
@ -10,6 +10,7 @@ services:
volumes:
- ./certbot:/etc/letsencrypt:ro
- ./nginx:/etc/nginx
- ./src/html/certbot:/usr/share/nginx/html/certbot:ro
- ./src/html/robots.txt:/usr/share/nginx/html/robots.txt:ro
- ./ref-test/app/root:/usr/share/nginx/html/root:ro
- ./ref-test/app/admin/static:/usr/share/nginx/html/admin/static:ro
@ -23,7 +24,7 @@ services:
networks:
- frontend
depends_on:
- app
app:
command: "/bin/sh -c 'while :; do sleep 6h & wait $${!}; nginx -s reload; done & nginx -g \"daemon off;\"'"
app:
@ -35,13 +36,14 @@ services:
ports:
- 5000
volumes:
- data:/ref-test/data
- app:/ref-test/data
- ./logs:/ref-test/data/logs
restart: unless-stopped
networks:
- frontend
- backend
depends_on:
- postfix
postfix:
postfix:
container_name: reftest_postfix
@ -59,7 +61,7 @@ services:
image: certbot/certbot
volumes:
- ./certbot:/etc/letsencrypt
- ./src/html:/var/www/html
- ./src/html/certbot:/var/www/html
entrypoint: "/bin/sh -c 'trap exit TERM; while :; do certbot renew; sleep 12h & wait $${!}; done;'"
networks:
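Review bot: In the `nginx` and `app` services `depends_on` changes from the list entries `- app` / `- postfix` to mapping keys `app:` / `postfix:` with no value. As far as I know the mapping form of `depends_on` expects a `condition` under each service name, so a bare key may be rejected or handled differently between Compose versions; either keep the list form here or write `condition: service_started` under the key. The same bare keys appear in `docker-compose-mysql.yml`, where the mapping form is needed because of `condition: service_healthy` on `mysql`.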


@ -1,6 +1,6 @@
# Certbot Renewal
location ^~ /.well-known/acme-challenge/ {
root /usr/share/nginx/html;
root /usr/share/nginx/html/certbot;
allow all;
default_type "text/plain";
}


@ -1,2 +1,3 @@
env/
__pycache__/
data/


@ -4,5 +4,5 @@ ENV DATA=$DATA
WORKDIR /ref-test
COPY . .
RUN pip install --upgrade pip && pip install -r requirements.txt
RUN chmod +x install.py && ./install.py
RUN chmod +x install.py reset.py && ./install.py
CMD [ "gunicorn", "-b", "0.0.0.0:5000", "-w", "5", "wsgi:app" ]


@ -1,9 +1,10 @@
from .config import Production as Config
from .models import User
from .models import *
from .extensions import bootstrap, csrf, db, login_manager, mail
from .tools.logs import write
from flask import flash, Flask, render_template, request
from flask.helpers import url_for
from flask.helpers import abort, url_for
from flask.json import jsonify
from flask_wtf.csrf import CSRFError
from werkzeug.middleware.proxy_fix import ProxyFix
@ -24,13 +25,16 @@ def create_app():
login_manager.login_view = 'admin._login'
@login_manager.user_loader
def _load_user(id):
return User.query.filter_by(id=id).first()
try: return User.query.filter_by(id=id).first()
except Exception as exception:
write('system.log', f'Database error when loading user fo login manager: {exception}')
return abort(500)
@app.before_request
def _check_cookie_consent():
if request.cookies.get('cookie_consent'):
return
if any([ request.path.startswith(x) for x in [ '/admin/static/', '/root/', '/quiz/static', '/cookies/', '/admin/editor/static' ] ]):
if any([ request.path.startswith(x) for x in [ '/admin/static/', '/root/', '/quiz/static', '/cookies/', '/admin/editor/static', '/admin/view/static' ] ]):
return
flash(f'<strong>Cookie Consent</strong>: This web site only stores minimal, functional cookies. It does not store any tracking information. By using this site, you consent to this use of cookies. For more information, see our <a href="{url_for("views._privacy")}">privacy policy</a>.', 'cookie_alert')
@ -55,4 +59,10 @@ def create_app():
app.register_blueprint(editor, url_prefix='/admin/editor')
app.register_blueprint(view, url_prefix='/admin/view')
"""Create Database Tables before First Request"""
@app.before_first_request
def _create_database_tables():
with app.app_context():
db.create_all()
return app
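Review bot: `_create_database_tables` is registered with `@app.before_first_request`, which Flask 2.2 deprecates and later releases remove, and with `gunicorn -w 5` every worker runs `db.create_all()` on its own first request, so table creation can race on a fresh MySQL database. Creating the tables once in a one-off start-up step avoids both, using `with app.app_context(): db.create_all()` there instead of a request hook. In `_load_user` the log text has a typo (`fo login manager`), and the bare string `"""Create Database Tables before First Request"""` is an expression statement rather than a comment; `# Create database tables before the first request` says the same. The body of `create_app` begins outside these hunks.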


@ -1,12 +1,13 @@
from ..forms.admin import AddTimeAdjustment, CreateTest, CreateUser, DeleteUser, Login, Register, ResetPassword, UpdatePassword, UpdateUser, UploadData
from ..models import Dataset, Entry, Test, User
from ..tools.auth import disable_if_logged_in, require_account_creation
from ..tools.forms import get_dataset_choices, get_time_options, send_errors_to_client
from ..tools.data import check_dataset_exists, check_is_json, validate_json
from ..tools.forms import get_dataset_choices, get_time_options, send_errors_to_client
from ..tools.logs import write
from ..tools.test import answer_options, get_correct_answers
from flask import abort, Blueprint, jsonify, render_template, redirect, request, send_file, session
from flask.helpers import flash, url_for
from flask import abort, Blueprint, jsonify, render_template, request, send_file, session
from flask.helpers import abort, flash, redirect, url_for
from flask_login import current_user, login_required
from datetime import date, datetime, timedelta
@ -26,8 +27,12 @@ admin = Blueprint(
@admin.route('/dashboard/')
@login_required
def _home():
try:
tests = Test.query.all()
results = Entry.query.all()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
current_tests = [ test for test in tests if test.end_date >= datetime.now() and test.start_date.date() <= date.today() ]
current_tests.sort(key= lambda x: x.end_date, reverse=True)
upcoming_tests = [ test for test in tests if test.start_date.date() > datetime.now().date()]
@ -39,8 +44,12 @@ def _home():
@admin.route('/settings/')
@login_required
def _settings():
try:
users = User.query.all()
datasets = Dataset.query.all()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
return render_template('/admin/settings/index.html', users=users, datasets=datasets)
@admin.route('/login/', methods=['GET','POST'])
@ -50,7 +59,10 @@ def _login():
form = Login()
if request.method == 'POST':
if form.validate_on_submit():
users = User.query.all()
try: users = User.query.all()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
user = None
for _user in users:
if _user.get_username() == request.form.get('username').lower():
@ -99,7 +111,10 @@ def _reset():
if request.method == 'POST':
if form.validate_on_submit():
user = None
users = User.query.all()
try: users = User.query.all()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
for _user in users:
if _user.get_username() == request.form.get('username'):
user = _user
@ -111,7 +126,10 @@ def _reset():
token = request.args.get('token')
if token:
user = User.query.filter_by(reset_token=token).first()
try: user = User.query.filter_by(reset_token=token).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not user: return redirect(url_for('admin._reset'))
verification_token = user.verification_token
user.clear_reset_tokens()
@ -128,7 +146,10 @@ def _update_password():
form = UpdatePassword()
if form.validate_on_submit():
user = session.pop('user')
user = User.query.filter_by(id=user).first()
try: user = User.query.filter_by(id=user).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
user.update(password=request.form.get('password'))
session['remembered_username'] = user.get_username()
flash('Your password has been reset.', 'success')
@ -139,7 +160,10 @@ def _update_password():
@login_required
def _users():
form = CreateUser()
users = User.query.all()
try: users = User.query.all()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if request.method == 'POST':
if form.validate_on_submit():
password = request.form.get('password')
@ -156,7 +180,10 @@ def _users():
@admin.route('/settings/users/delete/<string:id>', methods=['GET', 'POST'])
@login_required
def _delete_user(id:str):
user = User.query.filter_by(id=id).first()
try: user = User.query.filter_by(id=id).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
form = DeleteUser()
if request.method == 'POST':
if not user: return jsonify({'error': 'User does not exist.'}), 400
@ -180,7 +207,10 @@ def _delete_user(id:str):
@admin.route('/settings/users/update/<string:id>', methods=['GET', 'POST'])
@login_required
def _update_user(id:str):
user = User.query.filter_by(id=id).first()
try: user = User.query.filter_by(id=id).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
form = UpdateUser()
if request.method == 'POST':
if not user: return jsonify({'error': 'User does not exist.'}), 400
@ -222,7 +252,10 @@ def _questions():
return jsonify({'error': message}), 400
return send_errors_to_client(form=form)
data = Dataset.query.all()
try: data = Dataset.query.all()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
return render_template('/admin/settings/questions.html', form=form, data=data)
@admin.route('/settings/questions/delete/', methods=['POST'])
@ -231,7 +264,10 @@ def _edit_questions():
id = request.get_json()['id']
action = request.get_json()['action']
if not action == 'delete': return jsonify({'error': 'Invalid action.'}), 400
dataset = Dataset.query.filter_by(id=id).first()
try: dataset = Dataset.query.filter_by(id=id).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if action == 'delete': success, message = dataset.delete()
if success: return jsonify({'success': message}), 200
return jsonify({'error': message}), 400
@ -239,7 +275,10 @@ def _edit_questions():
@admin.route('/settings/questions/download/<string:id>/')
@login_required
def _download(id:str):
dataset = Dataset.query.filter_by(id=id).first()
try: dataset = Dataset.query.filter_by(id=id).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset: return abort(404)
data_path = path.abspath(dataset.get_file())
return send_file(data_path, as_attachment=True, attachment_filename=f'{dataset.get_name()}.json')
@ -250,7 +289,10 @@ def _download(id:str):
@check_dataset_exists
def _tests(filter:str=None):
tests = None
_tests = Test.query.all()
try: _tests = Test.query.all()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
form = None
now = datetime.now()
if filter not in ['create','active','scheduled','expired','all']: return redirect(url_for('admin._tests', filter='active'))
@ -296,7 +338,10 @@ def _create_test():
new_test.end_date = datetime.strptime(new_test.end_date, '%Y-%m-%dT%H:%M')
new_test.time_limit = None if request.form.get('time_limit') == 'none' else int(request.form.get('time_limit'))
dataset = request.form.get('dataset')
new_test.dataset = Dataset.query.filter_by(id=dataset).first()
try: new_test.dataset = Dataset.query.filter_by(id=dataset).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
success, message = new_test.create()
if success:
flash(message=message, category='success')
@ -310,7 +355,10 @@ def _edit_test():
id = request.get_json()['id']
action = request.get_json()['action']
if action not in ['start', 'delete', 'end']: return jsonify({'error': 'Invalid action.'}), 400
test = Test.query.filter_by(id=id).first()
try: test = Test.query.filter_by(id=id).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not test: return jsonify({'error': 'Could not find the corresponding test to delete.'}), 404
if action == 'delete': success, message = test.delete()
if action == 'start': success, message = test.start()
@ -324,7 +372,10 @@ def _edit_test():
@login_required
def _view_test(id:str=None):
form = AddTimeAdjustment()
test = Test.query.filter_by(id=id).first()
try: test = Test.query.filter_by(id=id).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if request.method == 'POST':
if not test: return jsonify({'error': 'Invalid test ID.'}), 404
if form.validate_on_submit():
@ -341,7 +392,10 @@ def _view_test(id:str=None):
@admin.route('/test/<string:id>/delete-adjustment/', methods=['POST'])
@login_required
def _delete_adjustment(id:str=None):
test = Test.query.filter_by(id=id).first()
try: test = Test.query.filter_by(id=id).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not test: return jsonify({'error': 'Invalid test ID.'}), 404
user_code = request.get_json()['user_code'].lower()
success, message = test.remove_adjustment(user_code)
@ -351,13 +405,19 @@ def _delete_adjustment(id:str=None):
@admin.route('/results/')
@login_required
def _view_entries():
entries = Entry.query.all()
try: entries = Entry.query.all()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
return render_template('/admin/results.html', entries = entries)
@admin.route('/results/<string:id>/', methods = ['GET', 'POST'])
@login_required
def _view_entry(id:str=None):
entry = Entry.query.filter_by(id=id).first()
try: entry = Entry.query.filter_by(id=id).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if request.method == 'POST':
if not entry: return jsonify({'error': 'Invalid entry ID.'}), 404
action = request.get_json()['action']
@ -388,6 +448,9 @@ def _view_entry(id:str=None):
def _generate_certificate():
from ..extensions import db
id = request.get_json()['id']
entry = Entry.query.filter_by(id=id).first()
try: entry = Entry.query.filter_by(id=id).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not entry: return jsonify({'error': 'Invalid entry ID.'}), 404
return render_template('/admin/components/certificate.html', entry = entry)
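Review bot: Every new handler logs `request.url`, and in `_reset` that URL carries the `?token=` value read by `request.args.get('token')`, so a database error during a password reset writes a still-valid reset token into `system.log`. Logging `request.path` keeps the route without the query string: `write('system.log', f'Database error when processing request \'{request.path}\': {exception}')`. The interpolated `{exception}` text presumably also contains the failing SQL statement and its bound parameters when it is an SQLAlchemy error, so logging `type(exception).__name__` or configuring the engine to hide parameters is worth considering. The same four-line block is repeated in roughly two dozen handlers here and in the api and editor views; a small helper or an `errorhandler` for database errors would keep the logging rule in one place. Several of the functions touched start or end outside their hunks.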


@ -1,8 +1,10 @@
from ..models import Dataset, Entry, User
from ..tools.data import validate_json
from ..tools.logs import write
from ..tools.test import evaluate_answers, generate_questions
from flask import Blueprint, flash, jsonify, request, url_for
from flask import Blueprint, jsonify, request
from flask.helpers import abort, flash, url_for
from flask_login import login_required
from datetime import datetime, timedelta
@ -16,7 +18,10 @@ api = Blueprint(
@api.route('/questions/', methods=['POST'])
def _fetch_questions():
id = request.get_json()['id']
entry = Entry.query.filter_by(id=id).first()
try: entry = Entry.query.filter_by(id=id).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not entry: return jsonify({'error': 'Invalid entry ID.'}), 400
test = entry.test
user_code = entry.user_code
@ -50,7 +55,10 @@ def _fetch_questions():
def _submit_quiz():
id = request.get_json()['id']
answers = request.get_json()['answers']
entry = Entry.query.filter_by(id=id).first()
try: entry = Entry.query.filter_by(id=id).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not entry: return jsonify({'error': 'Unrecognised Entry.'}), 400
test = entry.test
dataset = test.dataset
@ -71,7 +79,10 @@ def _submit_quiz():
def _editor(id:str=None):
request_data = request.get_json()
id = request_data['id']
dataset = Dataset.query.filter_by(id=id).first()
try: dataset = Dataset.query.filter_by(id=id).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset: return jsonify({'error': 'Invalid request. Dataset not found.'}), 404
data_path = dataset.get_file()
if request_data['action'] == 'fetch':
@ -80,10 +91,13 @@ def _editor(id:str=None):
return jsonify({'success': 'Successfully downloaded dataset', 'data': data}), 200
default = request_data['default']
creator = request_data['creator']
try: user = User.query.filter_by(id=creator).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
name = request_data['name']
data = request_data['data']
if not validate_json(data): return jsonify({'error': 'The data you submitted was invalid.'}), 400
user = User.query.filter_by(id=creator).first()
dataset.set_name(name)
dataset.creator = user
success, message = dataset.update(data=data, default=default)
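Review bot: In `_editor` the relocated `User.query.filter_by(id=creator).first()` result is assigned to `dataset.creator` without a check, and `creator` is taken from the request body, so an unknown id stores `None` and a known id lets the client choose whose name is recorded as creator. A guard such as `if not user: return jsonify({'error': 'Invalid request. User not found.'}), 400` after the lookup covers the first case; deriving the creator from the authenticated session rather than from `request_data['creator']` would cover the second. The lookup now also runs before `validate_json(data)`, so invalid payloads cost a database query they did not cost before. The route decorators of the three functions are outside the hunks, so whether `login_required` applies is not visible here.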


@ -4,6 +4,7 @@ from dotenv import load_dotenv
load_dotenv('../.env')
class Config(object):
"""Basic App Configuration"""
APP_HOST = '0.0.0.0'
DATA = './data/'
DEBUG = False
@ -11,9 +12,8 @@ class Config(object):
SECRET_KEY = os.getenv('SECRET_KEY')
SERVER_NAME = os.getenv('SERVER_NAME')
SESSION_COOKIE_SECURE = True
SQLALCHEMY_DATABASE_URI = f'sqlite:///{Path(os.path.abspath(f"{DATA}/database.db"))}'
SQLALCHEMY_TRACK_MODIFICATIONS = False
"""Email Engine Configuration"""
MAIL_SERVER = os.getenv('MAIL_SERVER')
MAIL_PORT = int(os.getenv('MAIL_PORT') or 25)
MAIL_USE_TLS = False
@ -26,6 +26,19 @@ class Config(object):
MAIL_SUPPRESS_SEND = False
MAIL_ASCII_ATTACHMENTS = bool(os.getenv('MAIL_ASCII_ATTACHMENTS') or True)
"""Database Driver Configuration"""
DATABASE_TYPE = os.getenv('DATABASE_TYPE') or 'SQLite'
SQLALCHEMY_TRACK_MODIFICATIONS = False
if DATABASE_TYPE.lower() == 'mysql' and os.getenv('MYSQL_DATABASE') and os.getenv('MYSQL_USER') and os.getenv('MYSQL_PASSWORD'):
DATABASE_HOST = os.getenv('DATABASE_HOST') or 'localhost'
DATABASE_PORT = int(os.getenv('DATABASE_PORT') or 3306)
MYSQL_DATABASE = os.getenv('MYSQL_DATABASE')
MYSQL_USER = os.getenv('MYSQL_USER')
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD')
SQLALCHEMY_DATABASE_URI = f'mysql+pymysql://{MYSQL_USER}:{MYSQL_PASSWORD}@{DATABASE_HOST}:{DATABASE_PORT}/{MYSQL_DATABASE}'
else: SQLALCHEMY_DATABASE_URI = f'sqlite:///{Path(os.path.abspath(f"{DATA}/database.db"))}'
class Production(Config):
pass
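Review bot: `SQLALCHEMY_DATABASE_URI` for MySQL is built by pasting `MYSQL_USER` and `MYSQL_PASSWORD` into the URL unescaped, which is why the env template has to forbid `@`; characters such as `:`, `/`, `#` and `?` in a password would presumably break or re-route the URI in the same way. Percent-encoding the credentials removes the restriction, e.g. `from urllib.parse import quote_plus` and `{quote_plus(MYSQL_USER)}:{quote_plus(MYSQL_PASSWORD)}` inside the f-string. The `else` branch also falls back to SQLite without any message when `DATABASE_TYPE` is `mysql` but one of the `MYSQL_*` variables is empty, so a typo in `.env` silently starts the app on an empty local database; raising or logging in that case would make the misconfiguration visible. The bare `"""…"""` section strings inside `Config` are expression statements and would read better as `#` comments.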


@ -1,10 +1,11 @@
from ..forms.admin import EditDataset
from ..models import Dataset, User
from ..tools.forms import get_dataset_choices, send_errors_to_client
from ..tools.data import check_dataset_exists
from ..tools.forms import get_dataset_choices, send_errors_to_client
from ..tools.logs import write
from flask import Blueprint, flash, jsonify, redirect, render_template, request
from flask.helpers import url_for
from flask import Blueprint, jsonify, render_template
from flask.helpers import abort, flash, redirect, request, url_for
from flask_login import login_required
editor = Blueprint(
@ -31,9 +32,13 @@ def _editor():
@check_dataset_exists
@login_required
def _editor_console(id:str=None):
try:
dataset = Dataset.query.filter_by(id=id).first()
datasets = Dataset.query.count()
users = User.query.all()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset:
flash('Invalid dataset ID.', 'error')
return redirect(url_for('admin._questions'))
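Review bot: The rewritten import `from flask.helpers import abort, flash, redirect, request, url_for` pulls `request`, `abort` and `redirect` from `flask.helpers`, where they are, as far as I can tell, incidental or version-dependent names rather than the documented import location. Importing them from the package root, `from flask import abort, Blueprint, flash, jsonify, redirect, render_template, request, url_for`, works across Flask versions. The same pattern is introduced in the app factory, the admin and api views and the models module; the admin views additionally import `abort` twice, once from `flask` and once from `flask.helpers`. `_editor_console` continues past the end of the hunk.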


@ -8,44 +8,44 @@ from wtforms.validators import InputRequired, Email, EqualTo, Length, Optional
class Login(FlaskForm):
username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=20, message='The password must be between 6 and 20 characters long.')])
remember = BooleanField('Remember Log In', render_kw={'checked': True})
class Register(FlaskForm):
username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
email = StringField('Email Address', validators=[InputRequired(), Email(message='You must enter a valid email address.'), Length(max=50)])
password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password_reenter = PasswordField('Re-Enter Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.'), EqualTo('password', message='Passwords do not match.')])
password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=20, message='The password must be between 6 and 20 characters long.')])
password_reenter = PasswordField('Re-Enter Password', validators=[InputRequired(), Length(min=6, max=20, message='The password must be between 6 and 20 characters long.'), EqualTo('password', message='Passwords do not match.')])
class ResetPassword(FlaskForm):
username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
email = StringField('Email Address', validators=[InputRequired(), Email(message='You must enter a valid email address.'), Length(max=50)])
class UpdatePassword(FlaskForm):
password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password_reenter = PasswordField('Re-Enter Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.'), EqualTo('password', message='Passwords do not match.')])
password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=20, message='The password must be between 6 and 20 characters long.')])
password_reenter = PasswordField('Re-Enter Password', validators=[InputRequired(), Length(min=6, max=20, message='The password must be between 6 and 20 characters long.'), EqualTo('password', message='Passwords do not match.')])
class CreateUser(FlaskForm):
username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
email = StringField('Email Address', validators=[InputRequired(), Email(message='You must enter a valid email address.'), Length(max=50)])
password = PasswordField('Password (Optional)', validators=[Optional(),Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password = PasswordField('Password (Optional)', validators=[Optional(),Length(min=6, max=20, message='The password must be between 6 and 20 characters long.')])
notify = BooleanField('Notify accout creation by email', render_kw={'checked': True})
class DeleteUser(FlaskForm):
password = PasswordField('Confirm Your Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password = PasswordField('Confirm Your Password', validators=[InputRequired(), Length(min=6, max=20, message='The password must be between 6 and 20 characters long.')])
notify = BooleanField('Notify deletion by email', render_kw={'checked': True})
class UpdateUser(FlaskForm):
confirm_password = PasswordField('Confirm Your Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
confirm_password = PasswordField('Confirm Your Password', validators=[InputRequired(), Length(min=6, max=20, message='The password must be between 6 and 20 characters long.')])
email = StringField('Email Address', validators=[Optional(), Email(message='You must enter a valid email address.'), Length(max=50)])
password = PasswordField('Change Password', validators=[Optional(),Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password = PasswordField('Change Password', validators=[Optional(),Length(min=6, max=20, message='The password must be between 6 and 20 characters long.')])
password_reenter = PasswordField('Re-Enter New Password', validators=[EqualTo('password', message='Passwords do not match.')])
notify = BooleanField('Notify changes by email', render_kw={'checked': True})
class UpdateAccount(FlaskForm):
confirm_password = PasswordField('Current Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
confirm_password = PasswordField('Current Password', validators=[InputRequired(), Length(min=6, max=20, message='The password must be between 6 and 20 characters long.')])
email = StringField('Email Address', validators=[Optional(), Email(message='You must enter a valid email address.'), Length(max=50)])
password = PasswordField('Change Password', validators=[Optional(),Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password = PasswordField('Change Password', validators=[Optional(),Length(min=6, max=20, message='The password must be between 6 and 20 characters long.')])
password_reenter = PasswordField('Re-Enter New Password', validators=[EqualTo('password', message='Passwords do not match.')])
class CreateTest(FlaskForm):
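Review bot: Lowering `max` from 30 to 20 on `Login.password`, `DeleteUser.password` and the `confirm_password` fields of `UpdateUser` and `UpdateAccount` means an account whose existing password is 21 to 30 characters long now fails form validation and can no longer log in or confirm changes. The mismatch was in the message, so the narrower fix is to keep the limit and correct the text, e.g. `Length(min=6, max=30, message='The password must be between 6 and 30 characters long.')`. For the fields that verify an existing password the length validator could be dropped altogether, leaving `InputRequired()` and letting the password check itself decide. A 20-character ceiling also rules out passphrases; if passwords are hashed before storage, as is presumably the case, a much higher maximum costs nothing.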


@ -2,8 +2,8 @@ from ..extensions import db
from ..tools.encryption import decrypt, encrypt
from ..tools.logs import write
from flask import flash
from flask import current_app as app
from flask.helpers import flash
from flask_login import current_user
from werkzeug.utils import secure_filename
@ -42,10 +42,17 @@ class Dataset(db.Model):
def get_name(self): return decrypt(self.name)
def make_default(self):
for dataset in Dataset.query.all():
dataset.default = False
try:
for dataset in Dataset.query.all(): dataset.default = False
except Exception as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
self.default = True
db.session.commit()
try: db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
write('system.log', f'Dataset {self.id} set as default by {current_user.get_username()}.')
flash(message='Dataset set as default.', category='success')
return True, f'Dataset set as default.'
@ -55,17 +62,26 @@ class Dataset(db.Model):
message = 'Cannot delete the default dataset.'
flash(message, 'error')
return False, message
try:
if Dataset.query.count() == 1:
message = 'Cannot delete the only dataset.'
flash(message, 'error')
return False, message
except Exception as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
write('system.log', f'Dataset {self.id} deleted by {current_user.get_username()}.')
filename = secure_filename('.'.join([self.id,'json']))
data = Path(app.config.get('DATA'))
file_path = path.join(data, 'questions', filename)
remove(file_path)
try:
db.session.delete(self)
db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when trying to delete dataset {self.id}: {exception}')
return False, f'Database error: {exception}'
remove(file_path)
return True, 'Dataset deleted.'
def create(self, data:list, default:bool=False):
@ -78,8 +94,13 @@ class Dataset(db.Model):
self.creator = current_user
if default: self.make_default()
write('system.log', f'New dataset {self.get_name()} added by {current_user.get_username()}.')
try:
db.session.add(self)
db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when trying to crreate dataset {self.id}: {exception}')
return False, f'Database error: {exception}'
return True, 'Dataset created.'
def check_file(self):
@ -103,6 +124,11 @@ class Dataset(db.Model):
dump(data, file, indent=2)
write('system.log', f'Dataset {self.id} edited by {current_user.get_username()}.')
flash(f'Dataset {self.get_name()} successfully edited.', 'success')
try:
db.session.add(self)
db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when trying to update dataset {self.id}: {exception}')
return False, f'Database error: {exception}'
return True, 'Dataset successfully edited.'
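Review bot: Each new `except` branch hands the raw exception text back to the caller (`return False, f'Database error {exception}.'` in `make_default`, and the same shape in the delete, create and edit hunks). These tuples look like they feed user-facing messages, though the callers are not visible here. Driver and SQLAlchemy errors can include the failing statement, bound parameters and connection details, so that detail belongs only in system.log. Return a fixed string such as `return False, 'A database error occurred.'` and keep the existing `write('system.log', ...)` line as the only place `{exception}` is recorded. The same pattern recurs in the Entry, Test and User model sections below; several of these methods begin outside their hunks.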


@ -1,12 +1,11 @@
from ..extensions import db, mail
from ..tools.forms import JsonEncodedDict
from ..tools.encryption import decrypt, encrypt
from ..tools.logs import write
from .test import Test
from flask_login import current_user
from flask_mail import Message
from smtplib import SMTPException
from sqlalchemy_json import MutableJson
from datetime import datetime, timedelta
from uuid import uuid4
@ -24,8 +23,8 @@ class Entry(db.Model):
end_time = db.Column(db.DateTime, nullable=True)
status = db.Column(db.String(16), nullable=True)
valid = db.Column(db.Boolean, default=True, nullable=True)
answers = db.Column(JsonEncodedDict, nullable=True)
result = db.Column(JsonEncodedDict, nullable=True)
answers = db.Column(MutableJson, nullable=True)
result = db.Column(MutableJson, nullable=True)
def __repr__(self):
return f'<New entry by {self.first_name} {self.surname}> was added with <id {self.id}>.'
@ -70,23 +69,31 @@ class Entry(db.Model):
def ready(self):
self.generate_id()
try:
db.session.add(self)
db.session.commit()
write('tests.log', f'New test ready for {self.get_first_name()} {self.get_surname()}.')
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when preparing new entry for {self.get_surname()}, {self.get_first_name()}: {exception}')
return False, f'Database error: {exception}'
write('tests.log', f'New test ready for {self.get_surname()}, {self.get_first_name()} with id {self.id}.')
return True, f'Test ready.'
def start(self):
self.start_time = datetime.now()
self.status = 'started'
write('tests.log', f'Test started by {self.get_first_name()} {self.get_surname()}.')
db.session.commit()
try: db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when starting test for {self.get_surname()}, {self.get_first_name()}: {exception}')
return False, f'Database error: {exception}'
write('tests.log', f'Test started by {self.get_surname()}, {self.get_first_name()} with id {self.id}.')
return True, f'New test started with id {self.id}.'
def complete(self, answers:dict=None, result:dict=None):
self.end_time = datetime.now()
self.answers = answers
self.result = result
write('tests.log', f'Test completed by {self.get_first_name()} {self.get_surname()}.')
delta = timedelta(minutes=int(0 if self.test.time_limit is None else self.test.time_limit)+1)
if not self.test.time_limit or self.end_time <= self.start_time + delta:
self.status = 'completed'
@ -94,7 +101,12 @@ class Entry(db.Model):
else:
self.status = 'late'
self.valid = False
db.session.commit()
try: db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when submitting entry for {self.get_surname()}, {self.get_first_name()}: {exception}')
return False, f'Database error: {exception}'
write('tests.log', f'Test completed by {self.get_surname()}, {self.get_first_name()} with id {self.id}.')
return True, f'Test entry completed for id {self.id}.'
def validate(self):
@ -102,15 +114,24 @@ class Entry(db.Model):
if self.status == 'started': return False, 'The entry is still pending.'
self.valid = True
self.status = 'completed'
db.session.commit()
try: db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when validating entry {self.id}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'The entry {self.id} has been validated by {current_user.get_username()}.')
return True, f'The entry {self.id} has been validated.'
def delete(self):
id = self.id
name = f'{self.get_first_name()} {self.get_surname()}'
try:
db.session.delete(self)
db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when deleting entry {id}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'The entry {id} by {name} has been deleted by {current_user.get_username()}.')
return True, 'Entry deleted.'
@ -175,7 +196,5 @@ class Entry(db.Model):
<p>Best wishes, <br/> SKA Refereeing</p>
"""
)
try:
mail.send(email)
except SMTPException as exception:
write('system.log', f'SMTP Error when trying to notify results to {self.get_surname()}, {self.get_first_name()} with error: {exception}')
try: mail.send(email)
except Exception as exception: write('system.log', f'SMTP Error when trying to notify results to {self.get_surname()}, {self.get_first_name()} with error: {exception}')
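Review bot: The added log lines write `{self.get_surname()}, {self.get_first_name()}` to system.log and tests.log on the ready, start and complete paths. This module imports `decrypt`/`encrypt`, so these getters presumably return decrypted values the way `Dataset.get_name` does. If so, names that are encrypted in the database end up in plaintext log files. The entry id is already in most of these messages and is enough to correlate events, e.g. `write('tests.log', f'Test started with id {self.id}.')`. The error branches can use the id in the same way.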


@ -1,8 +1,8 @@
from ..extensions import db
from ..tools.forms import JsonEncodedDict
from ..tools.logs import write
from flask_login import current_user
from sqlalchemy_json import MutableJson
from datetime import date, datetime
import secrets
@ -17,7 +17,7 @@ class Test(db.Model):
time_limit = db.Column(db.Integer, nullable=True)
creator_id = db.Column(db.String(36), db.ForeignKey('user.id'))
dataset_id = db.Column(db.String(36), db.ForeignKey('dataset.id'))
adjustments = db.Column(JsonEncodedDict, nullable=True)
adjustments = db.Column(MutableJson, nullable=True)
entries = db.relationship('Entry', backref='test')
def __repr__(self):
@ -52,16 +52,24 @@ class Test(db.Model):
errors.append('The expiry date cannot be before the start date.')
if errors:
return False, errors
try:
db.session.add(self)
db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when creating test {self.get_code()}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'Test with code {self.get_code()} created by {current_user.get_username()}.')
return True, f'Test with code {self.get_code()} has been created.'
def delete(self):
code = self.code
if self.entries: return False, f'Cannot delete a test with submitted entries.'
db.session.delete(self)
db.session.commit()
try: db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when deleting test {self.get_code()}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'Test with code {self.get_code()} has been deleted by {current_user.get_username()}.')
return True, f'Test with code {self.get_code()} has been deleted.'
@ -69,7 +77,11 @@ class Test(db.Model):
now = datetime.now()
if self.start_date.date() > now.date():
self.start_date = now
db.session.commit()
try: db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when launching test {self.get_code()}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'Test with code {self.get_code()} has been started by {current_user.get_username()}.')
return True, f'Test with code {self.get_code()} has been started.'
return False, f'Test with code {self.get_code()} has already started.'
@ -78,7 +90,11 @@ class Test(db.Model):
now = datetime.now()
if self.end_date >= now:
self.end_date = now
db.session.commit()
try: db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when closing test {self.get_code()}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'Test with code {self.get_code()} ended by {current_user.get_username()}.')
return True, f'Test with code {self.get_code()} has been ended.'
return False, f'Test with code {self.get_code()} has already ended.'
@ -88,7 +104,11 @@ class Test(db.Model):
code = secrets.token_hex(3).lower()
adjustments[code] = time
self.adjustments = adjustments
db.session.commit()
try: db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when adding adjustment to test {self.get_code()}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'Time adjustment for {time} minutes with code {code} added to test {self.get_code()} by {current_user.get_username()}.')
return True, f'Time adjustment for {time} minutes added to test {self.get_code()}. This can be accessed using the user code {code.upper()}.'
@ -96,7 +116,11 @@ class Test(db.Model):
if not self.adjustments: return False, f'There are no adjustments configured for test {self.get_code()}.'
self.adjustments.pop(code)
if not self.adjustments: self.adjustments = None
db.session.commit()
try: db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when deleting adjustment from test {self.get_code()}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'Time adjustment for with code {code} has been removed from test {self.get_code()} by {current_user.get_username()}.')
return True, f'Time adjustment for with code {code} has been removed from test {self.get_code()}.'
@ -105,6 +129,10 @@ class Test(db.Model):
if start_date: self.start_date = start_date
if end_date: self.end_date = end_date
if time_limit is not None: self.time_limit = time_limit
db.session.commit()
try: db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when updating test {self.get_code()}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'Test with code {self.get_code()} has been updated by user {current_user.get_username()}.')
return True, f'Test with code {self.get_code()} has been updated by.'
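Review bot: In `delete`, `db.session.delete(self)` stays outside the new `try`, and both the error and success messages call `self.get_code()` after the row has been marked for deletion, while the `code` captured on the first line is never used. `Entry.delete` and `User.delete` in this commit put the delete and the commit in one `try` and build their success message from values saved beforehand. Doing the same here (`try:` / `db.session.delete(self)` / `db.session.commit()`, with messages built from the saved `code`) keeps the three models consistent and avoids reading from an instance that may no longer be loadable after the commit.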


@ -2,11 +2,10 @@ from ..extensions import db, mail
from ..tools.encryption import decrypt, encrypt
from ..tools.logs import write
from flask import flash, jsonify, session
from flask.helpers import url_for
from flask import jsonify, session
from flask.helpers import flash, url_for
from flask_login import current_user, login_user, logout_user, UserMixin
from flask_mail import Message
from smtplib import SMTPException
from werkzeug.security import check_password_hash, generate_password_hash
import secrets
@ -56,13 +55,21 @@ class User(UserMixin, db.Model):
def register(self, notify:bool=False, password:str=None):
self.generate_id()
users = User.query.all()
try: users = User.query.all()
except Exception as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
for user in users:
if user.get_username() == self.get_username(): return False, f'Username {self.get_username()} already in use.'
if user.get_email() == self.get_email(): return False, f'Email address {self.get_email()} already in use.'
self.set_password(password=password)
try:
db.session.add(self)
db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when registering user {self.get_username()}: {exception}')
return False, f'Database error: {exception}'
write('users.log', f'User \'{self.get_username()}\' was created with id \'{self.id}\'.')
if notify:
email = Message(
@ -91,10 +98,8 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p>
"""
)
try:
mail.send(email)
except SMTPException as exception:
write('system.log', f'SMTP Error while trying to notify new user account creation to {self.get_username()} with error: {exception}')
try: mail.send(email)
except Exception as exception: write('system.log', f'SMTP Error while trying to notify new user account creation to {self.get_username()} with error: {exception}')
return True, f'User {self.get_username()} was created successfully.'
def login(self, remember:bool=False):
@ -145,23 +150,36 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p>
"""
)
try:
mail.send(email)
except SMTPException as exception:
try: mail.send(email)
except Exception as exception:
write('system.log', f'SMTP Error while trying to reset password for {self.get_username()} with error: {exception}')
db.session.rollback()
return jsonify({'error': f'SMTP Error: {exception}'}), 500
db.session.commit()
try: db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when resetting password for user {self.get_username()}: {exception}')
return False, f'Database error: {exception}'
return jsonify({'success': 'Your password reset link has been generated.'}), 200
def clear_reset_tokens(self):
self.reset_token = self.verification_token = None
db.session.commit()
try: db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when resetting clearing reset tokens for user {self.get_username()}: {exception}')
return False, f'Database error: {exception}'
def delete(self, notify:bool=False):
username = self.get_username()
email_address = self.get_email()
try:
db.session.delete(self)
db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when deleting user {self.get_username()}: {exception}')
return False, f'Database error: {exception}'
message = f'User \'{username}\' was deleted by \'{current_user.get_username()}\'.'
write('users.log', message)
if notify:
@ -186,10 +204,8 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p>
"""
)
try:
mail.send(email)
except SMTPException as exception:
write('system.log', f'SMTP Error when trying to delete account {username} with error: {exception}')
try: mail.send(email)
except Exception as exception: write('system.log', f'SMTP Error when trying to delete account {username} with error: {exception}')
return True, message
def update(self, password:str=None, email:str=None, notify:bool=False):
@ -197,11 +213,19 @@ class User(UserMixin, db.Model):
if password: self.set_password(password)
old_email = self.get_email()
if email:
try:
for entry in User.query.all():
if entry.get_email() == email and not entry == self: return False, f'The email address {email} is already in use.'
except Exception as exception:
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
self.set_email(email)
db.session.commit()
_current_user = current_user.get_username() if current_user.is_authenticated else 'anonymous'
try: db.session.commit()
except Exception as exception:
db.session.rollback()
write('system.log', f'Database error when updating user {self.get_username()}: {exception}')
return False, f'Database error: {exception}'
_current_user = 'command line' if not current_user else 'anonymous' if not current_user.is_authenticated else current_user.get_username()
write('system.log', f'Information for user {self.get_username()} has been updated by {_current_user}.')
if notify:
message = Message(
@ -230,8 +254,6 @@ class User(UserMixin, db.Model):
<p>SKA Refereeing</p>
"""
)
try:
mail.send(message)
except SMTPException as exception:
write('system.log', f'SMTP Error when trying to update account {self.get_username()} with error: {exception}')
try: mail.send(message)
except Exception as exception: write('system.log', f'SMTP Error when trying to update account {self.get_username()} with error: {exception}')
return True, f'Account {self.get_username()} has been updated.'
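Review bot: In the password-reset hunk the new commit-failure branch returns `False, f'Database error: {exception}'`, while the other exits visible there return `jsonify({...}), status`. If the caller passes the result straight back as a Flask response, that tuple would likely be rejected instead of producing a 500 JSON body; `return jsonify({'error': 'Database error.'}), 500` matches the SMTP branch. The commit also runs after `mail.send(email)`, so a failed commit leaves the user holding a reset link whose token (presumably set earlier in the method, outside this hunk) was never stored; committing first and sending afterwards avoids that. `clear_reset_tokens` has a related inconsistency: it returns a tuple on failure and `None` on success.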


@ -1,10 +1,11 @@
from ..forms.quiz import StartQuiz
from ..models import Entry, Test
from ..tools.forms import send_errors_to_client
from ..tools.logs import write
from ..tools.test import redirect_if_started
from flask import abort, Blueprint, jsonify, redirect, render_template, request, session
from flask.helpers import flash, url_for
from flask import Blueprint, jsonify, render_template, request, session
from flask.helpers import abort, flash, redirect, url_for
from datetime import datetime
@ -37,7 +38,10 @@ def _start():
entry.set_club(request.form.get('club'))
entry.set_email(request.form.get('email'))
code = request.form.get('test_code').replace('', '').lower()
test = Test.query.filter_by(code=code).first()
try: test = Test.query.filter_by(code=code).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
entry.test = test
entry.user_code = request.form.get('user_code')
entry.user_code = None if entry.user_code == '' else entry.user_code.lower()
@ -59,16 +63,23 @@ def _start():
@quiz.route('/quiz/')
def _quiz():
id = session.get('id')
try:
if not id or not Entry.query.filter_by(id=id).first():
flash('Your session was not recognised. Please sign in to the quiz again.', 'error')
session.pop('id', None)
return redirect(url_for('quiz._start'))
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
return render_template('/quiz/client.html')
@quiz.route('/result/')
def _result():
id = session.get('id')
entry = Entry.query.filter_by(id=id).first()
try: entry = Entry.query.filter_by(id=id).first()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not entry: return abort(404)
session.pop('id',None)
score = round(100*entry.result['score']/entry.result['max'])
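Review bot: The new error branches in `_start`, `_quiz` and `_result` write `request.url` into system.log, and `_quiz` and `_result` show no login requirement, so the logged text is chosen by an unauthenticated client. Unless `write` (not visible here) neutralises control characters, and depending on how the URL is decoded, a request can forge or split log entries. `request.endpoint` or `request.path` identifies the route without the query string, e.g. `write('system.log', f'Database error when processing {request.endpoint}: {exception}')`. The same line appears in the `_view_console` hunk further down; `_start` and `_result` continue outside their hunks.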


@ -1,8 +1,8 @@
from .data import load
from ..models import User
from ..tools.logs import write
from flask import abort, redirect
from flask.helpers import flash, url_for
from flask.helpers import abort, flash, redirect, url_for
from flask_login import current_user
from functools import wraps
@ -10,9 +10,13 @@ from functools import wraps
def require_account_creation(function):
@wraps(function)
def wrapper(*args, **kwargs):
try:
if User.query.count() == 0:
flash('Please register a user account.', 'alert')
return redirect(url_for('admin._register'))
except Exception as exception:
write('system.log', f'Database error when checking for existing accounts: {exception}')
return abort(500)
return function(*args, **kwargs)
return wrapper
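Review bot: The `try` in `wrapper` also covers `flash(...)` and `redirect(url_for('admin._register'))`, so a URL-building or session error would be logged as "Database error when checking for existing accounts"; only `User.query.count()` needs guarding. Assigning inside the guard, `try: user_count = User.query.count()`, and moving the `if user_count == 0:` branch after the `except` keeps the log message truthful. `abort(500)` raises rather than returns, so `return abort(500)` can be plain `abort(500)`. `redirect_if_started` in a later section has the same shape.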


@ -1,8 +1,8 @@
from ..models import Dataset
from ..tools.logs import write
from flask import current_app as app
from flask import flash, redirect
from flask.helpers import url_for
from flask.helpers import abort, flash, redirect, url_for
import json
from pathlib import Path
@ -76,7 +76,10 @@ def get_tag_list(dataset:list):
def check_dataset_exists(function):
@wraps(function)
def wrapper(*args, **kwargs):
datasets = Dataset.query.all()
try: datasets = Dataset.query.all()
except Exception as exception:
write('system.log', f'Database error when checking existing datasets: {exception}')
return abort(500)
if not datasets:
flash('There are no available question datasets. Please upload a question dataset first, or use the question editor to create a new dataset.', 'error')
return redirect(url_for('admin._questions'))


@ -1,30 +1,10 @@
from ..extensions import db
from ..tools.logs import write
from flask import jsonify
from wtforms.validators import ValidationError
import json
from sqlalchemy.ext import mutable
class JsonEncodedDict(db.TypeDecorator):
"""Enables JSON storage by encoding and decoding on the fly."""
impl = db.Text
def process_bind_param(self, value, dialect):
if value is None:
return '{}'
else:
return json.dumps(value)
def process_result_value(self, value, dialect):
if value is None:
return {}
else:
return json.loads(value)
mutable.MutableDict.associate_with(JsonEncodedDict)
def value(min:int=0, max:int=None):
if not max:
message = f'Value must be greater than {min}.'
@ -47,7 +27,10 @@ def get_time_options():
def get_dataset_choices():
from ..models import Dataset
datasets = Dataset.query.all()
try: datasets = Dataset.query.all()
except Exception as exception:
write('system.log', f'Database error when fetching dataset lists: {exception}')
return []
dataset_choices = []
for dataset in datasets:
label = dataset.get_name()
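Review bot: Deleting `JsonEncodedDict` removes the NULL handling it provided: `process_result_value` returned `{}` for a NULL column and `process_bind_param` stored `'{}'` for `None`. Whether `MutableJson` does the same is not visible here; if it yields `None`, any reader of `answers`, `result` or `adjustments` that indexes or iterates the value without a `None` check changes behaviour. Existing rows were written as JSON text in a `db.Text` column, so it is also worth confirming on each supported engine that the new column type reads them back, or documenting a migration step. `get_dataset_choices` continues outside the hunk.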


@ -1,8 +1,9 @@
from .data import randomise_list
from ..models import Entry
from ..tools.logs import write
from flask import redirect, request, session
from flask.helpers import url_for
from flask import request, session
from flask.helpers import abort, redirect, url_for
from functools import wraps
@ -129,8 +130,11 @@ def redirect_if_started(function):
@wraps(function)
def wrapper(*args, **kwargs):
id = session.get('id')
if request.method == 'GET' and id and Entry.query.filter_by(id=id).first():
return redirect(url_for('quiz._quiz'))
try:
if request.method == 'GET' and id and Entry.query.filter_by(id=id).first(): return redirect(url_for('quiz._quiz'))
except Exception as exception:
write('system.log', f'Database error when checking if test has been started: {exception}')
return abort(500)
return function(*args, **kwargs)
return wrapper


@ -2,9 +2,10 @@ from ..forms.admin import EditDataset
from ..models import Dataset, User
from ..tools.forms import get_dataset_choices, send_errors_to_client
from ..tools.data import check_dataset_exists
from ..tools.logs import write
from flask import Blueprint, flash, jsonify, redirect, render_template, request
from flask.helpers import url_for
from flask import Blueprint, jsonify, render_template, request
from flask.helpers import abort, flash, redirect, url_for
from flask_login import login_required
view = Blueprint(
@ -32,9 +33,13 @@ def _view():
@login_required
@check_dataset_exists
def _view_console(id:str=None):
try:
dataset = Dataset.query.filter_by(id=id).first()
datasets = Dataset.query.count()
users = User.query.all()
except Exception as exception:
write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
return abort(500)
if not dataset:
flash('Invalid dataset ID.', 'error')
return redirect(url_for('admin._questions'))


@ -1,6 +1,7 @@
#!/usr/bin/env python
from main import app
from app.extensions import db
from app.models import *
from app.tools.data import save
from app.tools.logs import write
from sqlalchemy_utils import create_database, database_exists
@ -21,14 +22,14 @@ with app.app_context():
if not path.isfile(f'./{data}/logs/users.log'): write('users.log', 'Log file created.')
if not path.isfile(f'./{data}/logs/system.log'): write('system.log', 'Log file created.')
if not path.isfile(f'./{data}/logs/tests.log'): write('tests.log', 'Log file created.')
if not database_exists(database_uri):
create_database(database_uri)
write('system.log', 'No database found. Creating a new database.')
from app.models import *
db.create_all()
write('system.log', 'Creating database schema.')
if not path.isfile(f'./{data}/.encryption.key'):
write('system.log', 'No encryption key found. Generating new encryption key.')
with open(f'./{data}/.encryption.key', 'wb') as key_file:
key = Fernet.generate_key()
key_file.write(key)
"""Create File for SQLite Database"""
if database_uri[0:6].lower() == 'sqlite':
if not database_exists(database_uri):
create_database(database_uri)
write('system.log', 'No SQLite file found. Creating a new database.')
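Review bot: The encryption key is written with `open(f'./{data}/.encryption.key', 'wb')`, which creates the file with whatever mode the process umask allows, often readable by other local users. This key presumably protects the encrypted model fields, so create it owner-only: `fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)` followed by `with os.fdopen(fd, 'wb') as key_file:`, where `key_path` stands for the existing f-string and `import os` is added if the script only imports `path`. `O_EXCL` also closes the small window between the `isfile` check and the `open`. Separately, the bare string `"""Create File for SQLite Database"""` is an expression statement, not a comment; a `#` comment is the usual form.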


@ -18,9 +18,11 @@ itsdangerous==2.1.2
Jinja2==3.1.2
MarkupSafe==2.1.1
pycparser==2.21
PyMySQL==1.0.2
python-dotenv==0.20.0
six==1.16.0
SQLAlchemy==1.4.40
sqlalchemy-json==0.5.0
SQLAlchemy-Utils==0.38.3
visitor==0.1.3
Werkzeug==2.2.2

49
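--- a/ref-test/reset.py
+++ b/ref-test/reset.py
@@ -0,0 +1,49 @@
+#!/usr/bin/env python
+from main import app
+from app.models import User
+import sys
+from getpass import getpass
+with app.app_context():
+try: users = User.query.all()
+except Exception as exception: sys.exit('Database error:', exception)
+print('')
+print('This interface will allow you to override the password for an administrator account.')
+print('To exit this interface, press Ctrl + C.')
+print('')
+while True:
+username = input('Username: ')
+user = None
+for _user in users:
+if _user.get_username() == username:
+user = _user
+break
+if not user:
+print(f'Error: User \'{username}\' does not exist.')
+continue
+else: break
+while True:
+email = input('Email address: ')
+if not email == user.get_email():
+print(f'Error: Incorrect email address for user \'{username}\'.')
+continue
+else: break
+print('')
+print('Authenticated using username and email address.')
+print('Update the password for the account below.')
+print('')
+while True:
+password = getpass('Enter password: ')
+if len(password) < 6 or len(password) > 20:
+print(f'Error: Password must be between 6 and 20 characters long.')
+reenter_password = getpass('Reenter password: ')
+if not password == reenter_password:
+print(f'Error: Entered passwords do not match.')
+continue
+else: break
+success, message = user.update(password=password)
+if not success:
+sys.exit(message)
+print('')
+print(f'Success: Password for user \'{username}\' has been updated.')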
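Review bot: The password length check prints an error but does not `continue`, so a password shorter than 6 or longer than 20 characters is still accepted once it is re-entered identically; add `continue` after that `print`. `sys.exit('Database error:', exception)` passes two arguments and raises TypeError instead of exiting with the message; use `sys.exit(f'Database error: {exception}')`. The prompts also report whether a username exists and then print 'Authenticated using username and email address', although both values are identifiers rather than secrets. Access to this tool is really gated by shell access to the host, and the message should say that rather than imply an authentication step.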