from ..forms.admin import AddTimeAdjustment, CreateTest, CreateUser, DeleteUser, Login, Register, ResetPassword, UpdatePassword, UpdateUser, UploadData from ..models import Dataset, Entry, Test, User from ..tools.auth import disable_if_logged_in, require_account_creation from ..tools.forms import get_dataset_choices, get_time_options, send_errors_to_client from ..tools.data import check_is_json, validate_json from ..tools.test import answer_options, get_correct_answers from flask import abort, Blueprint, jsonify, render_template, redirect, request, send_file, session from flask.helpers import flash, url_for from flask_login import current_user, login_required from datetime import date, datetime, timedelta from json import loads from os import path import secrets admin = Blueprint( name='admin', import_name=__name__, template_folder='templates', static_folder='static' ) @admin.route('/') @admin.route('/home/') @admin.route('/dashboard/') @login_required def _home(): tests = Test.query.all() results = Entry.query.all() current_tests = [ test for test in tests if test.end_date >= datetime.now() and test.start_date.date() <= date.today() ] current_tests.sort(key= lambda x: x.end_date, reverse=True) upcoming_tests = [ test for test in tests if test.start_date.date() > datetime.now().date()] upcoming_tests.sort(key= lambda x: x.start_date) recent_results = [result for result in results if not result.status == 'started' ] recent_results.sort(key= lambda x: x.end_time, reverse=True) return render_template('/admin/index.html', current_tests = current_tests, upcomimg_tests = upcoming_tests, recent_results = recent_results) @admin.route('/settings/') @login_required def _settings(): users = User.query.all() datasets = Dataset.query.all() return render_template('/admin/settings/index.html', users=users, datasets=datasets) @admin.route('/login/', methods=['GET','POST']) @disable_if_logged_in @require_account_creation def _login(): form = Login() if request.method == 'POST': if form.validate_on_submit(): users = User.query.all() user = None for _user in users: if _user.get_username() == request.form.get('username').lower(): user = _user break if user: if user.verify_password(request.form.get('password')): user.login(remember=request.form.get('remember')) return jsonify({'success': f'Successfully logged in.'}), 200 return jsonify({'error': f'The password you entered is incorrect.'}), 401 return jsonify({'error': f'The username you entered does not exist.'}), 401 return send_errors_to_client(form=form) if 'remembered_username' in session: form.username.data = session.pop('remembered_username') next = request.args.get('next') return render_template('/admin/auth/login.html', form=form, next=next) @admin.route('/logout/') @login_required def _logout(): current_user.logout() return redirect(url_for('admin._login')) @admin.route('/register/', methods=['GET','POST']) @disable_if_logged_in def _register(): from ..models.user import User form = Register() if request.method == 'POST': if form.validate_on_submit(): new_user = User() new_user.set_username(request.form.get('username').lower()) new_user.set_email(request.form.get('email').lower()) success, message = new_user.register(password=request.form.get('password')) if success: flash(message=f'{message} Please log in to continue.', category='success') session['remembered_username'] = request.form.get('username').lower() return jsonify({'success': message}), 200 flash(message=message, category='error') return jsonify({'error': message}), 401 return send_errors_to_client(form=form) return render_template('/admin/auth/register.html', form=form) @admin.route('/reset/', methods=['GET','POST']) def _reset(): form = ResetPassword() if request.method == 'POST': if form.validate_on_submit(): user = None users = User.query.all() for _user in users: if _user.get_username() == request.form.get('username'): user = _user break if not user: return jsonify({'error': 'The user account does not exist.'}), 400 if not user.get_email() == request.form.get('email'): return jsonify({'error': 'The email address does not match the user account.'}), 400 return user.reset_password() return send_errors_to_client(form=form) token = request.args.get('token') if token: user = User.query.filter_by(reset_token=token).first() if not user: return redirect(url_for('admin._reset')) verification_token = user.verification_token user.clear_reset_tokens() if request.args.get('verification') == verification_token: form = UpdatePassword() session['user_id'] = user.id return render_template('/admin/auth/update-password.html', form=form) flash('The verification of your password reset request failed and the token has been invalidated. Please make a new reset password request.', 'error') return render_template('/admin/auth/reset.html', form=form) @admin.route('/update_password/', methods=['POST']) def _update_password(): form = UpdatePassword() if form.validate_on_submit(): user = session.pop('user_id') user = User.query.filter_by(id=user).first() user.update(password=request.form.get('password')) session['remembered_username'] = user.get_username() flash('Your password has been reset.', 'success') return jsonify({'success':'Your password has been reset'}), 200 return send_errors_to_client(form=form) @admin.route('/settings/users/', methods=['GET', 'POST']) @login_required def _users(): form = CreateUser() users = User.query.all() if request.method == 'POST': if form.validate_on_submit(): password = request.form.get('password') password = secrets.token_hex(12) if not password else password new_user = User() new_user.set_username(request.form.get('username').lower()) new_user.set_email(request.form.get('email')) success, message = new_user.register(notify=request.form.get('notify'), password=password) if success: return jsonify({'success': message}), 200 return jsonify({'error': message}), 401 return send_errors_to_client(form=form) return render_template('/admin/settings/users.html', form = form, users = users) @admin.route('/settings/users/delete/', methods=['GET', 'POST']) @login_required def _delete_user(id:str): user = User.query.filter_by(id=id).first() form = DeleteUser() if request.method == 'POST': if not user: return jsonify({'error': 'User does not exist.'}), 400 if id == current_user.id: return jsonify({'error': 'Cannot delete your own account.'}), 400 if form.validate_on_submit(): password = request.form.get('password') if not current_user.verify_password(password): return jsonify({'error': 'The password you entered is incorrect.'}), 401 success, message = user.delete(notify=request.form.get('notify')) if success: return jsonify({'success': message}), 200 return jsonify({'error': message}), 400 return send_errors_to_client(form=form) if id == current_user.id: flash('Cannot delete your own user account.', 'error') return redirect(url_for('admin._users')) if not user: flash('User not found.', 'error') return redirect(url_for('admin._users')) return render_template('/admin/settings/delete_user.html', form=form, id = id, user = user) @admin.route('/settings/users/update/', methods=['GET', 'POST']) @login_required def _update_user(id:str): user = User.query.filter_by(id=id).first() form = UpdateUser() if request.method == 'POST': if not user: return jsonify({'error': 'User does not exist.'}), 400 if form.validate_on_submit(): if not user.verify_password(request.form.get('confirm_password')): return jsonify({'error': 'Invalid password for your account.'}), 401 success, message = user.update( password = request.form.get('password'), email = request.form.get('email'), notify = request.form.get('notify') ) if success: flash(message, 'success') return jsonify({'success': message}), 200 return jsonify({'error': message}), 400 return send_errors_to_client(form=form) if not user: flash('User not found.', 'error') return redirect(url_for('admin._users')) return render_template('/admin/settings/update_user.html', form=form, id = id, user = user) @admin.route('/settings/questions/', methods=['GET', 'POST']) @login_required def _questions(): form = UploadData() if request.method == 'POST': if form.validate_on_submit(): upload = form.data_file.data if not check_is_json(upload): return jsonify({'error': 'Invalid file. Please upload a JSON file.'}), 400 upload.stream.seek(0) data = loads(upload.read()) if not validate_json(data=data): return jsonify({'error': 'The data in the file is invalid.'}), 400 new_dataset = Dataset() new_dataset.set_name(request.form.get('name')) success, message = new_dataset.create( data = data, default = request.form.get('default') ) if success: return jsonify({'success': message}), 200 return jsonify({'error': message}), 400 return send_errors_to_client(form=form) data = Dataset.query.all() return render_template('/admin/settings/questions.html', form=form, data=data) @admin.route('/settings/questions/delete/', methods=['POST']) @login_required def _edit_questions(): id = request.get_json()['id'] action = request.get_json()['action'] if not action == 'delete': return jsonify({'error': 'Invalid action.'}), 400 dataset = Dataset.query.filter_by(id=id).first() if action == 'delete': success, message = dataset.delete() if success: return jsonify({'success': message}), 200 return jsonify({'error': message}), 400 @admin.route('/settings/questions/download//') @login_required def _download(id:str): dataset = Dataset.query.filter_by(id=id).first() if not dataset: return abort(404) data_path = path.abspath(dataset.get_file()) return send_file(data_path, as_attachment=True, attachment_filename=f'{dataset.get_name()}.json') @admin.route('/tests//', methods=['GET']) @admin.route('/tests/', methods=['GET']) @login_required def _tests(filter:str=None): datasets = Dataset.query.all() tests = None _tests = Test.query.all() form = None now = datetime.now() if not datasets: flash('There are no available question datasets. Please upload a question dataset in order to set up an exam.', 'error') return redirect(url_for('admin._questions')) if filter not in ['create','active','scheduled','expired','all']: return redirect(url_for('admin._tests', filter='active')) if filter == 'create': form = CreateTest() form.start_date.default = datetime.now() form.expiry_date.default = date.today() + timedelta(days=1) form.time_limit.choices = get_time_options() form.dataset.choices = get_dataset_choices() form.time_limit.default='none' form.process() display_title = '' error_none = '' if filter in [None, '', 'active']: tests = [ test for test in _tests if test.end_date >= now and test.start_date <= now ] display_title = 'Active Exams' error_none = 'There are no exams that are currently active. You can create one using the Creat Exam form.' if filter == 'expired': tests = [ test for test in _tests if test.end_date < now ] display_title = 'Expired Exams' error_none = 'There are no expired exams. Exams will appear in this category after their expiration date has passed.' if filter == 'scheduled': tests = [ test for test in _tests if test.start_date > now] display_title = 'Scheduled Exams' error_none = 'There are no scheduled exams pending. You can schedule an exam for the future using the Create Exam form.' if filter == 'all': tests = _tests display_title = 'All Exams' error_none = 'There are no exams set up. You can create one using the Create Exam form.' return render_template('/admin/tests.html', form = form, tests=tests, display_title=display_title, error_none=error_none, filter=filter) @admin.route('/tests/create/', methods=['POST']) @login_required def _create_test(): form = CreateTest() form.dataset.choices = get_dataset_choices() form.time_limit.choices = get_time_options() if form.validate_on_submit(): new_test = Test() new_test.start_date = request.form.get('start_date') new_test.start_date = datetime.strptime(new_test.start_date, '%Y-%m-%dT%H:%M') new_test.end_date = request.form.get('expiry_date') new_test.end_date = datetime.strptime(new_test.end_date, '%Y-%m-%dT%H:%M') new_test.time_limit = None if request.form.get('time_limit') == 'none' else int(request.form.get('time_limit')) dataset = request.form.get('dataset') new_test.dataset = Dataset.query.filter_by(id=dataset).first() success, message = new_test.create() if success: flash(message=message, category='success') return jsonify({'success': message}), 200 return jsonify({'error': message}), 400 return send_errors_to_client(form=form) @admin.route('/tests/edit/', methods=['POST']) @login_required def _edit_test(): id = request.get_json()['id'] action = request.get_json()['action'] if action not in ['start', 'delete', 'end']: return jsonify({'error': 'Invalid action.'}), 400 test = Test.query.filter_by(id=id).first() if not test: return jsonify({'error': 'Could not find the corresponding test to delete.'}), 404 if action == 'delete': success, message = test.delete() if action == 'start': success, message = test.start() if action == 'end': success, message = test.end() if success: flash(message=message, category='success') return jsonify({'success': message}), 200 return jsonify({'error': message}), 400 @admin.route('/test//', methods=['GET','POST']) @login_required def _view_test(id:str=None): form = AddTimeAdjustment() test = Test.query.filter_by(id=id).first() if request.method == 'POST': if not test: return jsonify({'error': 'Invalid test ID.'}), 404 if form.validate_on_submit(): time = int(request.form.get('time')) success, message = test.add_adjustment(time) if success: return jsonify({'success': message}), 200 return jsonify({'error': message}), 400 return jsonify({'error': form.time.errors }), 400 if not test: flash('Invalid test ID.', 'error') return redirect(url_for('admin._tests', filter='active')) return render_template('/admin/test.html', test = test, form = form) @admin.route('/test//delete-adjustment/', methods=['POST']) @login_required def _delete_adjustment(id:str=None): test = Test.query.filter_by(id=id).first() if not test: return jsonify({'error': 'Invalid test ID.'}), 404 user_code = request.get_json()['user_code'].lower() success, message = test.remove_adjustment(user_code) if success: return jsonify({'success': message}), 200 return jsonify({'error': message}), 400 @admin.route('/results/') @login_required def _view_entries(): entries = Entry.query.all() return render_template('/admin/results.html', entries = entries) @admin.route('/results//', methods = ['GET', 'POST']) @login_required def _view_entry(id:str=None): entry = Entry.query.filter_by(id=id).first() if request.method == 'POST': if not entry: return jsonify({'error': 'Invalid entry ID.'}), 404 action = request.get_json()['action'] if action not in ['validate', 'delete']: return jsonify({'error': 'Invalid action.'}), 400 if action == 'validate': success, message = entry.validate() if action == 'delete': success, message = entry.delete() if success: flash(message, 'success') entry.notify_result() return jsonify({'success': message}), 200 return jsonify({'error': message}),400 if not entry: flash('Invalid entry ID.', 'error') return redirect(url_for('admin._view_entries')) test = entry.test dataset = test.dataset dataset_path = dataset.get_file() with open(dataset_path, 'r') as _dataset: data = loads(_dataset.read()) correct = get_correct_answers(dataset=data) answers = answer_options(dataset=data) return render_template('/admin/result-detail.html', entry = entry, correct = correct, answers = answers) @admin.route('/certificate/',methods=['POST']) @login_required def _generate_certificate(): from ..extensions import db id = request.get_json()['id'] entry = Entry.query.filter_by(id=id).first() if not entry: return jsonify({'error': 'Invalid entry ID.'}), 404 return render_template('/admin/components/certificate.html', entry = entry)