"""Admin console views: dashboard, settings, users, question datasets, exams and results."""

import os
import secrets
from datetime import date, datetime, timedelta
from functools import wraps
from glob import glob
from json import loads
from uuid import uuid4

from flask import Blueprint, render_template, flash, redirect, request, jsonify, abort, session
from flask.helpers import url_for
from flask_mail import Message
from werkzeug.security import check_password_hash

from common.security.database import decrypt_find, decrypt_find_one
from .models.users import User
from .models.tests import Test
from common.data_tools import get_default_dataset, get_time_options, available_datasets, get_datasets, get_correct_answers

views = Blueprint(
    'admin_views',
    __name__,
    template_folder='templates',
    static_folder='static',
)


def admin_account_required(function):
    """Redirect to the registration page while the users collection is empty."""
    @wraps(function)
    def decorated_function(*args, **kwargs):
        # Imported lazily, as everywhere in this module, rather than at module import time.
        from main import db
        if not db.users.find_one({}):
            flash('No administrator accounts have been registered. Please register an administrator account.', 'alert')
            return redirect(url_for('admin_auth.register'))
        return function(*args, **kwargs)
    return decorated_function


def disable_on_registration(function):
    """Answer 404 for the wrapped view once at least one user document exists."""
    @wraps(function)
    def decorated_function(*args, **kwargs):
        from main import db
        if db.users.find_one({}):
            return abort(404)
        return function(*args, **kwargs)
    return decorated_function


def get_id_from_cookie():
    """Return the value of the ``_id`` request cookie, or ``None`` when it is absent."""
    return request.cookies.get('_id')


def get_user_from_db(_id):
    """Return the raw user document whose ``_id`` matches, or ``None``."""
    from main import db
    return db.users.find_one({'_id': _id})


def check_login():
    """Return ``True`` when the ``_id`` cookie matches a user document.

    NOTE(review): the login state is derived solely from the ``_id`` cookie
    value matching a stored user; confirm that this cookie is protected elsewhere.
    """
    return bool(get_user_from_db(get_id_from_cookie()))


def login_required(function):
    """Redirect to the login page (remembering the requested URL) when not logged in."""
    @wraps(function)
    def decorated_function(*args, **kwargs):
        if not check_login():
            session['prev_page'] = request.url
            flash('Please log in to view this page.', 'alert')
            return redirect(url_for('admin_auth.login'))
        return function(*args, **kwargs)
    return decorated_function


def disable_if_logged_in(function):
    """Answer 404 for the wrapped view when the requester is already logged in."""
    @wraps(function)
    def decorated_function(*args, **kwargs):
        if check_login():
            return abort(404)
        return function(*args, **kwargs)
    return decorated_function


@views.route('/')
@views.route('/home/')
@views.route('/dashboard/')
@admin_account_required
@login_required
def home():
    """Render the dashboard with up to five current tests, upcoming tests and recent results."""
    from main import db
    # Materialise the cursor: it is iterated twice below, and a cursor can only
    # be consumed once (the second pass would otherwise always be empty).
    tests = list(db.tests.find())
    results = decrypt_find(db.entries, {})
    now = datetime.utcnow()
    today = date.today()

    current_tests = [
        test for test in tests
        if test['expiry_date'] >= now and test['start_date'].date() <= today
    ]
    current_tests.sort(key=lambda x: x['expiry_date'], reverse=True)

    upcoming_tests = [test for test in tests if test['start_date'] > now]
    upcoming_tests.sort(key=lambda x: x['start_date'])

    recent_results = [result for result in results if 'submission_time' in result]
    recent_results.sort(key=lambda x: x['submission_time'], reverse=True)
    recent_results = recent_results[:5]
    for result in recent_results:
        max_score = result['results']['max']
        # Guard against a zero maximum score instead of raising ZeroDivisionError.
        result['percent'] = round(100 * result['results']['score'] / max_score) if max_score else 0

    return render_template(
        '/admin/index.html',
        current_tests=current_tests[:5],
        # The misspelled keyword is kept for templates that already rely on it;
        # the correctly spelled name is passed alongside it.
        upcomimg_tests=upcoming_tests[:5],
        upcoming_tests=upcoming_tests[:5],
        recent_results=recent_results,
    )


