viveksantayana
cb0e4ed4e6
Moved most of app definitions out of guard function to use wsgi Updated configuration files and referencing of .env values. Local version needs dotenv or exporting of env variables. Dockerised version works fine without load_dotenv. Ready to test now!
491 lines
22 KiB
Python
491 lines
22 KiB
Python
from flask import Blueprint, render_template, flash, redirect, request, jsonify, abort, session
|
|
from flask.helpers import url_for
|
|
from functools import wraps
|
|
from datetime import datetime
|
|
|
|
import os
|
|
from glob import glob
|
|
from json import loads
|
|
from werkzeug.security import check_password_hash
|
|
from common.security.database import decrypt_find, decrypt_find_one
|
|
from .models.users import User
|
|
from flask_mail import Message
|
|
from uuid import uuid4
|
|
import secrets
|
|
from datetime import datetime, date
|
|
from .models.tests import Test
|
|
from common.data_tools import get_default_dataset, get_time_options, available_datasets, get_datasets
|
|
|
|
views = Blueprint(
|
|
'admin_views',
|
|
__name__,
|
|
template_folder='templates',
|
|
static_folder='static'
|
|
)
|
|
|
|
def admin_account_required(function):
|
|
@wraps(function)
|
|
def decorated_function(*args, **kwargs):
|
|
from main import db
|
|
from main import db
|
|
if not db.users.find_one({}):
|
|
flash('No administrator accounts have been registered. Please register an administrator account.', 'alert')
|
|
return redirect(url_for('admin_auth.register'))
|
|
return function(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
def disable_on_registration(function):
|
|
@wraps(function)
|
|
def decorated_function(*args, **kwargs):
|
|
from main import db
|
|
if db.users.find_one({}):
|
|
return abort(404)
|
|
return function(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
def get_id_from_cookie():
|
|
return request.cookies.get('_id')
|
|
|
|
def get_user_from_db(_id):
|
|
from main import db
|
|
return db.users.find_one({'_id': _id})
|
|
|
|
def check_login():
|
|
_id = get_id_from_cookie()
|
|
return True if get_user_from_db(_id) else False
|
|
|
|
def login_required(function):
|
|
@wraps(function)
|
|
def decorated_function(*args, **kwargs):
|
|
if not check_login():
|
|
session['prev_page'] = request.url
|
|
flash('Please log in to view this page.', 'alert')
|
|
return redirect(url_for('admin_auth.login'))
|
|
return function(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
def disable_if_logged_in(function):
|
|
@wraps(function)
|
|
def decorated_function(*args, **kwargs):
|
|
if check_login():
|
|
return abort(404)
|
|
return function(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
@views.route('/')
|
|
@views.route('/home/')
|
|
@views.route('/dashboard/')
|
|
@admin_account_required
|
|
@login_required
|
|
def home():
|
|
from main import db
|
|
tests = db.tests.find()
|
|
results = decrypt_find(db.entries, {})
|
|
current_tests = [ test for test in tests if test['expiry_date'].date() >= date.today() and test['start_date'].date() <= date.today() ]
|
|
current_tests.sort(key= lambda x: x['expiry_date'], reverse=True)
|
|
upcoming_tests = [ test for test in tests if test['start_date'].date() > date.today()]
|
|
upcoming_tests.sort(key= lambda x: x['start_date'])
|
|
recent_results = [result for result in results if 'submission_time' in result ]
|
|
recent_results.sort(key= lambda x: x['submission_time'], reverse=True)
|
|
for result in recent_results:
|
|
result['percent'] = round(100*result['results']['score']/result['results']['max'])
|
|
return render_template('/admin/index.html', current_tests = current_tests[:5], upcomimg_tests = upcoming_tests[:5], recent_results = recent_results[:5])
|
|
|
|
@views.route('/settings/')
|
|
@admin_account_required
|
|
@login_required
|
|
def settings():
|
|
from main import db
|
|
users = decrypt_find(db.users, {})
|
|
users.sort(key= lambda x: x['username'])
|
|
datasets = get_datasets()
|
|
return render_template('/admin/settings/index.html', users=users[:5], datasets=datasets[:5])
|
|
|
|
@views.route('/settings/users/', methods=['GET','POST'])
|
|
@admin_account_required
|
|
@login_required
|
|
def users():
|
|
from main import db, mail
|
|
from .models.forms import CreateUserForm
|
|
form = CreateUserForm()
|
|
if request.method == 'GET':
|
|
users_list = decrypt_find(db.users, {})
|
|
return render_template('/admin/settings/users.html', users = users_list, form = form)
|
|
if request.method == 'POST':
|
|
if form.validate_on_submit():
|
|
entry = User(
|
|
_id = uuid4().hex,
|
|
username = request.form.get('username').lower(),
|
|
email = request.form.get('email'),
|
|
password = request.form.get('password') if not request.form.get('password') == '' else secrets.token_hex(12),
|
|
)
|
|
email = Message(
|
|
subject = 'RefTest | Registration Confirmation',
|
|
recipients = [entry.email],
|
|
body = f"""
|
|
Hello {entry.username}, \n\n
|
|
You have been registered as an administrator for the SKA RefTest App!\n\n
|
|
You can access your account using the username '{entry.username}'.\n\n
|
|
Your password is as follows:\n\n
|
|
{entry.password}\n\n
|
|
You can change your password by logging in to the admin console by copying the following URL into a web browser:\n\n
|
|
{url_for('admin_views.home', _external = True)}\n\n
|
|
Have a nice day.
|
|
""",
|
|
html = f"""
|
|
<p>Hello {entry.username},</p>
|
|
<p>You have been registered as an administrator for the SKA RefTest App!</p>
|
|
<p>You can access your account using the username '{entry.username}'.</p>
|
|
<p>Your password is as follows:</p>
|
|
<strong>{entry.password}</strong>
|
|
<p>You can change your password by logging in to the admin console at the link below:</p>
|
|
<p><a href='{url_for('admin_views.home', _external = True)}'>{url_for('admin_views.home', _external = True)}</a></p>
|
|
<p>Have a nice day.</p>
|
|
"""
|
|
)
|
|
mail.send(email)
|
|
return entry.register()
|
|
else:
|
|
errors = [*form.username.errors, *form.email.errors, *form.password.errors]
|
|
return jsonify({ 'error': errors}), 400
|
|
|
|
@views.route('/settings/users/delete/<string:_id>', methods = ['GET', 'POST'])
|
|
@admin_account_required
|
|
@login_required
|
|
def delete_user(_id:str):
|
|
from main import db, mail
|
|
if _id == get_id_from_cookie():
|
|
flash('Cannot delete your own user account.', 'error')
|
|
return redirect(url_for('admin_views.users'))
|
|
from .models.forms import DeleteUserForm
|
|
form = DeleteUserForm()
|
|
user = decrypt_find_one(db.users, {'_id': _id})
|
|
if request.method == 'GET':
|
|
if not user:
|
|
return abort(404)
|
|
return render_template('/admin/settings/delete-user.html', form = form, _id = _id, user = user)
|
|
if request.method == 'POST':
|
|
if not user:
|
|
return jsonify({ 'error': 'User does not exist.' }), 404
|
|
if form.validate_on_submit():
|
|
_user = decrypt_find_one(db.users, {'_id': get_id_from_cookie()})
|
|
password = request.form.get('password')
|
|
if not check_password_hash(_user['password'], password):
|
|
return jsonify({ 'error': 'The password you entered is incorrect.' }), 401
|
|
if request.form.get('notify'):
|
|
email = Message(
|
|
subject = 'RefTest | Account Deletion',
|
|
recipients = [user['email']],
|
|
bcc = [_user['email']],
|
|
body = f"""
|
|
Hello {user['username']}, \n\n
|
|
Your administrator account for the SKA RefTest App has been deleted by {_user['username']}. All data about your account has been deleted.\n\n
|
|
If you believe this was done in error, please contact them immediately.\n\n
|
|
If you would like to register to administer the app, please ask an existing administrator to create a new account.\n\n
|
|
Have a nice day.
|
|
""",
|
|
html = f"""
|
|
<p>Hello {user['username']},</p>
|
|
<p>Your administrator account for the SKA RefTest App has been deleted by {_user['username']}. All data about your account has been deleted.</p>
|
|
<p>If you believe this was done in error, please contact them immediately.</p>
|
|
<p>If you would like to register to administer the app, please ask an existing administrator to create a new account.</p>
|
|
<p>Have a nice day.</p>
|
|
"""
|
|
)
|
|
mail.send(email)
|
|
user = User(
|
|
_id = user['_id']
|
|
)
|
|
return user.delete()
|
|
else: return abort(400)
|
|
|
|
@views.route('/settings/users/update/<string:_id>', methods = ['GET', 'POST'])
|
|
@admin_account_required
|
|
@login_required
|
|
def update_user(_id:str):
|
|
from main import db, mail
|
|
if _id == get_id_from_cookie():
|
|
flash('Cannot delete your own user account.', 'error')
|
|
return redirect(url_for('admin_views.users'))
|
|
from .models.forms import UpdateUserForm
|
|
form = UpdateUserForm()
|
|
user = decrypt_find_one( db.users, {'_id': _id})
|
|
if request.method == 'GET':
|
|
if not user:
|
|
return abort(404)
|
|
return render_template('/admin/settings/update-user.html', form = form, _id = _id, user = user)
|
|
if request.method == 'POST':
|
|
if not user:
|
|
return jsonify({ 'error': 'User does not exist.' }), 404
|
|
if form.validate_on_submit():
|
|
_user = decrypt_find_one(db.users, {'_id': get_id_from_cookie()})
|
|
password = request.form.get('password')
|
|
if not check_password_hash(_user['password'], password):
|
|
return jsonify({ 'error': 'The password you entered is incorrect.' }), 401
|
|
if request.form.get('notify'):
|
|
recipient = request.form.get('email') if not request.form.get('email') == '' else user['email']
|
|
email = Message(
|
|
subject = 'RefTest | Account Update',
|
|
recipients = [recipient],
|
|
bcc = [_user['email']],
|
|
body = f"""
|
|
Hello {user['username']}, \n\n
|
|
Your administrator account for the SKA RefTest App has been updated by {_user['username']}.\n\n
|
|
Your new account details are as follows:\n\n
|
|
Email: {recipient}\n
|
|
Password: {request.form.get('password')}\n\n
|
|
You can update your email and password by logging in to the app.\n\n
|
|
Have a nice day.
|
|
""",
|
|
html = f"""
|
|
<p>Hello {user['username']},</p>
|
|
<p>Your administrator account for the SKA RefTest App has been updated by {_user['username']}.</p>
|
|
<p>Your new account details are as follows:</p>
|
|
<p>Email: {recipient} <br/>Password: {request.form.get('password')}</p>
|
|
<p>You can update your email and password by logging in to the app.</p>
|
|
<p>Have a nice day.</p>
|
|
"""
|
|
)
|
|
mail.send(email)
|
|
entry = User(
|
|
_id = _id,
|
|
email = request.form.get('email'),
|
|
password = request.form.get('password')
|
|
)
|
|
return entry.update()
|
|
else:
|
|
errors = [*form.user_password.errors, *form.email.errors, *form.password.errors, *form.password_reenter.errors]
|
|
return jsonify({ 'error': errors}), 400
|
|
|
|
@views.route('/settings/questions/', methods=['GET', 'POST'])
|
|
@admin_account_required
|
|
@login_required
|
|
def questions():
|
|
from .models.forms import UploadDataForm
|
|
from common.data_tools import check_json_format, validate_json_contents, store_data_file
|
|
form = UploadDataForm()
|
|
if request.method == 'GET':
|
|
data = get_datasets()
|
|
default = get_default_dataset()
|
|
return render_template('/admin/settings/questions.html', form=form, data=data, default=default)
|
|
if request.method == 'POST':
|
|
if form.validate_on_submit():
|
|
upload = form.data_file.data
|
|
default = True if request.form.get('default') else False
|
|
if not check_json_format(upload):
|
|
return jsonify({ 'error': 'Invalid file selected. Please upload a JSON file.'}), 400
|
|
if not validate_json_contents(upload):
|
|
return jsonify({'error': 'The data in the file is invalid.'}), 400
|
|
filename = store_data_file(upload, default=default)
|
|
flash(f'Dataset {form.data_file.data.filename} has been uploaded as {filename}.', 'success')
|
|
return jsonify({ 'success': f'Dataset {form.data_file.data.filename} has been uploaded as {filename}.'}), 200
|
|
errors = [*form.errors]
|
|
return jsonify({ 'error': errors}), 400
|
|
|
|
@views.route('/settings/questions/delete/', methods=['POST'])
|
|
@admin_account_required
|
|
@login_required
|
|
def delete_questions():
|
|
from main import db, app
|
|
filename = request.get_json()['filename']
|
|
data_files = glob(os.path.join(app.config["DATA_FILE_DIRECTORY"],'*.json'))
|
|
if any(filename in file for file in data_files):
|
|
default = get_default_dataset()
|
|
if default == filename:
|
|
return jsonify({'error': 'Cannot delete the default question dataset.'}), 400
|
|
data_file = os.path.join(app.config["DATA_FILE_DIRECTORY"],filename)
|
|
with open(data_file, 'r') as _data_file:
|
|
data = loads(_data_file.read())
|
|
if data['meta']['tests']:
|
|
return jsonify({'error': 'Cannot delete a dataset that is in use by an exam.'}), 400
|
|
if len(data_files) == 1:
|
|
return jsonify({'error': 'Cannot delete the only question dataset.'}), 400
|
|
os.remove(data_file)
|
|
flash(f'Question dataset {filename} has been deleted.', 'success')
|
|
return jsonify({'success': f'Question dataset {filename} has been deleted.'}), 200
|
|
return abort(404)
|
|
|
|
@views.route('/settings/questions/default/', methods=['POST'])
|
|
@admin_account_required
|
|
@login_required
|
|
def make_default_questions():
|
|
from main import app
|
|
filename = request.get_json()['filename']
|
|
data_files = glob(os.path.join(app.config["DATA_FILE_DIRECTORY"],'*.json'))
|
|
default_file_path = os.path.join(app.config['DATA_FILE_DIRECTORY'], '.default.txt')
|
|
if any(filename in file for file in data_files):
|
|
with open(default_file_path, 'r') as default_file:
|
|
default = default_file.read()
|
|
if default == filename:
|
|
return jsonify({'error': 'Cannot delete default question dataset.'}), 400
|
|
with open(default_file_path, 'w') as default_file:
|
|
default_file.write(filename)
|
|
flash(f'Set dataset f{filename} as the default.', 'success')
|
|
return jsonify({'success': f'Set dataset {filename} as the default.'})
|
|
return abort(404)
|
|
|
|
@views.route('/tests/<filter>/', methods=['GET'])
|
|
@views.route('/tests/', methods=['GET'])
|
|
@admin_account_required
|
|
@login_required
|
|
def tests(filter=''):
|
|
from main import db
|
|
if not available_datasets():
|
|
flash('There are no available question datasets. Please upload a question dataset in order to set up an exam.', 'error')
|
|
return redirect(url_for('admin_views.questions'))
|
|
if filter not in ['', 'create', 'active', 'scheduled', 'expired', 'all']:
|
|
return abort(404)
|
|
if filter == 'create':
|
|
from .models.forms import CreateTest
|
|
form = CreateTest()
|
|
form.time_limit.choices = get_time_options()
|
|
form.dataset.choices = available_datasets()
|
|
form.time_limit.default='none'
|
|
form.dataset.default=get_default_dataset()
|
|
form.process()
|
|
display_title = ''
|
|
error_none = ''
|
|
return render_template('/admin/tests.html', form = form, display_title=display_title, error_none=error_none, filter=filter)
|
|
_tests = db.tests.find({})
|
|
if filter == 'active' or filter == '':
|
|
tests = [ test for test in _tests if test['expiry_date'].date() >= date.today() and test['start_date'].date() <= date.today() ]
|
|
display_title = 'Active Exams'
|
|
error_none = 'There are no exams that are currently active. You can create one using the Creat Exam form.'
|
|
if filter == 'expired':
|
|
tests = [ test for test in _tests if test['expiry_date'].date() < date.today()]
|
|
display_title = 'Expired Exams'
|
|
error_none = 'There are no expired exams. Exams will appear in this category after their expiration date has passed.'
|
|
if filter == 'scheduled':
|
|
tests = [ test for test in _tests if test['start_date'].date() > date.today()]
|
|
display_title = 'Scheduled Exams'
|
|
error_none = 'There are no scheduled exams pending. You can schedule an exam for the future using the Create Exam form.'
|
|
if filter == 'all':
|
|
tests = _tests
|
|
display_title = 'All Exams'
|
|
error_none = 'There are no exams set up. You can create one using the Create Exam form.'
|
|
return render_template('/admin/tests.html', tests = tests, display_title=display_title, error_none=error_none, filter=filter)
|
|
|
|
@views.route('/tests/create/', methods=['POST'])
|
|
@admin_account_required
|
|
@login_required
|
|
def create_test():
|
|
from main import db
|
|
from .models.forms import CreateTest
|
|
form = CreateTest()
|
|
form.dataset.choices = available_datasets()
|
|
form.time_limit.choices = get_time_options()
|
|
if form.validate_on_submit():
|
|
start_date = request.form.get('start_date')
|
|
start_date = datetime.strptime(start_date, '%Y-%m-%d')
|
|
expiry_date = request.form.get('expiry_date')
|
|
expiry_date = datetime.strptime(expiry_date, '%Y-%m-%d')
|
|
dataset = request.form.get('dataset')
|
|
errors = []
|
|
if start_date.date() < date.today():
|
|
errors.append('The start date cannot be in the past.')
|
|
if expiry_date.date() < date.today():
|
|
errors.append('The expiry date cannot be in the past.')
|
|
if expiry_date < start_date:
|
|
errors.append('The expiry date cannot be before the start date.')
|
|
if errors:
|
|
return jsonify({'error': errors}), 400
|
|
creator_id = get_id_from_cookie()
|
|
creator = decrypt_find_one(db.users, { '_id': creator_id } )['username']
|
|
test = Test(
|
|
_id = uuid4().hex,
|
|
start_date = start_date,
|
|
expiry_date = expiry_date,
|
|
time_limit = request.form.get('time_limit'),
|
|
creator = creator,
|
|
dataset = dataset
|
|
)
|
|
test.create()
|
|
return jsonify({'success': 'New exam created.'}), 200
|
|
else:
|
|
errors = [*form.expiry.errors, *form.time_limit.errors]
|
|
return jsonify({ 'error': errors}), 400
|
|
|
|
@views.route('/tests/delete/', methods=['POST'])
|
|
@admin_account_required
|
|
@login_required
|
|
def delete_test():
|
|
from main import db
|
|
_id = request.get_json()['_id']
|
|
if db.tests.find_one({'_id': _id}):
|
|
return Test(_id = _id).delete()
|
|
return jsonify({'error': 'Could not find the corresponding test to delete.'}), 404
|
|
|
|
@views.route('/test/<_id>/', methods=['GET','POST'])
|
|
@admin_account_required
|
|
@login_required
|
|
def view_test(_id):
|
|
from main import db
|
|
from .models.forms import AddTimeAdjustment
|
|
form = AddTimeAdjustment()
|
|
test = decrypt_find_one(db.tests, {'_id': _id})
|
|
if request.method == 'GET':
|
|
if not test:
|
|
return abort(404)
|
|
return render_template('/admin/test.html', test = test, form = form)
|
|
if request.method == 'POST':
|
|
if form.validate_on_submit():
|
|
time = int(request.form.get('time'))
|
|
return Test(_id=_id).add_time_adjustment(time)
|
|
return jsonify({'error': form.time.errors }), 400
|
|
|
|
@views.route('/test/<_id>/delete-adjustment/', methods = ['POST'])
|
|
@admin_account_required
|
|
@login_required
|
|
def delete_adjustment(_id):
|
|
user_code = request.get_json()['user_code']
|
|
return Test(_id=_id).remove_time_adjustment(user_code)
|
|
|
|
@views.route('/results/')
|
|
@admin_account_required
|
|
@login_required
|
|
def view_entries():
|
|
from main import db
|
|
entries = decrypt_find(db.entries, {})
|
|
return render_template('/admin/results.html', entries = entries)
|
|
|
|
@views.route('/results/<_id>/', methods = ['GET', 'POST'])
|
|
@admin_account_required
|
|
@login_required
|
|
def view_entry(_id=''):
|
|
from main import db
|
|
entry = decrypt_find_one(db.entries, {'_id': _id})
|
|
if request.method == 'GET':
|
|
if not entry:
|
|
return abort(404)
|
|
return render_template('/admin/result-detail.html', entry = entry)
|
|
if request.method == 'POST':
|
|
if not entry:
|
|
return jsonify({'error': 'A valid entry could no be found.'}), 404
|
|
action = request.get_json()['action']
|
|
if action == 'override':
|
|
late_ignore = db.entries.find_one_and_update({'_id': _id}, {'$set': {'status': 'late (allowed)'}})
|
|
if late_ignore:
|
|
flash('Late status for the entry has been allowed.', 'success')
|
|
return jsonify({'success': 'Late status allowed.'}), 200
|
|
return jsonify({'error': 'An error occurred.'}), 400
|
|
if action == 'delete':
|
|
test_code = entry['test_code']
|
|
test = db.tests.find_one_and_update({'test_code': test_code}, {'$pull': {'entries': _id}})
|
|
if not test:
|
|
return jsonify({'error': 'A valid exam could not be found.'}), 404
|
|
delete = db.entries.delete_one({'_id': _id})
|
|
if delete:
|
|
flash('Entry has been deleted.', 'success')
|
|
return jsonify({'success': 'Entry has been deleted.'}), 200
|
|
return jsonify({'error': 'An error occurred.'}), 400
|
|
|
|
@views.route('/certificate/', methods=['POST'])
|
|
@admin_account_required
|
|
@login_required
|
|
def generate_certificate():
|
|
from main import db
|
|
_id = request.get_json()['_id']
|
|
entry = decrypt_find_one(db.entries, {'_id': _id})
|
|
if not entry:
|
|
return abort(404)
|
|
return render_template('/admin/components/certificate.html', entry = entry) |