ska-referee-test/ref-test/admin/views.py

477 lines
21 KiB
Python

from flask import Blueprint, render_template, flash, redirect, request, jsonify, abort, session
from flask.helpers import url_for
from functools import wraps
from datetime import datetime
import os
from glob import glob
from json import loads
from werkzeug.security import check_password_hash
from common.security.database import decrypt_find, decrypt_find_one
from .models.users import User
from flask_mail import Message
from main import app, db
from uuid import uuid4
import secrets
from main import mail
from datetime import datetime, date
from .models.tests import Test
from common.data_tools import get_default_dataset, get_time_options, available_datasets
# Blueprint on which every route in this module is registered.
views = Blueprint('admin_views', __name__, template_folder='templates', static_folder='static')
def admin_account_required(function):
    """Decorate a view so it only runs once at least one user is stored.

    When ``db.users`` holds no document, an alert is flashed and the request
    is redirected to ``admin_auth.register``; otherwise the wrapped view is
    called with its original arguments.
    """
    @wraps(function)
    def guarded_view(*args, **kwargs):
        existing_account = db.users.find_one({})
        if existing_account:
            return function(*args, **kwargs)
        flash('No administrator accounts have been registered. Please register an administrator account.', 'alert')
        return redirect(url_for('admin_auth.register'))
    return guarded_view
def disable_on_registration(function):
    """Decorate a view so it answers 404 as soon as any user is stored.

    The wrapped view is only reachable while ``db.users`` is empty.
    """
    @wraps(function)
    def guarded_view(*args, **kwargs):
        existing_account = db.users.find_one({})
        if not existing_account:
            return function(*args, **kwargs)
        return abort(404)
    return guarded_view
def get_id_from_cookie():
    """Return the value of the ``_id`` request cookie, or ``None`` if absent."""
    request_cookies = request.cookies
    return request_cookies.get('_id')
def get_user_from_db(_id):
    """Look up a single document in ``db.users`` whose ``_id`` equals *_id*."""
    query = {'_id': _id}
    return db.users.find_one(query)
def check_login():
    """Return ``True`` when the ``_id`` cookie matches a stored user, else ``False``."""
    cookie_id = get_id_from_cookie()
    return bool(get_user_from_db(cookie_id))
def login_required(function):
    """Decorate a view so that it requires a successful ``check_login()``.

    When the check fails, the requested URL is stored in
    ``session['prev_page']``, an alert is flashed and the request is
    redirected to ``admin_auth.login``.
    """
    @wraps(function)
    def authenticated_view(*args, **kwargs):
        if check_login():
            return function(*args, **kwargs)
        session['prev_page'] = request.url
        flash('Please log in to view this page.', 'alert')
        return redirect(url_for('admin_auth.login'))
    return authenticated_view
def disable_if_logged_in(function):
    """Decorate a view so it answers 404 whenever ``check_login()`` succeeds."""
    @wraps(function)
    def anonymous_only_view(*args, **kwargs):
        if not check_login():
            return function(*args, **kwargs)
        return abort(404)
    return anonymous_only_view
@views.route('/')
@views.route('/home/')
@views.route('/dashboard/')
@admin_account_required
@login_required
def home():
    """Render the admin index template for the root, home and dashboard URLs."""
    index_template = '/admin/index.html'
    return render_template(index_template)
@views.route('/settings/')
@admin_account_required
@login_required
def settings():
    """Render the settings index template."""
    settings_template = '/admin/settings/index.html'
    return render_template(settings_template)
