"""Admin console views: dashboard, user management, datasets and exams."""

from datetime import datetime, date, timedelta
from functools import wraps
from glob import glob
from html import escape
from json import loads
from uuid import uuid4
import os
import secrets

from flask import Blueprint, render_template, flash, redirect, request, jsonify, abort
from flask.helpers import url_for
from flask_mail import Message
from werkzeug.security import check_password_hash

from common.security.database import decrypt_find, decrypt_find_one
from .models.users import User
from main import app, db, mail
from .models.tests import Test
from common.data_tools import get_default_dataset

views = Blueprint(
    'admin_views',
    __name__,
    template_folder='templates',
    static_folder='static',
)

# Values accepted by the ``filter`` URL segment of the exams page.
_TEST_FILTERS = ('', 'create', 'active', 'scheduled', 'expired', 'all')


def admin_account_required(function):
    """Redirect to the registration page while ``db.users`` holds no record."""
    @wraps(function)
    def decorated_function(*args, **kwargs):
        if not db.users.find_one({}):
            flash(
                'No administrator accounts have been registered. '
                'Please register an administrator account.',
                'alert',
            )
            return redirect(url_for('admin_auth.register'))
        return function(*args, **kwargs)
    return decorated_function


def disable_on_registration(function):
    """Answer 404 once at least one user record exists."""
    @wraps(function)
    def decorated_function(*args, **kwargs):
        if db.users.find_one({}):
            return abort(404)
        return function(*args, **kwargs)
    return decorated_function


def get_id_from_cookie():
    """Return the value of the ``_id`` request cookie (``None`` if absent)."""
    return request.cookies.get('_id')


def get_user_from_db(_id):
    """Return the ``db.users`` record whose ``_id`` matches, or ``None``."""
    return db.users.find_one({'_id': _id})


def check_login():
    """Return ``True`` when the ``_id`` cookie matches a stored user."""
    return bool(get_user_from_db(get_id_from_cookie()))


def login_required(function):
    """Redirect to the login page unless ``check_login()`` succeeds."""
    @wraps(function)
    def decorated_function(*args, **kwargs):
        if not check_login():
            flash('Please log in to view this page.', 'alert')
            return redirect(url_for('admin_auth.login'))
        return function(*args, **kwargs)
    return decorated_function


def disable_if_logged_in(function):
    """Answer 404 when the requester is already logged in."""
    @wraps(function)
    def decorated_function(*args, **kwargs):
        if check_login():
            return abort(404)
        return function(*args, **kwargs)
    return decorated_function


def _data_file_paths():
    """Return the paths of every ``*.json`` file in ``DATA_FILE_DIRECTORY``."""
    return glob(os.path.join(app.config['DATA_FILE_DIRECTORY'], '*.json'))


def _dataset_exists(filename, data_files):
    """Return ``True`` when *filename* is exactly the basename of a data file.

    An exact comparison is used on purpose: a substring test would accept
    partial names (or fragments of the directory path) and let the caller
    go on to open a file that does not exist.
    """
    return filename in {os.path.basename(path) for path in data_files}


def _esc(value):
    """Escape *value* for inclusion in an HTML e-mail body."""
    return escape(str(value))


def available_datasets():
    """Return ``(filename, label)`` pairs for every uploaded dataset.

    The label is the filename without its ``.json`` suffix; the dataset
    reported by ``get_default_dataset()`` is marked with ``(Default)``.
    The list is returned in reverse of the order ``glob`` produced.
    """
    default = get_default_dataset()
    output = []
    for path in _data_file_paths():
        # basename rather than splitting on '/', so Windows paths work too.
        filename = os.path.basename(path)
        name = filename[:-5]  # drop the '.json' suffix
        label = f'{name} (Default)' if filename == default else name
        output.append((filename, label))
    output.reverse()
    return output