@views.route('/settings/')
@admin_account_required
@login_required
def settings():
    """Render the settings overview with the first five users (by username) and datasets."""
    from main import db
    users = decrypt_find(db.users, {})
    users.sort(key=lambda x: x['username'])
    datasets = get_datasets()
    return render_template('/admin/settings/index.html', users=users[:5], datasets=datasets[:5])


@views.route('/settings/users/', methods=['GET', 'POST'])
@admin_account_required
@login_required
def users():
    """List administrator accounts, or create a new one on POST.

    On a valid POST a confirmation email is sent and the result of
    ``User.register()`` is returned; form errors are returned as JSON with 400.
    """
    from main import db, mail
    from .models.forms import CreateUserForm
    form = CreateUserForm()

    if request.method != 'POST':
        users_list = decrypt_find(db.users, {})
        return render_template('/admin/settings/users.html', users=users_list, form=form)

    if not form.validate_on_submit():
        errors = [*form.username.errors, *form.email.errors, *form.password.errors]
        return jsonify({'error': errors}), 400

    entry = User(
        _id=uuid4().hex,
        username=request.form.get('username').lower(),
        email=request.form.get('email'),
        # A blank or missing password falls back to a generated one.
        password=request.form.get('password') or secrets.token_hex(12),
    )
    # NOTE(review): the password is sent by email in clear text, and the html
    # alternative below carries no markup in this file.
    email = Message(
        subject='RefTest | Registration Confirmation',
        recipients=[entry.email],
        body=(
            f" Hello {entry.username}, \n\n"
            " You have been registered as an administrator for the SKA RefTest App!\n\n"
            f" You can access your account using the username '{entry.username}'.\n\n"
            " Your password is as follows:\n\n"
            f" {entry.password}\n\n"
            " You can change your password by logging in to the admin console by copying the following URL into a web browser:\n\n"
            f" {url_for('admin_views.home', _external = True)}\n\n"
            " Have a nice day. "
        ),
        html=f"""

Hello {entry.username},

You have been registered as an administrator for the SKA RefTest App!

You can access your account using the username '{entry.username}'.

Your password is as follows:

{entry.password}

You can change your password by logging in to the admin console at the link below:

{url_for('admin_views.home', _external = True)}

Have a nice day.

""",
    )
    mail.send(email)
    return entry.register()


# The view takes a required ``_id``, so the rule must carry the URL variable.
@views.route('/settings/users/delete/<string:_id>', methods=['GET', 'POST'])
@admin_account_required
@login_required
def delete_user(_id: str):
    """Show the delete-confirmation page for a user, or delete the user on POST.

    The acting administrator must confirm with their own password; the deleted
    user is optionally notified by email.
    """
    from main import db, mail
    if _id == get_id_from_cookie():
        flash('Cannot delete your own user account.', 'error')
        return redirect(url_for('admin_views.users'))

    from .models.forms import DeleteUserForm
    form = DeleteUserForm()
    user = decrypt_find_one(db.users, {'_id': _id})

    if request.method != 'POST':
        if not user:
            return abort(404)
        return render_template('/admin/settings/delete-user.html', form=form, _id=_id, user=user)

    if not user:
        return jsonify({'error': 'User does not exist.'}), 404
    if not form.validate_on_submit():
        return abort(400)

    # ``_user`` is the administrator performing the deletion.
    _user = decrypt_find_one(db.users, {'_id': get_id_from_cookie()})
    password = request.form.get('password')
    if not check_password_hash(_user['password'], password):
        return jsonify({'error': 'The password you entered is incorrect.'}), 401

    if request.form.get('notify'):
        email = Message(
            subject='RefTest | Account Deletion',
            recipients=[user['email']],
            bcc=[_user['email']],
            body=(
                f" Hello {user['username']}, \n\n"
                f" Your administrator account for the SKA RefTest App has been deleted by {_user['username']}. All data about your account has been deleted.\n\n"
                " If you believe this was done in error, please contact them immediately.\n\n"
                " If you would like to register to administer the app, please ask an existing administrator to create a new account.\n\n"
                " Have a nice day. "
            ),
            html=f"""

Hello {user['username']},

Your administrator account for the SKA RefTest App has been deleted by {_user['username']}. All data about your account has been deleted.

If you believe this was done in error, please contact them immediately.

If you would like to register to administer the app, please ask an existing administrator to create a new account.

Have a nice day.

""",
        )
        mail.send(email)

    user = User(_id=user['_id'])
    return user.delete()