@views.route('/settings/users/', methods=['GET','POST'])
@admin_account_required
@login_required
def users():
    """List users (GET) or create a user and email the credentials (POST).

    GET renders the users template with the result of
    ``decrypt_find(db.users, {})`` and an empty creation form.

    POST validates ``CreateUserForm``. On success a ``User`` is built from the
    submitted fields, a confirmation email containing the username, password
    and console URL is sent, and the return value of ``User.register()`` is
    returned. On validation failure a JSON ``{'error': [...]}`` payload is
    returned with status 400.
    """
    # Local import: only this view needs HTML escaping for the email markup.
    from html import escape
    from .models.forms import CreateUserForm
    form = CreateUserForm()
    if request.method == 'GET':
        users_list = decrypt_find(db.users, {})
        return render_template('/admin/settings/users.html', users = users_list, form = form)
    if request.method == 'POST':
        if not form.validate_on_submit():
            errors = [*form.username.errors, *form.email.errors, *form.password.errors]
            return jsonify({ 'error': errors}), 400
        entry = User(
            _id = uuid4().hex,
            username = request.form.get('username').lower(),
            email = request.form.get('email'),
            # A blank OR missing password field falls back to a random token,
            # so the account is never created with ``None`` as its password.
            password = request.form.get('password') or secrets.token_hex(12),
        )
        home_url = url_for('admin_views.home', _external = True)
        # Escape values placed into the HTML part; otherwise a password that
        # contains '<' or '&' would be displayed incorrectly to the new user.
        safe_username = escape(str(entry.username))
        safe_password = escape(str(entry.password))
        safe_url = escape(home_url)
        plain_body = f"""
Hello {entry.username}, \n\n
You have been registered as an administrator for the SKA RefTest App!\n\n
You can access your account using the username '{entry.username}'.\n\n
Your password is as follows:\n\n
{entry.password}\n\n
You can change your password by logging in to the admin console by copying the following URL into a web browser:\n\n
{home_url}\n\n
Have a nice day.
"""
        html_body = f"""
<p>Hello {safe_username},</p>
<p>You have been registered as an administrator for the SKA RefTest App!</p>
<p>You can access your account using the username '{safe_username}'.</p>
<p>Your password is as follows:</p>
<strong>{safe_password}</strong>
<p>You can change your password by logging in to the admin console at the link below:</p>
<p><a href='{safe_url}'>{safe_url}</a></p>
<p>Have a nice day.</p>
"""
        email = Message(
            subject = 'RefTest | Registration Confirmation',
            recipients = [entry.email],
            body = plain_body,
            html = html_body
        )
        mail.send(email)
        return entry.register()
@views.route('/settings/users/delete/<string:_id>', methods = ['GET', 'POST'])
@admin_account_required
@login_required
def delete_user(_id:str):
    """Show the delete-user confirmation page (GET) or delete the user (POST).

    A request that targets the account named by the ``_id`` cookie is refused
    with a flash message and a redirect to the users page.

    GET renders the confirmation template, or 404 when the user is unknown.
    POST answers 404 (JSON) for an unknown user and 400 when the form does not
    validate. Otherwise the submitted password is checked against the
    requesting user's stored hash (401 JSON on mismatch), an optional
    notification email is sent when the ``notify`` field is set, and the
    return value of ``User.delete()`` is returned.
    """
    if _id == get_id_from_cookie():
        flash('Cannot delete your own user account.', 'error')
        return redirect(url_for('admin_views.users'))
    from .models.forms import DeleteUserForm
    form = DeleteUserForm()
    target = decrypt_find_one(db.users, {'_id': _id})
    if request.method == 'GET':
        if target:
            return render_template('/admin/settings/delete-user.html', form = form, _id = _id, user = target)
        return abort(404)
    if request.method == 'POST':
        if not target:
            return jsonify({ 'error': 'User does not exist.' }), 404
        if not form.validate_on_submit():
            return abort(400)
        # The requester must re-enter their own password to confirm.
        requester = decrypt_find_one(db.users, {'_id': get_id_from_cookie()})
        supplied_password = request.form.get('password')
        if not check_password_hash(requester['password'], supplied_password):
            return jsonify({ 'error': 'The password you entered is incorrect.' }), 401
        if request.form.get('notify'):
            plain_body = f"""
Hello {target['username']}, \n\n
Your administrator account for the SKA RefTest App has been deleted by {requester['username']}. All data about your account has been deleted.\n\n
If you believe this was done in error, please contact them immediately.\n\n
If you would like to register to administer the app, please ask an existing administrator to create a new account.\n\n
Have a nice day.
"""
            html_body = f"""
<p>Hello {target['username']},</p>
<p>Your administrator account for the SKA RefTest App has been deleted by {requester['username']}. All data about your account has been deleted.</p>
<p>If you believe this was done in error, please contact them immediately.</p>
<p>If you would like to register to administer the app, please ask an existing administrator to create a new account.</p>
<p>Have a nice day.</p>
"""
            notification = Message(
                subject = 'RefTest | Account Deletion',
                recipients = [target['email']],
                bcc = [requester['email']],
                body = plain_body,
                html = html_body
            )
            mail.send(notification)
        doomed = User(
            _id = target['_id']
        )
        return doomed.delete()