@views.route('/')
@views.route('/home/')
@views.route('/dashboard/')
@admin_account_required
@login_required
def home():
    """Render the admin dashboard."""
    return render_template('/admin/index.html')


@views.route('/settings/')
@admin_account_required
@login_required
def settings():
    """Render the settings landing page."""
    return render_template('/admin/settings/index.html')


@views.route('/settings/users/', methods=['GET', 'POST'])
@admin_account_required
@login_required
def users():
    """List users (GET) or create a user and e-mail the credentials (POST).

    On POST a blank or missing password is replaced with a random token.
    Validation failures are returned as JSON with status 400.
    """
    from .models.forms import CreateUserForm
    form = CreateUserForm()

    if request.method == 'GET':
        users_list = decrypt_find(db.users, {})
        return render_template('/admin/settings/users.html', users=users_list, form=form)

    if request.method == 'POST':
        if not form.validate_on_submit():
            errors = [*form.username.errors, *form.email.errors, *form.password.errors]
            return jsonify({'error': errors}), 400

        entry = User(
            _id=uuid4().hex,
            username=request.form.get('username').lower(),
            email=request.form.get('email'),
            password=request.form.get('password') or secrets.token_hex(12),
        )
        admin_url = url_for('admin_views.home', _external=True)
        link = _esc(admin_url)
        email = Message(
            subject='RefTest | Registration Confirmation',
            recipients=[entry.email],
            body=(
                f"Hello {entry.username},\n\n"
                "You have been registered as an administrator for the SKA RefTest App!\n\n"
                f"You can access your account using the username '{entry.username}'.\n\n"
                "Your password is as follows:\n\n"
                f"{entry.password}\n\n"
                "You can change your password by logging in to the admin console "
                "by copying the following URL into a web browser:\n\n"
                f"{admin_url}\n\n"
                "Have a nice day."
            ),
            # The HTML alternative needs real markup (otherwise the paragraphs
            # collapse into one) and escaped values (a password may contain
            # '<' or '&').
            html=(
                f"<p>Hello {_esc(entry.username)},</p>"
                "<p>You have been registered as an administrator for the SKA RefTest App!</p>"
                f"<p>You can access your account using the username '{_esc(entry.username)}'.</p>"
                "<p>Your password is as follows:</p>"
                f"<p>{_esc(entry.password)}</p>"
                "<p>You can change your password by logging in to the admin console "
                "at the link below:</p>"
                f'<p><a href="{link}">{link}</a></p>'
                "<p>Have a nice day.</p>"
            ),
        )
        mail.send(email)
        return entry.register()


@views.route('/settings/users/delete/<string:_id>', methods=['GET', 'POST'])
@admin_account_required
@login_required
def delete_user(_id: str):
    """Confirm (GET) or perform (POST) deletion of the user with id *_id*.

    The acting administrator must re-enter their own password. When the
    ``notify`` field is set, the deleted user is e-mailed and the acting
    administrator is blind-copied. Deleting one's own account is refused.
    """
    if _id == get_id_from_cookie():
        flash('Cannot delete your own user account.', 'error')
        return redirect(url_for('admin_views.users'))

    from .models.forms import DeleteUserForm
    form = DeleteUserForm()
    user = decrypt_find_one(db.users, {'_id': _id})

    if request.method == 'GET':
        if not user:
            return abort(404)
        return render_template('/admin/settings/delete-user.html', form=form, _id=_id, user=user)

    if request.method == 'POST':
        if not user:
            return jsonify({'error': 'User does not exist.'}), 404
        if not form.validate_on_submit():
            return abort(400)

        _user = decrypt_find_one(db.users, {'_id': get_id_from_cookie()})
        password = request.form.get('password')
        if not check_password_hash(_user['password'], password):
            return jsonify({'error': 'The password you entered is incorrect.'}), 401

        if request.form.get('notify'):
            target_name = user['username']
            actor_name = _user['username']
            email = Message(
                subject='RefTest | Account Deletion',
                recipients=[user['email']],
                bcc=[_user['email']],
                body=(
                    f"Hello {target_name},\n\n"
                    "Your administrator account for the SKA RefTest App has been deleted "
                    f"by {actor_name}. All data about your account has been deleted.\n\n"
                    "If you believe this was done in error, please contact them immediately.\n\n"
                    "If you would like to register to administer the app, please ask an "
                    "existing administrator to create a new account.\n\n"
                    "Have a nice day."
                ),
                html=(
                    f"<p>Hello {_esc(target_name)},</p>"
                    "<p>Your administrator account for the SKA RefTest App has been deleted "
                    f"by {_esc(actor_name)}. All data about your account has been deleted.</p>"
                    "<p>If you believe this was done in error, please contact them immediately.</p>"
                    "<p>If you would like to register to administer the app, please ask an "
                    "existing administrator to create a new account.</p>"
                    "<p>Have a nice day.</p>"
                ),
            )
            mail.send(email)

        return User(_id=user['_id']).delete()


