from flask import Blueprint, render_template, flash, redirect, request, jsonify, abort, session
from flask.helpers import url_for
from functools import wraps
from datetime import datetime
import os
from glob import glob
from json import loads
from werkzeug.security import check_password_hash
from common.security.database import decrypt_find, decrypt_find_one
from .models.users import User
from flask_mail import Message
from main import app, db
from uuid import uuid4
import secrets
from main import mail
from datetime import datetime, date
from .models.tests import Test
from common.data_tools import get_default_dataset, get_time_options, available_datasets, get_datasets

# Blueprint on which every admin view in this module is registered.
views = Blueprint(
    'admin_views',
    __name__,
    template_folder='templates',
    static_folder='static'
)


def admin_account_required(function):
    """Redirect to admin registration while no user document exists.

    The wrapped view runs only when ``db.users`` contains at least one
    document; otherwise an alert is flashed and the visitor is redirected to
    ``admin_auth.register``.
    """
    @wraps(function)
    def decorated_function(*args, **kwargs):
        if not db.users.find_one({}):
            flash('No administrator accounts have been registered. Please register an administrator account.', 'alert')
            return redirect(url_for('admin_auth.register'))
        return function(*args, **kwargs)
    return decorated_function


def disable_on_registration(function):
    """Answer 404 instead of running the view once any user document exists."""
    @wraps(function)
    def decorated_function(*args, **kwargs):
        if db.users.find_one({}):
            return abort(404)
        return function(*args, **kwargs)
    return decorated_function


def get_id_from_cookie():
    """Return the value of the request's ``_id`` cookie, or ``None`` if unset."""
    return request.cookies.get('_id')


def get_user_from_db(_id):
    """Return the ``db.users`` document whose ``_id`` equals *_id*, if any."""
    return db.users.find_one({'_id': _id})


def check_login():
    """Return ``True`` when the ``_id`` cookie identifies an existing user.

    NOTE(review): the identifier is read straight from ``request.cookies`` and
    only checked for existence in ``db.users``; nothing in this module
    verifies who set the cookie. Confirm that is acceptable.
    """
    _id = get_id_from_cookie()
    if not _id:
        # No cookie (or an empty one): nothing to look up.
        return False
    return bool(get_user_from_db(_id))


def login_required(function):
    """Redirect to the login page unless ``check_login()`` succeeds.

    The requested URL is stored in ``session['prev_page']`` before the
    redirect.
    """
    @wraps(function)
    def decorated_function(*args, **kwargs):
        if not check_login():
            session['prev_page'] = request.url
            flash('Please log in to view this page.', 'alert')
            return redirect(url_for('admin_auth.login'))
        return function(*args, **kwargs)
    return decorated_function


def disable_if_logged_in(function):
    """Answer 404 instead of running the view when ``check_login()`` succeeds."""
    @wraps(function)
    def decorated_function(*args, **kwargs):
        if check_login():
            return abort(404)
        return function(*args, **kwargs)
    return decorated_function


@views.route('/')
@views.route('/home/')
@views.route('/dashboard/')
@admin_account_required
@login_required
def home():
    """Render the dashboard: current tests, upcoming tests and latest results.

    At most five entries of each list are passed to ``/admin/index.html``.
    Each rendered result gets a ``percent`` key computed from its
    ``results['score']`` and ``results['max']``.
    """
    today = date.today()
    # Materialise the query once. The tests are filtered twice below, and a
    # database cursor (presumably PyMongo's) is exhausted after the first
    # pass, which left the second list permanently empty.
    tests = list(db.tests.find())
    results = decrypt_find(db.entries, {})

    current_tests = [
        test for test in tests
        if test['start_date'].date() <= today <= test['expiry_date'].date()
    ]
    current_tests.sort(key=lambda x: x['expiry_date'], reverse=True)

    upcoming_tests = [test for test in tests if test['start_date'].date() > today]
    upcoming_tests.sort(key=lambda x: x['start_date'])

    # Only entries carrying a submission time count as results.
    recent_results = [result for result in results if 'submission_time' in result]
    recent_results.sort(key=lambda x: x['submission_time'], reverse=True)
    recent_results = recent_results[:5]
    for result in recent_results:
        max_score = result['results']['max']
        # Guard against a zero maximum rather than failing the whole page.
        result['percent'] = round(100 * result['results']['score'] / max_score) if max_score else 0

    # NOTE(review): the keyword `upcomimg_tests` is misspelled but is kept
    # as-is because the template may reference that exact name.
    return render_template(
        '/admin/index.html',
        current_tests=current_tests[:5],
        upcomimg_tests=upcoming_tests[:5],
        recent_results=recent_results,
    )


