viveksantayana
ecefc1900d
Corrected error display bug Removed redundant auth and models in quiz client app
476 lines
21 KiB
Python
476 lines
21 KiB
Python
from flask import Blueprint, render_template, flash, redirect, request, jsonify, abort, make_response
|
|
from flask.helpers import url_for
|
|
from functools import wraps
|
|
from datetime import datetime
|
|
|
|
import os
|
|
from glob import glob
|
|
from json import loads
|
|
from werkzeug.security import check_password_hash
|
|
from common.security.database import decrypt_find, decrypt_find_one
|
|
from .models.users import User
|
|
from flask_mail import Message
|
|
from main import app, db
|
|
from uuid import uuid4
|
|
import secrets
|
|
from main import mail
|
|
from datetime import datetime, date
|
|
from .models.tests import Test
|
|
from common.data_tools import get_default_dataset, get_time_options, available_datasets
|
|
|
|
# Blueprint that every admin-console route in this module is registered on.
views = Blueprint('admin_views', __name__, template_folder='templates', static_folder='static')
|
|
|
|
def admin_account_required(function):
    """Decorate a view so it only runs once at least one user record exists.

    When ``db.users`` holds no document at all, an alert is flashed and the
    client is redirected to the ``admin_auth.register`` endpoint; otherwise
    the wrapped view is called with its original arguments.
    """
    @wraps(function)
    def wrapper(*args, **kwargs):
        any_user = db.users.find_one({})
        if any_user:
            return function(*args, **kwargs)
        flash('No administrator accounts have been registered. Please register an administrator account.', 'alert')
        return redirect(url_for('admin_auth.register'))
    return wrapper
|
|
|
|
def disable_on_registration(function):
    """Decorate a view so it is only reachable while no user record exists.

    As soon as ``db.users`` contains any document the wrapped view answers
    with a 404; before that it is called with its original arguments.
    """
    @wraps(function)
    def wrapper(*args, **kwargs):
        already_registered = db.users.find_one({})
        if not already_registered:
            return function(*args, **kwargs)
        return abort(404)
    return wrapper
|
|
|
|
def get_id_from_cookie():
    """Return the value of the ``_id`` cookie on the current request, or ``None`` if it is not set."""
    cookies = request.cookies
    return cookies.get('_id')
|
|
|
|
def get_user_from_db(_id):
    """Look up a single document in ``db.users`` by ``_id`` and return the result of ``find_one``."""
    query = {'_id': _id}
    return db.users.find_one(query)
|
|
|
|
def check_login():
    """Report whether the ``_id`` cookie of the current request matches a user record.

    NOTE(review): the check relies solely on the cookie value matching a
    document in ``db.users``; no other credential is consulted here.
    """
    return bool(get_user_from_db(get_id_from_cookie()))
|
|
|
|
def login_required(function):
    """Decorate a view so it only runs for requests that pass ``check_login``.

    Requests that fail the check get an alert flashed and are redirected to
    the ``admin_auth.login`` endpoint.
    """
    @wraps(function)
    def wrapper(*args, **kwargs):
        if check_login():
            return function(*args, **kwargs)
        flash('Please log in to view this page.', 'alert')
        return redirect(url_for('admin_auth.login'))
    return wrapper
|
|
|
|
def disable_if_logged_in(function):
    """Decorate a view so it answers 404 for requests that pass ``check_login``.

    Requests that do not pass the check reach the wrapped view unchanged.
    """
    @wraps(function)
    def wrapper(*args, **kwargs):
        logged_in = check_login()
        if not logged_in:
            return function(*args, **kwargs)
        return abort(404)
    return wrapper
|
|
|
|
@views.route('/')
@views.route('/home/')
@views.route('/dashboard/')
@admin_account_required
@login_required
def home():
    """Render the admin landing page; served at ``/``, ``/home/`` and ``/dashboard/``."""
    template = '/admin/index.html'
    return render_template(template)