# The view takes a required ``_id``, so the rule must carry the URL variable.
@views.route('/settings/users/update/<string:_id>', methods=['GET', 'POST'])
@admin_account_required
@login_required
def update_user(_id: str):
    """Show the update page for a user, or update their email/password on POST.

    The acting administrator confirms with their own password (``user_password``);
    ``email`` and ``password`` carry the new values for the target account.
    """
    from main import db, mail
    if _id == get_id_from_cookie():
        flash('Cannot update your own user account.', 'error')
        return redirect(url_for('admin_views.users'))

    from .models.forms import UpdateUserForm
    form = UpdateUserForm()
    user = decrypt_find_one(db.users, {'_id': _id})

    if request.method != 'POST':
        if not user:
            return abort(404)
        return render_template('/admin/settings/update-user.html', form=form, _id=_id, user=user)

    if not user:
        return jsonify({'error': 'User does not exist.'}), 404
    if not form.validate_on_submit():
        errors = [
            *form.user_password.errors,
            *form.email.errors,
            *form.password.errors,
            *form.password_reenter.errors,
        ]
        return jsonify({'error': errors}), 400

    # ``_user`` is the administrator performing the update. Their identity is
    # verified with ``user_password``; ``password`` is the target's NEW password
    # and must not be compared against the administrator's hash.
    _user = decrypt_find_one(db.users, {'_id': get_id_from_cookie()})
    admin_password = request.form.get('user_password')
    if not check_password_hash(_user['password'], admin_password):
        return jsonify({'error': 'The password you entered is incorrect.'}), 401

    if request.form.get('notify'):
        # Notify at the new address when one was supplied, otherwise the current one.
        recipient = request.form.get('email') or user['email']
        # NOTE(review): the new password is sent by email in clear text.
        email = Message(
            subject='RefTest | Account Update',
            recipients=[recipient],
            bcc=[_user['email']],
            body=(
                f" Hello {user['username']}, \n\n"
                f" Your administrator account for the SKA RefTest App has been updated by {_user['username']}.\n\n"
                " Your new account details are as follows:\n\n"
                f" Email: {recipient}\n"
                f" Password: {request.form.get('password')}\n\n"
                " You can update your email and password by logging in to the app.\n\n"
                " Have a nice day. "
            ),
            html=f"""

Hello {user['username']},

Your administrator account for the SKA RefTest App has been updated by {_user['username']}.

Your new account details are as follows:

Email: {recipient}
Password: {request.form.get('password')}

You can update your email and password by logging in to the app.

Have a nice day.

""",
        )
        mail.send(email)

    entry = User(
        _id=_id,
        email=request.form.get('email'),
        password=request.form.get('password'),
    )
    return entry.update()