@views.route('/settings/users/update/<string:_id>', methods=['GET', 'POST'])
@admin_account_required
@login_required
def update_user(_id: str):
    """Show (GET) or apply (POST) an e-mail/password change for user *_id*.

    The acting administrator confirms with their own password in the
    ``user_password`` field; ``password`` carries the target's new password.
    When ``notify`` is set the new details are e-mailed to the target and the
    acting administrator is blind-copied.
    """
    if _id == get_id_from_cookie():
        flash('Cannot update your own user account.', 'error')
        return redirect(url_for('admin_views.users'))

    from .models.forms import UpdateUserForm
    form = UpdateUserForm()
    user = decrypt_find_one(db.users, {'_id': _id})

    if request.method == 'GET':
        if not user:
            return abort(404)
        return render_template('/admin/settings/update-user.html', form=form, _id=_id, user=user)

    if request.method == 'POST':
        if not user:
            return jsonify({'error': 'User does not exist.'}), 404
        if not form.validate_on_submit():
            errors = [
                *form.user_password.errors,
                *form.email.errors,
                *form.password.errors,
                *form.password_reenter.errors,
            ]
            return jsonify({'error': errors}), 400

        _user = decrypt_find_one(db.users, {'_id': get_id_from_cookie()})
        # Verify the acting administrator's own password. ``password`` is the
        # target's NEW password, so checking it here would reject every change
        # unless the new password happened to equal the administrator's.
        confirmation = request.form.get('user_password')
        if not check_password_hash(_user['password'], confirmation):
            return jsonify({'error': 'The password you entered is incorrect.'}), 401

        new_email = request.form.get('email')
        new_password = request.form.get('password')

        if request.form.get('notify'):
            # A blank e-mail field means "send to the address already on file".
            recipient = new_email or user['email']
            target_name = user['username']
            actor_name = _user['username']
            email = Message(
                subject='RefTest | Account Update',
                recipients=[recipient],
                bcc=[_user['email']],
                body=(
                    f"Hello {target_name},\n\n"
                    "Your administrator account for the SKA RefTest App has been updated "
                    f"by {actor_name}.\n\n"
                    "Your new account details are as follows:\n\n"
                    f"Email: {recipient}\n"
                    f"Password: {new_password}\n\n"
                    "You can update your email and password by logging in to the app.\n\n"
                    "Have a nice day."
                ),
                html=(
                    f"<p>Hello {_esc(target_name)},</p>"
                    "<p>Your administrator account for the SKA RefTest App has been updated "
                    f"by {_esc(actor_name)}.</p>"
                    "<p>Your new account details are as follows:</p>"
                    f"<p>Email: {_esc(recipient)}<br>"
                    f"Password: {_esc(new_password)}</p>"
                    "<p>You can update your email and password by logging in to the app.</p>"
                    "<p>Have a nice day.</p>"
                ),
            )
            mail.send(email)

        entry = User(_id=_id, email=new_email, password=new_password)
        return entry.update()