|
|
|
|
@views.route('/settings/')
@admin_account_required
@login_required
def settings():
    """Render the settings index page of the admin console."""
    template = '/admin/settings/index.html'
    return render_template(template)
|
|
|
|
@views.route('/settings/users/', methods=['GET','POST'])
@admin_account_required
@login_required
def users():
    """List the registered users (GET) or create a new one (POST).

    GET renders the user-management page with the decrypted contents of
    ``db.users`` and an empty ``CreateUserForm``.

    POST validates the submitted form. On success a ``User`` is built from
    the form data, a confirmation email containing the username and password
    is sent, and the result of ``User.register()`` is returned. When
    validation fails, the username, email and password field errors are
    returned as JSON with status 400.
    """
    from .models.forms import CreateUserForm
    form = CreateUserForm()
    if request.method == 'GET':
        users_list = decrypt_find(db.users, {})
        return render_template('/admin/settings/users.html', users = users_list, form = form)
    if request.method == 'POST':
        if form.validate_on_submit():
            entry = User(
                _id = uuid4().hex,
                username = request.form.get('username').lower(),
                email = request.form.get('email'),
                # `or` covers both an empty and an absent password field, so a
                # random password is generated instead of storing/emailing None.
                password = request.form.get('password') or secrets.token_hex(12),
            )
            # Resolve the console URL once; it appears three times in the email.
            console_url = url_for('admin_views.home', _external = True)
            email = Message(
                subject = 'RefTest | Registration Confirmation',
                recipients = [entry.email],
                body = f"""
                Hello {entry.username}, \n\n
                You have been registered as an administrator for the SKA RefTest App!\n\n
                You can access your account using the username '{entry.username}'.\n\n
                Your password is as follows:\n\n
                {entry.password}\n\n
                You can change your password by logging in to the admin console by copying the following URL into a web browser:\n\n
                {console_url}\n\n
                Have a nice day.
                """,
                html = f"""
                <p>Hello {entry.username},</p>
                <p>You have been registered as an administrator for the SKA RefTest App!</p>
                <p>You can access your account using the username '{entry.username}'.</p>
                <p>Your password is as follows:</p>
                <strong>{entry.password}</strong>
                <p>You can change your password by logging in to the admin console at the link below:</p>
                <p><a href='{console_url}'>{console_url}</a></p>
                <p>Have a nice day.</p>
                """
            )
            mail.send(email)
            return entry.register()
        else:
            errors = [*form.username.errors, *form.email.errors, *form.password.errors]
            return jsonify({ 'error': errors}), 400
|
|
|
|
@views.route('/settings/users/delete/<string:_id>', methods = ['GET', 'POST'])
@admin_account_required
@login_required
def delete_user(_id:str):
    """Show the delete-confirmation page for a user (GET) or delete that user (POST).

    A request whose ``_id`` cookie equals the target ``_id`` is refused with a
    flashed error and a redirect to the user list. For POST, the submitted
    password is checked against the stored hash of the user identified by the
    cookie; when the ``notify`` field is set, an email is sent to the deleted
    user (BCC to the acting user) before ``User.delete()`` is called.
    """
    acting_id = get_id_from_cookie()
    if _id == acting_id:
        flash('Cannot delete your own user account.', 'error')
        return redirect(url_for('admin_views.users'))
    from .models.forms import DeleteUserForm
    form = DeleteUserForm()
    user = decrypt_find_one(db.users, {'_id': _id})
    if request.method == 'GET':
        if user:
            return render_template('/admin/settings/delete-user.html', form = form, _id = _id, user = user)
        return abort(404)
    if request.method == 'POST':
        if not user:
            return jsonify({ 'error': 'User does not exist.' }), 404
        if form.validate_on_submit():
            # The acting user must confirm the deletion with their own password.
            _user = decrypt_find_one(db.users, {'_id': get_id_from_cookie()})
            submitted = request.form.get('password')
            password_ok = check_password_hash(_user['password'], submitted)
            if not password_ok:
                return jsonify({ 'error': 'The password you entered is incorrect.' }), 401
            if request.form.get('notify'):
                email = Message(
                    subject = 'RefTest | Account Deletion',
                    recipients = [user['email']],
                    bcc = [_user['email']],
                    body = f"""
                    Hello {user['username']}, \n\n
                    Your administrator account for the SKA RefTest App has been deleted by {_user['username']}. All data about your account has been deleted.\n\n
                    If you believe this was done in error, please contact them immediately.\n\n
                    If you would like to register to administer the app, please ask an existing administrator to create a new account.\n\n
                    Have a nice day.
                    """,
                    html = f"""
                    <p>Hello {user['username']},</p>
                    <p>Your administrator account for the SKA RefTest App has been deleted by {_user['username']}. All data about your account has been deleted.</p>
                    <p>If you believe this was done in error, please contact them immediately.</p>
                    <p>If you would like to register to administer the app, please ask an existing administrator to create a new account.</p>
                    <p>Have a nice day.</p>
                    """
                )
                mail.send(email)
            doomed = User(_id = user['_id'])
            return doomed.delete()
        else:
            return abort(400)