@views.route('/settings/users/update/<string:_id>', methods = ['GET', 'POST'])
@admin_account_required
@login_required
def update_user(_id:str):
    """Show the update-user page (GET) or update another user's account (POST).

    A request that targets the account named by the ``_id`` cookie is refused
    with a flash message and a redirect to the users page.

    GET renders the update template, or 404 when the user is unknown.
    POST answers 404 (JSON) for an unknown user and 400 with the form's error
    messages when validation fails. Otherwise the requester's own password
    (form field ``user_password``) is verified (401 JSON on mismatch), an
    optional notification email is sent when ``notify`` is set, and the return
    value of ``User.update()`` is returned.
    """
    # Local import: only this view needs HTML escaping for the email markup.
    from html import escape
    if _id == get_id_from_cookie():
        # Was a copy-paste of the delete view's message.
        flash('Cannot update your own user account.', 'error')
        return redirect(url_for('admin_views.users'))
    from .models.forms import UpdateUserForm
    form = UpdateUserForm()
    user = decrypt_find_one( db.users, {'_id': _id})
    if request.method == 'GET':
        if not user:
            return abort(404)
        return render_template('/admin/settings/update-user.html', form = form, _id = _id, user = user)
    if request.method == 'POST':
        if not user:
            return jsonify({ 'error': 'User does not exist.' }), 404
        if not form.validate_on_submit():
            errors = [*form.user_password.errors, *form.email.errors, *form.password.errors, *form.password_reenter.errors]
            return jsonify({ 'error': errors}), 400
        _user = decrypt_find_one(db.users, {'_id': get_id_from_cookie()})
        # ``password`` / ``password_reenter`` carry the NEW password for the
        # target account; the requester confirms with ``user_password``.
        # Checking ``password`` here would only pass when the new password
        # happened to equal the requester's current one.
        confirmation = request.form.get('user_password')
        if not confirmation or not check_password_hash(_user['password'], confirmation):
            return jsonify({ 'error': 'The password you entered is incorrect.' }), 401
        new_email = request.form.get('email')
        new_password = request.form.get('password')
        if request.form.get('notify'):
            # Fall back to the stored address when no new one was submitted
            # (blank or missing field).
            recipient = new_email or user['email']
            # Escape values placed into the HTML part so characters such as
            # '<' or '&' in a password are displayed verbatim.
            safe_username = escape(str(user['username']))
            safe_admin = escape(str(_user['username']))
            safe_recipient = escape(str(recipient))
            safe_password = escape(str(new_password))
            plain_body = f"""
Hello {user['username']}, \n\n
Your administrator account for the SKA RefTest App has been updated by {_user['username']}.\n\n
Your new account details are as follows:\n\n
Email: {recipient}\n
Password: {new_password}\n\n
You can update your email and password by logging in to the app.\n\n
Have a nice day.
"""
            html_body = f"""
<p>Hello {safe_username},</p>
<p>Your administrator account for the SKA RefTest App has been updated by {safe_admin}.</p>
<p>Your new account details are as follows:</p>
<p>Email: {safe_recipient} <br/>Password: {safe_password}</p>
<p>You can update your email and password by logging in to the app.</p>
<p>Have a nice day.</p>
"""
            email = Message(
                subject = 'RefTest | Account Update',
                recipients = [recipient],
                bcc = [_user['email']],
                body = plain_body,
                html = html_body
            )
            mail.send(email)
        entry = User(
            _id = _id,
            email = new_email,
            password = new_password
        )
        return entry.update()