@views.route('/settings/questions/', methods=['GET', 'POST'])
@admin_account_required
@login_required
def questions():
    """List uploaded question datasets (GET) or upload a new one (POST)."""
    from .models.forms import UploadDataForm
    from common.data_tools import check_json_format, validate_json_contents, store_data_file
    form = UploadDataForm()

    if request.method == 'GET':
        data = []
        for path in _data_file_paths():
            with open(path) as _file:
                meta = loads(_file.read())['meta']
            # The uploader's account may since have been deleted; the lookup
            # then yields a falsy value, so fall back to a placeholder.
            author = decrypt_find_one(db.users, {'_id': meta['author']})
            data.append({
                'filename': os.path.basename(path),
                'timestamp': datetime.strptime(meta['timestamp'], '%Y-%m-%d %H%M%S'),
                'author': author['username'] if author else 'Unknown',
                'use': len(meta['tests']),
            })
        default = get_default_dataset()
        return render_template('/admin/settings/questions.html', form=form, data=data, default=default)

    if request.method == 'POST':
        if form.validate_on_submit():
            upload = form.data_file.data
            default = bool(request.form.get('default'))
            if not check_json_format(upload):
                return jsonify({'error': 'Invalid file selected. Please upload a JSON file.'}), 400
            if not validate_json_contents(upload):
                return jsonify({'error': 'The data in the file is invalid.'}), 400
            filename = store_data_file(upload, default=default)
            message = f'Dataset {form.data_file.data.filename} has been uploaded as {filename}.'
            flash(message, 'success')
            return jsonify({'success': message}), 200
        errors = [*form.errors]
        return jsonify({'error': errors}), 400


@views.route('/settings/questions/delete/<filename>')
@admin_account_required
@login_required
def delete_questions(filename):
    """Delete the dataset *filename* unless it is default, in use, or the last one.

    Answers 404 when *filename* is not the exact name of an uploaded dataset.
    """
    data_files = _data_file_paths()
    if not _dataset_exists(filename, data_files):
        return abort(404)

    if get_default_dataset() == filename:
        return jsonify({'error': 'Cannot delete the default question dataset.'}), 400

    data_file = os.path.join(app.config['DATA_FILE_DIRECTORY'], filename)
    with open(data_file, 'r') as _data_file:
        data = loads(_data_file.read())
    if data['meta']['tests']:
        return jsonify({'error': 'Cannot delete a dataset that is in use by an exam.'}), 400
    if len(data_files) == 1:
        return jsonify({'error': 'Cannot delete the only question dataset.'}), 400

    os.remove(data_file)
    message = f'Question dataset {filename} has been deleted.'
    flash(message, 'success')
    return jsonify({'success': message}), 200


@views.route('/settings/questions/default/<filename>')
@admin_account_required
@login_required
def make_default_questions(filename):
    """Record *filename* as the default dataset in ``.default.txt``.

    Answers 404 for an unknown dataset and 400 when it is already the default.
    """
    if not _dataset_exists(filename, _data_file_paths()):
        return abort(404)

    # Same helper delete_questions() uses to identify the default dataset.
    if get_default_dataset() == filename:
        return jsonify({'error': f'Dataset {filename} is already the default.'}), 400

    default_file_path = os.path.join(app.config['DATA_FILE_DIRECTORY'], '.default.txt')
    with open(default_file_path, 'w') as default_file:
        default_file.write(filename)

    message = f'Set dataset {filename} as the default.'
    flash(message, 'success')
    return jsonify({'success': message})