@views.route('/settings/questions/', methods=['GET', 'POST'])
@admin_account_required
@login_required
def questions():
    """List question datasets, or validate and store an uploaded dataset on POST."""
    from .models.forms import UploadDataForm
    from common.data_tools import check_json_format, validate_json_contents, store_data_file
    form = UploadDataForm()

    if request.method != 'POST':
        data = get_datasets()
        default = get_default_dataset()
        return render_template('/admin/settings/questions.html', form=form, data=data, default=default)

    if not form.validate_on_submit():
        # Iterating ``form.errors`` yields its keys (kept as in the original response).
        errors = [*form.errors]
        return jsonify({'error': errors}), 400

    upload = form.data_file.data
    default = bool(request.form.get('default'))
    if not check_json_format(upload):
        return jsonify({'error': 'Invalid file selected. Please upload a JSON file.'}), 400
    if not validate_json_contents(upload):
        return jsonify({'error': 'The data in the file is invalid.'}), 400

    filename = store_data_file(upload, default=default)
    flash(f'Dataset {form.data_file.data.filename} has been uploaded as {filename}.', 'success')
    return jsonify({'success': f'Dataset {form.data_file.data.filename} has been uploaded as {filename}.'}), 200


@views.route('/settings/questions/delete/', methods=['POST'])
@admin_account_required
@login_required
def delete_questions():
    """Delete a question dataset file named in the JSON request body.

    Refuses to delete the default dataset, a dataset whose ``meta.tests`` is
    non-empty, or the only remaining dataset. Unknown file names yield 404.
    """
    from main import app
    filename = request.get_json()['filename']
    data_directory = app.config["DATA_FILE_DIRECTORY"]
    data_files = glob(os.path.join(data_directory, '*.json'))

    # Require an exact file-name match: the name comes from the request, and a
    # substring test against full paths would accept partial or empty names.
    if not any(filename == os.path.basename(path) for path in data_files):
        return abort(404)

    if get_default_dataset() == filename:
        return jsonify({'error': 'Cannot delete the default question dataset.'}), 400

    data_file = os.path.join(data_directory, filename)
    with open(data_file, 'r') as _data_file:
        data = loads(_data_file.read())
    if data['meta']['tests']:
        return jsonify({'error': 'Cannot delete a dataset that is in use by an exam.'}), 400
    if len(data_files) == 1:
        return jsonify({'error': 'Cannot delete the only question dataset.'}), 400

    os.remove(data_file)
    flash(f'Question dataset {filename} has been deleted.', 'success')
    return jsonify({'success': f'Question dataset {filename} has been deleted.'}), 200


@views.route('/settings/questions/default/', methods=['POST'])
@admin_account_required
@login_required
def make_default_questions():
    """Record the dataset named in the JSON request body as the default dataset.

    The default is stored as the file name inside ``.default.txt`` in the data
    directory. Unknown file names yield 404.
    """
    from main import app
    filename = request.get_json()['filename']
    data_directory = app.config["DATA_FILE_DIRECTORY"]
    data_files = glob(os.path.join(data_directory, '*.json'))
    default_file_path = os.path.join(data_directory, '.default.txt')

    # Exact file-name match only; see delete_questions for the same rule.
    if not any(filename == os.path.basename(path) for path in data_files):
        return abort(404)

    with open(default_file_path, 'r') as default_file:
        default = default_file.read()
    if default == filename:
        return jsonify({'error': 'This dataset is already the default question dataset.'}), 400

    with open(default_file_path, 'w') as default_file:
        default_file.write(filename)
    flash(f'Set dataset {filename} as the default.', 'success')
    return jsonify({'success': f'Set dataset {filename} as the default.'})