@views.route('/settings/questions/', methods=['GET', 'POST'])
@admin_account_required
@login_required
def questions():
    """List the stored question datasets (GET) or upload a new one (POST).

    GET reads every ``*.json`` file in ``app.config['DATA_FILE_DIRECTORY']``
    and renders the questions template with, per file, its name, the parsed
    ``meta.timestamp``, the author's username and the number of entries in
    ``meta.tests``, together with the current default dataset.

    POST validates ``UploadDataForm``, checks the upload with
    ``check_json_format`` and ``validate_json_contents`` (400 JSON on
    failure), stores it with ``store_data_file`` and returns a JSON success
    message. A form that fails validation yields a JSON list of the form's
    error messages with status 400.
    """
    from .models.forms import UploadDataForm
    from common.data_tools import check_json_format, validate_json_contents, store_data_file
    form = UploadDataForm()
    if request.method == 'GET':
        files = glob(os.path.join(app.config["DATA_FILE_DIRECTORY"],'*.json'))
        data = []
        for file in files:
            with open(file) as _file:
                load = loads(_file.read())
            meta = load['meta']
            # The author's account may have been deleted since the upload;
            # show a placeholder instead of failing on a missing record.
            author_record = decrypt_find_one(db.users, {'_id': meta['author']})
            author = author_record['username'] if author_record else 'Unknown'
            data.append({
                # basename() also copes with platform-specific separators.
                'filename': os.path.basename(file),
                'timestamp': datetime.strptime(meta['timestamp'], '%Y-%m-%d %H%M%S'),
                'author': author,
                'use': len(meta['tests'])
            })
        default = get_default_dataset()
        return render_template('/admin/settings/questions.html', form=form, data=data, default=default)
    if request.method == 'POST':
        if form.validate_on_submit():
            upload = form.data_file.data
            default = bool(request.form.get('default'))
            if not check_json_format(upload):
                return jsonify({ 'error': 'Invalid file selected. Please upload a JSON file.'}), 400
            if not validate_json_contents(upload):
                return jsonify({'error': 'The data in the file is invalid.'}), 400
            filename = store_data_file(upload, default=default)
            flash(f'Dataset {form.data_file.data.filename} has been uploaded as {filename}.', 'success')
            return jsonify({ 'success': f'Dataset {form.data_file.data.filename} has been uploaded as {filename}.'}), 200
        # ``form.errors`` maps field names to message lists; return the
        # messages themselves (as the other views do), not the field names.
        errors = [message for messages in form.errors.values() for message in messages]
        return jsonify({ 'error': errors}), 400
@views.route('/settings/questions/delete/', methods=['POST'])
@admin_account_required
@login_required
def delete_questions():
    """Delete the question dataset named by the JSON field ``filename``.

    Answers 404 when no ``*.json`` file with exactly that name exists in
    ``app.config['DATA_FILE_DIRECTORY']``. Answers 400 (JSON) when the file
    is the default dataset, when its ``meta.tests`` list is non-empty, or when
    it is the only dataset. Otherwise the file is removed and a JSON success
    message is returned.
    """
    filename = request.get_json()['filename']
    data_directory = app.config["DATA_FILE_DIRECTORY"]
    data_files = glob(os.path.join(data_directory,'*.json'))
    # Match the complete file name. A substring test against the full path
    # would accept partial names, directory fragments or an empty string and
    # then fail (or point outside the intended file) when the path is built.
    known_names = {os.path.basename(path) for path in data_files}
    if not isinstance(filename, str) or filename not in known_names:
        return abort(404)
    default = get_default_dataset()
    if default == filename:
        return jsonify({'error': 'Cannot delete the default question dataset.'}), 400
    data_file = os.path.join(data_directory, filename)
    with open(data_file, 'r') as _data_file:
        data = loads(_data_file.read())
    if data['meta']['tests']:
        return jsonify({'error': 'Cannot delete a dataset that is in use by an exam.'}), 400
    if len(data_files) == 1:
        return jsonify({'error': 'Cannot delete the only question dataset.'}), 400
    os.remove(data_file)
    flash(f'Question dataset {filename} has been deleted.', 'success')
    return jsonify({'success': f'Question dataset {filename} has been deleted.'}), 200