|
|
|
|
@views.route('/settings/users/update/<string:_id>', methods = ['GET', 'POST'])
@admin_account_required
@login_required
def update_user(_id:str):
    """Show the update page for a user (GET) or apply an email/password change (POST).

    A request whose ``_id`` cookie equals the target ``_id`` is refused with a
    flashed error and a redirect to the user list. For POST, the acting
    user's confirmation password is verified against their stored hash; when
    the ``notify`` field is set, an email with the new details is sent to the
    updated user (BCC to the acting user) before ``User.update()`` is called.
    When validation fails, the collected field errors are returned as JSON
    with status 400.
    """
    if _id == get_id_from_cookie():
        # The original message said "delete", copied from the delete view.
        flash('Cannot update your own user account.', 'error')
        return redirect(url_for('admin_views.users'))
    from .models.forms import UpdateUserForm
    form = UpdateUserForm()
    user = decrypt_find_one( db.users, {'_id': _id})
    if request.method == 'GET':
        if not user:
            return abort(404)
        return render_template('/admin/settings/update-user.html', form = form, _id = _id, user = user)
    if request.method == 'POST':
        if not user:
            return jsonify({ 'error': 'User does not exist.' }), 404
        if form.validate_on_submit():
            _user = decrypt_find_one(db.users, {'_id': get_id_from_cookie()})
            # Verify the acting user's confirmation password. The form exposes
            # it as `user_password`; `password` is the NEW password for the
            # target account and must not be used for this check.
            # NOTE(review): field meaning inferred from the form's field names
            # (user_password / password / password_reenter) — confirm against
            # UpdateUserForm.
            confirmation = request.form.get('user_password', '')
            new_password = request.form.get('password')
            if not check_password_hash(_user['password'], confirmation):
                return jsonify({ 'error': 'The password you entered is incorrect.' }), 401
            if request.form.get('notify'):
                # Fall back to the stored address when no new email was
                # submitted (empty or absent field).
                recipient = request.form.get('email') or user['email']
                email = Message(
                    subject = 'RefTest | Account Update',
                    recipients = [recipient],
                    bcc = [_user['email']],
                    body = f"""
                    Hello {user['username']}, \n\n
                    Your administrator account for the SKA RefTest App has been updated by {_user['username']}.\n\n
                    Your new account details are as follows:\n\n
                    Email: {recipient}\n
                    Password: {new_password}\n\n
                    You can update your email and password by logging in to the app.\n\n
                    Have a nice day.
                    """,
                    html = f"""
                    <p>Hello {user['username']},</p>
                    <p>Your administrator account for the SKA RefTest App has been updated by {_user['username']}.</p>
                    <p>Your new account details are as follows:</p>
                    <p>Email: {recipient} <br/>Password: {new_password}</p>
                    <p>You can update your email and password by logging in to the app.</p>
                    <p>Have a nice day.</p>
                    """
                )
                mail.send(email)
            entry = User(
                _id = _id,
                email = request.form.get('email'),
                password = new_password
            )
            return entry.update()
        else:
            errors = [*form.user_password.errors, *form.email.errors, *form.password.errors, *form.password_reenter.errors]
            return jsonify({ 'error': errors}), 400