# The view's ``filter`` argument is filled from the URL variable of the first rule.
@views.route('/tests/<filter>/', methods=['GET'])
@views.route('/tests/', methods=['GET'])
@admin_account_required
@login_required
def tests(filter=''):
    """Render the exam list for a filter, or the exam creation form.

    ``filter`` is one of ``''`` (same as ``'active'``), ``'create'``,
    ``'active'``, ``'scheduled'``, ``'expired'`` or ``'all'``; anything else is 404.
    """
    from main import db
    if not available_datasets():
        flash('There are no available question datasets. Please upload a question dataset in order to set up an exam.', 'error')
        return redirect(url_for('admin_views.questions'))
    if filter not in ['', 'create', 'active', 'scheduled', 'expired', 'all']:
        return abort(404)

    if filter == 'create':
        from .models.forms import CreateTest
        form = CreateTest()
        form.time_limit.choices = get_time_options()
        form.dataset.choices = available_datasets()
        form.time_limit.default = 'none'
        form.dataset.default = get_default_dataset()
        # Called after the defaults are assigned, as in the original ordering.
        form.process()
        return render_template('/admin/tests.html', form=form, display_title='', error_none='', filter=filter)

    _tests = db.tests.find({})
    now = datetime.utcnow()
    today = date.today()
    if filter == 'expired':
        selected = [test for test in _tests if test['expiry_date'] < now]
        display_title = 'Expired Exams'
        error_none = 'There are no expired exams. Exams will appear in this category after their expiration date has passed.'
    elif filter == 'scheduled':
        selected = [test for test in _tests if test['start_date'].date() > today]
        display_title = 'Scheduled Exams'
        error_none = 'There are no scheduled exams pending. You can schedule an exam for the future using the Create Exam form.'
    elif filter == 'all':
        selected = _tests
        display_title = 'All Exams'
        error_none = 'There are no exams set up. You can create one using the Create Exam form.'
    else:
        # '' and 'active' both show the currently active exams.
        selected = [
            test for test in _tests
            if test['expiry_date'] >= now and test['start_date'].date() <= today
        ]
        display_title = 'Active Exams'
        error_none = 'There are no exams that are currently active. You can create one using the Create Exam form.'

    return render_template('/admin/tests.html', tests=selected, display_title=display_title, error_none=error_none, filter=filter)


@views.route('/tests/create/', methods=['POST'])
@admin_account_required
@login_required
def create_test():
    """Create an exam from the submitted form and return a JSON status.

    The expiry date is extended to the last millisecond of the chosen day.
    Validation problems are returned as ``{'error': [...]}`` with status 400.
    """
    from main import db
    from .models.forms import CreateTest
    form = CreateTest()
    form.dataset.choices = available_datasets()
    form.time_limit.choices = get_time_options()

    if not form.validate_on_submit():
        # Collect the messages of every field from the form's error mapping
        # rather than naming individual fields.
        errors = [message for messages in form.errors.values() for message in messages]
        return jsonify({'error': errors}), 400

    start_date = datetime.strptime(request.form.get('start_date'), '%Y-%m-%d')
    expiry_date = (
        datetime.strptime(request.form.get('expiry_date'), '%Y-%m-%d')
        + timedelta(days=1)
        - timedelta(milliseconds=1)
    )
    dataset = request.form.get('dataset')

    errors = []
    if start_date.date() < date.today():
        errors.append('The start date cannot be in the past.')
    if expiry_date.date() < date.today():
        errors.append('The expiry date cannot be in the past.')
    if expiry_date < start_date:
        errors.append('The expiry date cannot be before the start date.')
    if errors:
        return jsonify({'error': errors}), 400

    creator_id = get_id_from_cookie()
    creator = decrypt_find_one(db.users, {'_id': creator_id})['username']
    test = Test(
        _id=uuid4().hex,
        start_date=start_date,
        expiry_date=expiry_date,
        time_limit=request.form.get('time_limit'),
        creator=creator,
        dataset=dataset,
    )
    test.create()
    return jsonify({'success': 'New exam created.'}), 200


@views.route('/tests/delete/', methods=['POST'])
@admin_account_required
@login_required
def delete_test():
    """Delete the exam whose ``_id`` is given in the JSON request body."""
    from main import db
    _id = request.get_json()['_id']
    if db.tests.find_one({'_id': _id}):
        return Test(_id=_id).delete()
    return jsonify({'error': 'Could not find the corresponding test to delete.'}), 404


@views.route('/tests/close/', methods=['POST'])
@admin_account_required
@login_required
def close_test():
    """Close an exam by setting its expiry date to the current UTC time."""
    from main import db
    _id = request.get_json()['_id']
    if db.tests.find_one({'_id': _id}):
        return Test(_id=_id, expiry_date=datetime.utcnow()).update()
    return jsonify({'error': 'Could not find the corresponding test to close.'}), 404