@views.route('/settings/questions/default/', methods=['POST'])
@admin_account_required
@login_required
def make_default_questions():
    """Record the dataset named by the JSON field ``filename`` as the default.

    Answers 404 when no ``*.json`` file with exactly that name exists in
    ``app.config['DATA_FILE_DIRECTORY']`` and 400 (JSON) when the dataset is
    already the default. Otherwise the name is written to ``.default.txt`` in
    that directory and a JSON success message is returned.
    """
    filename = request.get_json()['filename']
    data_directory = app.config['DATA_FILE_DIRECTORY']
    data_files = glob(os.path.join(data_directory,'*.json'))
    # Match the complete file name rather than any substring of the path.
    known_names = {os.path.basename(path) for path in data_files}
    if not isinstance(filename, str) or filename not in known_names:
        return abort(404)
    default_file_path = os.path.join(data_directory, '.default.txt')
    try:
        with open(default_file_path, 'r') as default_file:
            default = default_file.read()
    except FileNotFoundError:
        # No default has been recorded yet; the write below creates the file.
        default = ''
    if default == filename:
        return jsonify({'error': 'This dataset is already the default.'}), 400
    with open(default_file_path, 'w') as default_file:
        default_file.write(filename)
    flash(f'Set dataset {filename} as the default.', 'success')
    return jsonify({'success': f'Set dataset {filename} as the default.'})
@views.route('/tests/<filter>/', methods=['GET'])
@views.route('/tests/', methods=['GET'])
@admin_account_required
@login_required
def tests(filter=''):
    """Render the exams page for one category, or the exam creation form.

    Args:
        filter: One of ``''``, ``'create'``, ``'active'``, ``'scheduled'``,
            ``'expired'`` or ``'all'``; any other value answers 404. The
            empty string is treated like ``'active'``.

    When ``available_datasets()`` is empty the request is redirected to the
    questions page with an error flash. ``'create'`` renders the template
    with a pre-populated ``CreateTest`` form; the other values render it with
    the documents of ``db.tests`` selected by comparing their ``start_date``
    and ``expiry_date`` with today's date.
    """
    if not available_datasets():
        flash('There are no available question datasets. Please upload a question dataset in order to set up an exam.', 'error')
        return redirect(url_for('admin_views.questions'))
    if filter not in ['', 'create', 'active', 'scheduled', 'expired', 'all']:
        return abort(404)
    if filter == 'create':
        from .models.forms import CreateTest
        form = CreateTest()
        form.time_limit.choices = get_time_options()
        form.dataset.choices = available_datasets()
        form.time_limit.default='none'
        form.dataset.default=get_default_dataset()
        form.process()
        return render_template('/admin/tests.html', form = form, display_title='', error_none='', filter=filter)
    # Evaluate "today" once so every document is compared with the same date.
    today = date.today()
    _tests = db.tests.find({})
    if filter in ('active', ''):
        selected = [test for test in _tests if test['start_date'].date() <= today <= test['expiry_date'].date()]
        display_title = 'Active Exams'
        error_none = 'There are no exams that are currently active. You can create one using the Create Exam form.'
    elif filter == 'expired':
        selected = [test for test in _tests if test['expiry_date'].date() < today]
        display_title = 'Expired Exams'
        error_none = 'There are no expired exams. Exams will appear in this category after their expiration date has passed.'
    elif filter == 'scheduled':
        selected = [test for test in _tests if test['start_date'].date() > today]
        display_title = 'Scheduled Exams'
        error_none = 'There are no scheduled exams pending. You can schedule an exam for the future using the Create Exam form.'
    else:
        # filter == 'all'. Materialise the query result so the template gets
        # a list here too, as in the other categories (an empty list is falsy
        # and has a length, unlike a raw query cursor).
        selected = list(_tests)
        display_title = 'All Exams'
        error_none = 'There are no exams set up. You can create one using the Create Exam form.'
    return render_template('/admin/tests.html', tests = selected, display_title=display_title, error_none=error_none, filter=filter)