|
|
|
|
@views.route('/settings/questions/', methods=['GET', 'POST'])
@admin_account_required
@login_required
def questions():
    """List the stored question datasets (GET) or upload a new one (POST).

    GET reads every ``*.json`` file in ``app.config["DATA_FILE_DIRECTORY"]``
    and passes, per file, its name, parsed ``meta.timestamp``, author
    username and the number of entries in ``meta.tests`` to the template,
    together with the current default dataset.

    POST validates the upload form, checks the uploaded file with
    ``check_json_format`` and ``validate_json_contents``, stores it with
    ``store_data_file`` and returns a JSON success message. Any failure is
    reported as JSON with status 400.
    """
    from .models.forms import UploadDataForm
    from common.data_tools import check_json_format, validate_json_contents, store_data_file
    form = UploadDataForm()
    if request.method == 'GET':
        files = glob(os.path.join(app.config["DATA_FILE_DIRECTORY"],'*.json'))
        data = []
        for file in files:
            with open(file) as _file:
                load = loads(_file.read())
            meta = load['meta']
            # The author's account may have been deleted since the upload;
            # show a placeholder rather than failing on a missing record.
            author_record = decrypt_find_one(db.users, {'_id': meta['author']})
            author = author_record['username'] if author_record else 'Unknown'
            data.append({
                # basename is separator-agnostic, unlike splitting on '/'.
                'filename': os.path.basename(file),
                'timestamp': datetime.strptime(meta['timestamp'], '%Y-%m-%d %H%M%S'),
                'author': author,
                'use': len(meta['tests'])
            })
        default = get_default_dataset()
        return render_template('/admin/settings/questions.html', form=form, data=data, default=default)
    if request.method == 'POST':
        if form.validate_on_submit():
            upload = form.data_file.data
            default = bool(request.form.get('default'))
            if not check_json_format(upload):
                return jsonify({ 'error': 'Invalid file selected. Please upload a JSON file.'}), 400
            if not validate_json_contents(upload):
                return jsonify({'error': 'The data in the file is invalid.'}), 400
            filename = store_data_file(upload, default=default)
            message = f'Dataset {form.data_file.data.filename} has been uploaded as {filename}.'
            flash(message, 'success')
            return jsonify({ 'success': message}), 200
        # form.errors maps field names to lists of messages; report the
        # messages themselves, not the field names.
        errors = [message for messages in form.errors.values() for message in messages]
        return jsonify({ 'error': errors}), 400
|
|
|
|
@views.route('/settings/questions/delete/', methods=['POST'])
@admin_account_required
@login_required
def delete_questions():
    """Delete the question dataset named in the JSON request body.

    The ``filename`` key of the request JSON must exactly match the name of a
    ``*.json`` file in ``app.config["DATA_FILE_DIRECTORY"]``; otherwise the
    request is answered with 404. Deletion is refused with a 400 JSON error
    when the dataset is the default, when its ``meta.tests`` list is
    non-empty, or when it is the only dataset. On success the file is removed
    and a JSON success message is returned.
    """
    filename = request.get_json()['filename']
    data_files = glob(os.path.join(app.config["DATA_FILE_DIRECTORY"],'*.json'))
    # Match on the exact file name. A substring test against the full path
    # would also accept partial names and path fragments, which then fail (or
    # resolve elsewhere) when joined onto the data directory.
    known_names = [os.path.basename(path) for path in data_files]
    if filename not in known_names:
        return abort(404)
    default = get_default_dataset()
    if default == filename:
        return jsonify({'error': 'Cannot delete the default question dataset.'}), 400
    data_file = os.path.join(app.config["DATA_FILE_DIRECTORY"],filename)
    with open(data_file, 'r') as _data_file:
        data = loads(_data_file.read())
    if data['meta']['tests']:
        return jsonify({'error': 'Cannot delete a dataset that is in use by an exam.'}), 400
    if len(data_files) == 1:
        return jsonify({'error': 'Cannot delete the only question dataset.'}), 400
    os.remove(data_file)
    flash(f'Question dataset {filename} has been deleted.', 'success')
    return jsonify({'success': f'Question dataset {filename} has been deleted.'}), 200