@views.route('/settings/')
@admin_account_required
@login_required
def settings():
    """Render the settings overview with up to five users and five datasets.

    Users are sorted by ``username`` before the first five are taken.
    """
    users = decrypt_find(db.users, {})
    users.sort(key=lambda x: x['username'])
    datasets = get_datasets()
    return render_template('/admin/settings/index.html', users=users[:5], datasets=datasets[:5])


@views.route('/settings/users/', methods=['GET','POST'])
@admin_account_required
@login_required
def users():
    """List administrator accounts (GET) or create a new one (POST).

    GET renders ``/admin/settings/users.html`` with every user and an empty
    creation form. POST validates the form; on success it e-mails the new
    user's credentials and returns ``entry.register()``, otherwise it returns
    the form's error messages as JSON with status 400.
    """
    from .models.forms import CreateUserForm
    form = CreateUserForm()
    if request.method == 'GET':
        users_list = decrypt_find(db.users, {})
        return render_template('/admin/settings/users.html', users=users_list, form=form)
    if request.method == 'POST':
        if not form.validate_on_submit():
            errors = [*form.username.errors, *form.email.errors, *form.password.errors]
            return jsonify({'error': errors}), 400

        entry = User(
            _id=uuid4().hex,
            username=request.form.get('username').lower(),
            email=request.form.get('email'),
            # An empty or missing password is replaced by a generated one.
            password=request.form.get('password') or secrets.token_hex(12),
        )
        # NOTE(review): the `html` alternative below contains no markup in
        # this copy of the file; presumably its tags were lost. Confirm
        # against the original template.
        message = Message(
            subject='RefTest | Registration Confirmation',
            recipients=[entry.email],
            body = f""" Hello {entry.username}, \n\n 
You have been registered as an administrator for the SKA RefTest App!\n\n You can access your account using the username '{entry.username}'.\n\n Your password is as follows:\n\n {entry.password}\n\n You can change your password by logging in to the admin console by copying the following URL into a web browser:\n\n {url_for('admin_views.home', _external = True)}\n\n Have a nice day. """,
            html = f"""
Hello {entry.username},
You have been registered as an administrator for the SKA RefTest App!
You can access your account using the username '{entry.username}'.
Your password is as follows:
{entry.password}You can change your password by logging in to the admin console at the link below:
{url_for('admin_views.home', _external = True)}
Have a nice day.
"""
        )
        # NOTE(review): the e-mail is sent before register() runs; if
        # registration can fail, the recipient still receives credentials.
        mail.send(message)
        return entry.register()


# NOTE(review): the source is damaged from this point. What remains of the
# next route on this line is reproduced verbatim below; the rest of its URL
# rule, its `def` line and the opening of its e-mail template are missing, so
# it cannot be reconstructed without the original file.
#   @views.route('/settings/users/delete/Hello {user['username']},
Your administrator account for the SKA RefTest App has been deleted by {_user['username']}. All data about your account has been deleted.
If you believe this was done in error, please contact them immediately.
If you would like to register to administer the app, please ask an existing administrator to create a new account.
Have a nice day.
""" ) mail.send(email) user = User( _id = user['_id'] ) return user.delete() else: return abort(400) @views.route('/settings/users/update/Hello {user['username']},
Your administrator account for the SKA RefTest App has been updated by {_user['username']}.
Your new account details are as follows:
Email: {recipient}
Password: {request.form.get('password')}
You can update your email and password by logging in to the app.
# NOTE(review): the two source lines reproduced verbatim below are the tail of
# the `/settings/users/update/` route. Its URL rule, `def` line and the start
# of its e-mail template are missing from the source, so it cannot be
# reconstructed here and must be restored from the original file.
#   Have a nice day.
#   """ ) mail.send(email) entry = User( _id = _id, email = request.form.get('email'), password = request.form.get('password') ) return entry.update() else: errors = [*form.user_password.errors, *form.email.errors, *form.password.errors, *form.password_reenter.errors] return jsonify({ 'error': errors}), 400


def _dataset_paths():
    """Return the paths of every ``*.json`` file in the data-file directory."""
    return glob(os.path.join(app.config["DATA_FILE_DIRECTORY"], '*.json'))