@views.route('/tests/<filter>/', methods=['GET'])
@views.route('/tests/', methods=['GET'])
@admin_account_required
@login_required
def tests(filter=''):
    """Render the exams page for the requested *filter*.

    ``''`` and ``'active'`` show exams whose start date has passed and whose
    expiry date has not; ``'expired'``, ``'scheduled'`` and ``'all'`` select
    accordingly; ``'create'`` renders the creation form. Any other value
    answers 404. Redirects to the datasets page when none are uploaded.
    """
    if not available_datasets():
        flash(
            'There are no available question datasets. '
            'Please upload a question dataset in order to set up an exam.',
            'error',
        )
        return redirect(url_for('admin_views.questions'))

    if filter not in _TEST_FILTERS:
        return abort(404)

    if filter == 'create':
        from .models.forms import CreateTest
        form = CreateTest()
        form.dataset.choices = available_datasets()
        form.time_limit.default = 'none'
        form.dataset.default = get_default_dataset()
        form.process()
        return render_template('/admin/tests.html', form=form, display_title='', error_none='', filter=filter)

    today = date.today()
    all_tests = db.tests.find({})

    if filter in ('', 'active'):
        selected = [
            test for test in all_tests
            if test['start_date'].date() <= today <= test['expiry_date'].date()
        ]
        display_title = 'Active Exams'
        error_none = (
            'There are no exams that are currently active. '
            'You can create one using the Create Exam form.'
        )
    elif filter == 'expired':
        selected = [test for test in all_tests if test['expiry_date'].date() < today]
        display_title = 'Expired Exams'
        error_none = (
            'There are no expired exams. '
            'Exams will appear in this category after their expiration date has passed.'
        )
    elif filter == 'scheduled':
        selected = [test for test in all_tests if test['start_date'].date() > today]
        display_title = 'Scheduled Exams'
        error_none = (
            'There are no scheduled exams pending. '
            'You can schedule an exam for the future using the Create Exam form.'
        )
    else:  # filter == 'all'
        # Materialise the query result so the template receives a list, as it
        # does for every other filter.
        selected = list(all_tests)
        display_title = 'All Exams'
        error_none = 'There are no exams set up. You can create one using the Create Exam form.'

    return render_template(
        '/admin/tests.html',
        tests=selected,
        display_title=display_title,
        error_none=error_none,
        filter=filter,
    )


@views.route('/tests/create/', methods=['POST'])
@admin_account_required
@login_required
def _tests():
    """Create an exam from the submitted form and report the outcome as JSON.

    Rejects (400) start or expiry dates in the past and an expiry date that
    precedes the start date.
    """
    from .models.forms import CreateTest
    form = CreateTest()
    form.dataset.choices = available_datasets()

    if not form.validate_on_submit():
        errors = [*form.expiry.errors, *form.time_limit.errors]
        return jsonify({'error': errors}), 400

    start_date = datetime.strptime(request.form.get('start_date'), '%Y-%m-%d')
    expiry_date = datetime.strptime(request.form.get('expiry_date'), '%Y-%m-%d')
    dataset = request.form.get('dataset')

    today = date.today()
    errors = []
    if start_date.date() < today:
        errors.append('The start date cannot be in the past.')
    if expiry_date.date() < today:
        errors.append('The expiry date cannot be in the past.')
    if expiry_date < start_date:
        errors.append('The expiry date cannot be before the start date.')
    if errors:
        return jsonify({'error': errors}), 400

    creator = decrypt_find_one(db.users, {'_id': get_id_from_cookie()})['username']
    test = Test(
        _id=uuid4().hex,
        start_date=start_date,
        expiry_date=expiry_date,
        time_limit=request.form.get('time_limit'),
        creator=creator,
        dataset=dataset,
    )
    test.create()
    return jsonify({'success': 'New exam created.'}), 200


@views.route('/tests/delete/<_id>/')
@admin_account_required
@login_required
def delete_test(_id):
    """Delete the exam with id *_id*; answer 404 when it does not exist.

    Guarded by the same account/login checks as every other admin view so
    that an unauthenticated request cannot delete exams.
    """
    if db.tests.find_one({'_id': _id}):
        return Test(_id=_id).delete()
    return abort(404)