|
|
|
|
@views.route('/settings/questions/default/', methods=['POST'])
@admin_account_required
@login_required
def make_default_questions():
    """Mark the dataset named in the JSON request body as the default.

    The ``filename`` key of the request JSON must exactly match the name of a
    ``*.json`` file in ``app.config["DATA_FILE_DIRECTORY"]``; otherwise the
    request is answered with 404. The default is recorded by writing the file
    name into ``.default.txt`` in the same directory. A 400 JSON error is
    returned when the dataset is already the default.
    """
    filename = request.get_json()['filename']
    data_files = glob(os.path.join(app.config["DATA_FILE_DIRECTORY"],'*.json'))
    default_file_path = os.path.join(app.config['DATA_FILE_DIRECTORY'], '.default.txt')
    # Exact file-name match; a substring test against the full path would
    # accept partial names and path fragments.
    known_names = [os.path.basename(path) for path in data_files]
    if filename not in known_names:
        return abort(404)
    try:
        with open(default_file_path, 'r') as default_file:
            default = default_file.read()
    except FileNotFoundError:
        # No default recorded yet; any existing dataset may become the default.
        default = None
    if default == filename:
        # The original message here was copied from the delete view.
        return jsonify({'error': 'This question dataset is already the default.'}), 400
    with open(default_file_path, 'w') as default_file:
        default_file.write(filename)
    # The original flash text had a stray "f" in front of the file name.
    flash(f'Set dataset {filename} as the default.', 'success')
    return jsonify({'success': f'Set dataset {filename} as the default.'})
|
|
|
|
@views.route('/tests/<filter>/', methods=['GET'])
@views.route('/tests/', methods=['GET'])
@admin_account_required
@login_required
def tests(filter=''):
    """Render the exam list for a given filter, or the exam-creation form.

    ``filter`` is one of ``''``, ``'create'``, ``'active'``, ``'scheduled'``,
    ``'expired'`` or ``'all'``; anything else yields a 404. When no question
    dataset is available the client is redirected to the dataset page.

    * ``'create'`` renders the page with a ``CreateTest`` form.
    * ``''``/``'active'``: exams whose start date is today or earlier and
      whose expiry date is today or later.
    * ``'expired'``: exams whose expiry date is before today.
    * ``'scheduled'``: exams whose start date is after today.
    * ``'all'``: every document in ``db.tests``.
    """
    if not available_datasets():
        flash('There are no available question datasets. Please upload a question dataset in order to set up an exam.', 'error')
        return redirect(url_for('admin_views.questions'))
    if filter not in ['', 'create', 'active', 'scheduled', 'expired', 'all']:
        return abort(404)
    if filter == 'create':
        from .models.forms import CreateTest
        form = CreateTest()
        form.time_limit.choices = get_time_options()
        form.dataset.choices = available_datasets()
        form.time_limit.default='none'
        form.dataset.default=get_default_dataset()
        # process() re-reads the field defaults assigned just above.
        form.process()
        display_title = ''
        error_none = ''
        return render_template('/admin/tests.html', form = form, display_title=display_title, error_none=error_none, filter=filter)
    _tests = db.tests.find({})
    # Evaluate "today" once so every comparison uses the same date.
    today = date.today()
    if filter == 'active' or filter == '':
        tests = [ test for test in _tests if test['start_date'].date() <= today <= test['expiry_date'].date() ]
        display_title = 'Active Exams'
        error_none = 'There are no exams that are currently active. You can create one using the Create Exam form.'
    elif filter == 'expired':
        tests = [ test for test in _tests if test['expiry_date'].date() < today ]
        display_title = 'Expired Exams'
        error_none = 'There are no expired exams. Exams will appear in this category after their expiration date has passed.'
    elif filter == 'scheduled':
        tests = [ test for test in _tests if test['start_date'].date() > today ]
        display_title = 'Scheduled Exams'
        error_none = 'There are no scheduled exams pending. You can schedule an exam for the future using the Create Exam form.'
    else:
        # filter == 'all'. Materialise the query result so the template gets a
        # list, exactly as it does for the other filters.
        tests = list(_tests)
        display_title = 'All Exams'
        error_none = 'There are no exams set up. You can create one using the Create Exam form.'
    return render_template('/admin/tests.html', tests = tests, display_title=display_title, error_none=error_none, filter=filter)