@views.route('/tests/create/', methods=['POST'])
@admin_account_required
@login_required
def create_test():
    """Create a new exam from the submitted ``CreateTest`` form.

    After form validation the ``start_date`` and ``expiry_date`` fields are
    parsed as ``YYYY-MM-DD`` and rejected (400 JSON with a list of messages)
    when either lies in the past or the expiry precedes the start. Otherwise
    a ``Test`` is created with the requesting user's username as creator and
    a JSON success message is returned. A form that fails validation yields
    the form's error messages with status 400.
    """
    from .models.forms import CreateTest
    form = CreateTest()
    form.dataset.choices = available_datasets()
    form.time_limit.choices = get_time_options()
    if not form.validate_on_submit():
        # Collect the messages of every field. The previous code read
        # ``form.expiry`` although the field is submitted as ``expiry_date``,
        # and it left out start-date and dataset errors.
        errors = [message for messages in form.errors.values() for message in messages]
        return jsonify({ 'error': errors}), 400
    start_date = datetime.strptime(request.form.get('start_date'), '%Y-%m-%d')
    expiry_date = datetime.strptime(request.form.get('expiry_date'), '%Y-%m-%d')
    dataset = request.form.get('dataset')
    today = date.today()
    errors = []
    if start_date.date() < today:
        errors.append('The start date cannot be in the past.')
    if expiry_date.date() < today:
        errors.append('The expiry date cannot be in the past.')
    if expiry_date < start_date:
        errors.append('The expiry date cannot be before the start date.')
    if errors:
        return jsonify({'error': errors}), 400
    creator_id = get_id_from_cookie()
    creator = decrypt_find_one(db.users, { '_id': creator_id } )['username']
    test = Test(
        _id = uuid4().hex,
        start_date = start_date,
        expiry_date = expiry_date,
        time_limit = request.form.get('time_limit'),
        creator = creator,
        dataset = dataset
    )
    test.create()
    return jsonify({'success': 'New exam created.'}), 200
@views.route('/tests/delete/', methods=['POST'])
@admin_account_required
@login_required
def delete_test():
    """Delete the exam whose ``_id`` is given in the JSON request body.

    Returns the result of ``Test.delete()`` when a matching document exists
    in ``db.tests``; otherwise a JSON error with status 404.
    """
    payload = request.get_json()
    _id = payload['_id']
    existing = db.tests.find_one({'_id': _id})
    if not existing:
        return jsonify({'error': 'Could not find the corresponding test to delete.'}), 404
    return Test(_id = _id).delete()
@views.route('/test/<_id>/', methods=['GET','POST'])
@admin_account_required
@login_required
def view_test(_id):
    """Show one exam (GET) or add a time adjustment to it (POST).

    GET renders the exam template with the document found in ``db.tests``,
    or answers 404 when there is none. POST answers 404 (JSON) for an unknown
    exam, validates ``AddTimeAdjustment`` and returns the result of
    ``Test.add_time_adjustment()`` called with the submitted ``time`` as an
    integer; a failed validation yields the ``time`` field errors with
    status 400.
    """
    from .models.forms import AddTimeAdjustment
    form = AddTimeAdjustment()
    test = decrypt_find_one(db.tests, {'_id': _id})
    if request.method == 'GET':
        if not test:
            return abort(404)
        return render_template('/admin/test.html', test = test, form = form)
    if request.method == 'POST':
        # Refuse adjustments for an exam that does not exist, in line with
        # the existence checks the other POST views perform.
        if not test:
            return jsonify({'error': 'Could not find the corresponding test.'}), 404
        if form.validate_on_submit():
            time = int(request.form.get('time'))
            return Test(_id=_id).add_time_adjustment(time)
        return jsonify({'error': form.time.errors }), 400