def _requested_filename():
    """Return the ``filename`` field of the JSON request body.

    Aborts with 400 when the body is not a JSON object or the field is not a
    non-empty string.
    """
    payload = request.get_json(silent=True)
    filename = payload.get('filename') if isinstance(payload, dict) else None
    if not isinstance(filename, str) or not filename:
        abort(400)
    return filename


def _is_known_dataset(filename, data_files):
    """Tell whether *filename* is exactly the base name of one of *data_files*.

    An exact match is required: a substring test would accept fragments such
    as ``json`` or ``/``, which are later joined into a file-system path.
    """
    return any(os.path.basename(path) == filename for path in data_files)


@views.route('/settings/questions/', methods=['GET', 'POST'])
@admin_account_required
@login_required
def questions():
    """List question datasets (GET) or upload a new one (POST).

    GET renders ``/admin/settings/questions.html`` with the datasets, the
    default dataset and an upload form. POST validates the form and the
    uploaded file, stores it with ``store_data_file`` and returns a JSON
    success message; any failure returns a JSON error with status 400.
    """
    from .models.forms import UploadDataForm
    from common.data_tools import check_json_format, validate_json_contents, store_data_file
    form = UploadDataForm()
    if request.method == 'GET':
        data = get_datasets()
        default = get_default_dataset()
        return render_template('/admin/settings/questions.html', form=form, data=data, default=default)
    if request.method == 'POST':
        if form.validate_on_submit():
            upload = form.data_file.data
            default = bool(request.form.get('default'))
            if not check_json_format(upload):
                return jsonify({'error': 'Invalid file selected. Please upload a JSON file.'}), 400
            if not validate_json_contents(upload):
                return jsonify({'error': 'The data in the file is invalid.'}), 400
            filename = store_data_file(upload, default=default)
            message = f'Dataset {form.data_file.data.filename} has been uploaded as {filename}.'
            flash(message, 'success')
            return jsonify({'success': message}), 200
        # form.errors maps field names to lists of messages; return the
        # messages, as the user-management view does, not the field names.
        errors = [message for messages in form.errors.values() for message in messages]
        return jsonify({'error': errors}), 400


@views.route('/settings/questions/delete/', methods=['POST'])
@admin_account_required
@login_required
def delete_questions():
    """Delete the question dataset named in the JSON body's ``filename``.

    Answers 404 when no dataset has that file name. Refuses with a JSON error
    and status 400 when the dataset is the default one, when its
    ``meta.tests`` entry is non-empty, or when it is the only dataset.
    Otherwise the file is removed and a JSON success message is returned.
    """
    filename = _requested_filename()
    data_files = _dataset_paths()
    if not _is_known_dataset(filename, data_files):
        return abort(404)

    if get_default_dataset() == filename:
        return jsonify({'error': 'Cannot delete the default question dataset.'}), 400

    data_file = os.path.join(app.config["DATA_FILE_DIRECTORY"], filename)
    with open(data_file, 'r') as _data_file:
        data = loads(_data_file.read())
    if data['meta']['tests']:
        return jsonify({'error': 'Cannot delete a dataset that is in use by an exam.'}), 400
    if len(data_files) == 1:
        return jsonify({'error': 'Cannot delete the only question dataset.'}), 400

    os.remove(data_file)
    message = f'Question dataset {filename} has been deleted.'
    flash(message, 'success')
    return jsonify({'success': message}), 200


@views.route('/settings/questions/default/', methods=['POST'])
@admin_account_required
@login_required
def make_default_questions():
    """Record the dataset named in the JSON body's ``filename`` as the default.

    Answers 404 when no dataset has that file name and a JSON error with
    status 400 when it is already the default. Otherwise the name is written
    to ``.default.txt`` in the data-file directory.
    """
    filename = _requested_filename()
    data_files = _dataset_paths()
    if not _is_known_dataset(filename, data_files):
        return abort(404)

    default_file_path = os.path.join(app.config['DATA_FILE_DIRECTORY'], '.default.txt')
    with open(default_file_path, 'r') as default_file:
        default = default_file.read()
    if default == filename:
        # The original message here ("Cannot delete ...") was copied from the
        # delete view and did not describe this situation.
        return jsonify({'error': f'Dataset {filename} is already the default.'}), 400

    with open(default_file_path, 'w') as default_file:
        default_file.write(filename)
    message = f'Set dataset {filename} as the default.'
    flash(message, 'success')
    return jsonify({'success': message})


# NOTE(review): the source breaks off inside the next route decorator; the
# surviving fragment is reproduced verbatim below and must be completed from
# the original file.
#   @views.route('/tests/