|
|
|
|
@views.route('/tests/create/', methods=['POST'])
@admin_account_required
@login_required
def create_test():
    """Create a new exam from the submitted ``CreateTest`` form.

    After form validation the start and expiry dates are parsed as
    ``YYYY-MM-DD`` and rejected with a 400 JSON error list when either lies
    in the past or the expiry precedes the start. Otherwise a ``Test`` is
    created with a fresh hex UUID, the username of the user identified by the
    ``_id`` cookie as creator, and the chosen dataset and time limit. When
    form validation fails, all validation messages are returned as JSON with
    status 400.
    """
    from .models.forms import CreateTest
    form = CreateTest()
    # Select fields need their choices populated before validation.
    form.dataset.choices = available_datasets()
    form.time_limit.choices = get_time_options()
    if form.validate_on_submit():
        start_date = request.form.get('start_date')
        start_date = datetime.strptime(start_date, '%Y-%m-%d')
        expiry_date = request.form.get('expiry_date')
        expiry_date = datetime.strptime(expiry_date, '%Y-%m-%d')
        dataset = request.form.get('dataset')
        today = date.today()
        errors = []
        if start_date.date() < today:
            errors.append('The start date cannot be in the past.')
        if expiry_date.date() < today:
            errors.append('The expiry date cannot be in the past.')
        if expiry_date < start_date:
            errors.append('The expiry date cannot be before the start date.')
        if errors:
            return jsonify({'error': errors}), 400
        creator_id = get_id_from_cookie()
        creator = decrypt_find_one(db.users, { '_id': creator_id } )['username']
        test = Test(
            _id = uuid4().hex,
            start_date = start_date,
            expiry_date = expiry_date,
            time_limit = request.form.get('time_limit'),
            creator = creator,
            dataset = dataset
        )
        test.create()
        return jsonify({'success': 'New exam created.'}), 200
    else:
        # Collect the messages of every field. The original read
        # `form.expiry`, but the fields are posted as `start_date` /
        # `expiry_date`, so that attribute lookup would fail instead of
        # reporting the validation errors.
        errors = [message for messages in form.errors.values() for message in messages]
        return jsonify({ 'error': errors}), 400
|
|
|
|
@views.route('/tests/delete/', methods=['POST'])
@admin_account_required
@login_required
def delete_test():
    """Delete the exam whose ``_id`` is given in the JSON request body.

    Returns the result of ``Test.delete()`` when a matching document exists
    in ``db.tests``, otherwise a JSON error with status 404.
    """
    payload = request.get_json()
    _id = payload['_id']
    existing = db.tests.find_one({'_id': _id})
    if not existing:
        return jsonify({'error': 'Could not find the corresponding test to delete.'}), 404
    return Test(_id = _id).delete()