@views.route('/test/<_id>/', methods=['GET', 'POST'])
@admin_account_required
@login_required
def view_test(_id):
    """Render a single exam, or add a time adjustment to it on POST."""
    from main import db
    from .models.forms import AddTimeAdjustment
    form = AddTimeAdjustment()
    test = decrypt_find_one(db.tests, {'_id': _id})

    if request.method != 'POST':
        if not test:
            return abort(404)
        return render_template('/admin/test.html', test=test, form=form)

    if not test:
        return jsonify({'error': 'Could not find the corresponding test.'}), 404
    if form.validate_on_submit():
        time = int(request.form.get('time'))
        return Test(_id=_id).add_time_adjustment(time)
    return jsonify({'error': form.time.errors}), 400


@views.route('/test/<_id>/delete-adjustment/', methods=['POST'])
@admin_account_required
@login_required
def delete_adjustment(_id):
    """Remove the time adjustment for the ``user_code`` given in the JSON body."""
    user_code = request.get_json()['user_code']
    return Test(_id=_id).remove_time_adjustment(user_code)


@views.route('/results/')
@admin_account_required
@login_required
def view_entries():
    """Render the list of all entries."""
    from main import db
    entries = decrypt_find(db.entries, {})
    return render_template('/admin/results.html', entries=entries)


@views.route('/results/<_id>/', methods=['GET', 'POST'])
@admin_account_required
@login_required
def view_entry(_id=''):
    """Render one entry with the correct answers, or act on it on POST.

    POST expects a JSON body with ``action`` set to ``'override'`` (mark the
    entry's status as ``'late (allowed)'``) or ``'delete'`` (remove the entry
    and pull it from its exam's ``entries`` list).
    """
    from main import app, db
    entry = decrypt_find_one(db.entries, {'_id': _id})

    if request.method != 'POST':
        if not entry:
            return abort(404)
        test = db.tests.find_one({'test_code': entry['test_code']})
        if not test:
            # The entry references an exam that no longer exists.
            return abort(404)
        dataset_path = os.path.join(app.config['DATA_FILE_DIRECTORY'], test['dataset'])
        with open(dataset_path, 'r') as _dataset:
            data = loads(_dataset.read())
        correct = get_correct_answers(dataset=data)
        return render_template('/admin/result-detail.html', entry=entry, correct=correct)

    if not entry:
        return jsonify({'error': 'A valid entry could not be found.'}), 404

    action = request.get_json()['action']
    if action == 'override':
        late_ignore = db.entries.find_one_and_update({'_id': _id}, {'$set': {'status': 'late (allowed)'}})
        if late_ignore:
            flash('Late status for the entry has been allowed.', 'success')
            return jsonify({'success': 'Late status allowed.'}), 200
        return jsonify({'error': 'An error occurred.'}), 400

    if action == 'delete':
        test = db.tests.find_one_and_update({'test_code': entry['test_code']}, {'$pull': {'entries': _id}})
        if not test:
            return jsonify({'error': 'A valid exam could not be found.'}), 404
        delete = db.entries.delete_one({'_id': _id})
        # The result object itself is always truthy; check the deleted count.
        if delete.deleted_count:
            flash('Entry has been deleted.', 'success')
            return jsonify({'success': 'Entry has been deleted.'}), 200
        return jsonify({'error': 'An error occurred.'}), 400

    # Unknown action: answer explicitly instead of returning None.
    return jsonify({'error': 'An error occurred.'}), 400


@views.route('/certificate/', methods=['POST'])
@admin_account_required
@login_required
def generate_certificate():
    """Render the certificate component for the entry ``_id`` given in the JSON body."""
    from main import db
    _id = request.get_json()['_id']
    entry = decrypt_find_one(db.entries, {'_id': _id})
    if not entry:
        return abort(404)
    return render_template('/admin/components/certificate.html', entry=entry)