@views.route('/test/<_id>/delete-adjustment/', methods = ['POST'])
@admin_account_required
@login_required
def delete_adjustment(_id):
    """Remove a time adjustment from the exam *_id*.

    The ``user_code`` is read from the JSON request body and passed to
    ``Test.remove_time_adjustment()``, whose result is returned.
    """
    payload = request.get_json()
    user_code = payload['user_code']
    exam = Test(_id=_id)
    return exam.remove_time_adjustment(user_code)
@views.route('/results/')
@admin_account_required
@login_required
def view_entries():
    """Render the results template with the output of ``decrypt_find(db.entries, {})``."""
    all_entries = decrypt_find(db.entries, {})
    return render_template('/admin/results.html', entries = all_entries)
@views.route('/results/<_id>/', methods = ['GET', 'POST'])
@admin_account_required
@login_required
def view_entry(_id=''):
    """Show one result entry (GET) or apply an action to it (POST).

    GET renders the detail template, or answers 404 for an unknown entry.

    POST reads ``action`` from the JSON body:

    * ``'override'`` sets the entry's ``status`` to ``'late (allowed)'``.
    * ``'delete'`` pulls the entry id from the ``entries`` list of the exam
      matching the entry's ``test_code`` and then deletes the entry.

    Each action returns a JSON success message with status 200, or a JSON
    error with status 400/404. Any other action answers 400.
    """
    entry = decrypt_find_one(db.entries, {'_id': _id})
    if request.method == 'GET':
        if not entry:
            return abort(404)
        return render_template('/admin/result-detail.html', entry = entry)
    if request.method == 'POST':
        if not entry:
            return jsonify({'error': 'A valid entry could not be found.'}), 404
        action = request.get_json()['action']
        if action == 'override':
            late_ignore = db.entries.find_one_and_update({'_id': _id}, {'$set': {'status': 'late (allowed)'}})
            if late_ignore:
                flash('Late status for the entry has been allowed.', 'success')
                return jsonify({'success': 'Late status allowed.'}), 200
            return jsonify({'error': 'An error occurred.'}), 400
        if action == 'delete':
            test_code = entry['test_code']
            test = db.tests.find_one_and_update({'test_code': test_code}, {'$pull': {'entries': _id}})
            if not test:
                return jsonify({'error': 'A valid exam could not be found.'}), 404
            outcome = db.entries.delete_one({'_id': _id})
            # NOTE(review): assuming ``db`` is a PyMongo database, delete_one()
            # returns a result object that is truthy even when nothing was
            # removed, so look at its ``deleted_count``. Falls back to the
            # plain truthiness check if the attribute is not present.
            if getattr(outcome, 'deleted_count', outcome):
                flash('Entry has been deleted.', 'success')
                return jsonify({'success': 'Entry has been deleted.'}), 200
            return jsonify({'error': 'An error occurred.'}), 400
        # Unknown action: answer explicitly instead of returning None, which
        # is not a valid view response.
        return jsonify({'error': 'Unrecognised action.'}), 400
@views.route('/certificate/', methods=['POST'])
@admin_account_required
@login_required
def generate_certificate():
    """Render the certificate component for the entry named in the JSON body.

    The ``_id`` field of the request JSON selects the document in
    ``db.entries``; an unknown id answers 404.
    """
    payload = request.get_json()
    _id = payload['_id']
    entry = decrypt_find_one(db.entries, {'_id': _id})
    if entry:
        return render_template('/admin/components/certificate.html', entry = entry)
    return abort(404)