|
|
|
|
@views.route('/test/<_id>/', methods=['GET','POST'])
@admin_account_required
@login_required
def view_test(_id):
    """Show a single exam (GET) or add a time adjustment to it (POST).

    GET renders the exam page with an ``AddTimeAdjustment`` form, or answers
    404 when no exam with this ``_id`` is found. POST validates the form and
    returns the result of ``Test.add_time_adjustment`` with the submitted
    ``time`` converted to ``int``; validation errors on the ``time`` field
    are returned as JSON with status 400.
    """
    from .models.forms import AddTimeAdjustment
    form = AddTimeAdjustment()
    test = decrypt_find_one(db.tests, {'_id': _id})
    if request.method == 'GET':
        if test:
            return render_template('/admin/test.html', test = test, form = form)
        return abort(404)
    if request.method == 'POST':
        if not form.validate_on_submit():
            return jsonify({'error': form.time.errors }), 400
        extra_time = int(request.form.get('time'))
        return Test(_id=_id).add_time_adjustment(extra_time)
|
|
|
|
@views.route('/test/<_id>/delete-adjustment/', methods = ['POST'])
@admin_account_required
@login_required
def delete_adjustment(_id):
    """Remove the time adjustment for the ``user_code`` given in the JSON request body.

    Returns whatever ``Test.remove_time_adjustment`` returns for the exam
    identified by ``_id``.
    """
    payload = request.get_json()
    user_code = payload['user_code']
    exam = Test(_id=_id)
    return exam.remove_time_adjustment(user_code)
|
|
|
|
@views.route('/results/')
@admin_account_required
@login_required
def view_entries():
    """Render the results page with every decrypted document from ``db.entries``."""
    all_entries = decrypt_find(db.entries, {})
    return render_template('/admin/results.html', entries = all_entries)
|
|
|
|
@views.route('/results/<_id>/', methods = ['GET', 'POST'])
@admin_account_required
@login_required
def view_entry(_id=''):
    """Show a single result entry (GET) or act on it (POST).

    GET renders the detail page, or answers 404 when the entry is missing.

    POST reads ``action`` from the JSON request body:

    * ``'override'`` sets the entry's ``status`` to ``'late (allowed)'``.
    * ``'delete'`` pulls the entry id from the ``entries`` list of the exam
      with the entry's ``test_code`` and then deletes the entry document.

    A successful action flashes a message and returns a JSON success
    response. A missing entry or exam yields a JSON 404; a failed or
    unrecognised action yields a JSON 400.
    """
    entry = decrypt_find_one(db.entries, {'_id': _id})
    if request.method == 'GET':
        if not entry:
            return abort(404)
        return render_template('/admin/result-detail.html', entry = entry)
    if request.method == 'POST':
        if not entry:
            return jsonify({'error': 'A valid entry could not be found.'}), 404
        action = request.get_json()['action']
        if action == 'override':
            late_ignore = db.entries.find_one_and_update({'_id': _id}, {'$set': {'status': 'late (allowed)'}})
            if late_ignore:
                flash('Late status for the entry has been allowed.', 'success')
                return jsonify({'success': 'Late status allowed.'}), 200
        elif action == 'delete':
            test_code = entry['test_code']
            test = db.tests.find_one_and_update({'test_code': test_code}, {'$pull': {'entries': _id}})
            if not test:
                return jsonify({'error': 'A valid exam could not be found.'}), 404
            result = db.entries.delete_one({'_id': _id})
            # delete_one returns a result object that is always truthy;
            # deleted_count says whether a document was actually removed.
            if result.deleted_count:
                flash('Entry has been deleted.', 'success')
                return jsonify({'success': 'Entry has been deleted.'}), 200
        # Reached when an action did not succeed or the action is not
        # recognised, so the view always returns a response.
        return jsonify({'error': 'An error occurred.'}), 400
|
|
|
|
@views.route('/certificate/', methods=['POST'])
@admin_account_required
@login_required
def generate_certificate():
    """Render the certificate component for the entry whose ``_id`` is in the JSON request body.

    Answers 404 when no matching entry is found in ``db.entries``.
    """
    payload = request.get_json()
    entry = decrypt_find_one(db.entries, {'_id': payload['_id']})
    if entry:
        return render_template('/admin/components/certificate.html', entry = entry)
    return abort(404)