ska-referee-test/ref-test/app/admin/views.py

456 lines
21 KiB
Python

from ..forms.admin import AddTimeAdjustment, CreateTest, CreateUser, DeleteUser, Login, Register, ResetPassword, UpdatePassword, UpdateUser, UploadData
from ..models import Dataset, Entry, Test, User
from ..tools.auth import disable_if_logged_in, require_account_creation
from ..tools.data import check_dataset_exists, check_is_json, validate_json
from ..tools.forms import get_dataset_choices, get_time_options, send_errors_to_client
from ..tools.logs import write
from ..tools.test import answer_options, get_correct_answers
from flask import abort, Blueprint, jsonify, render_template, request, send_file, session
from flask.helpers import abort, flash, redirect, url_for
from flask_login import current_user, login_required
from datetime import date, datetime, MINYEAR, timedelta
from json import loads
from os import path
import secrets
# Blueprint named 'admin' that every view in this module registers its routes
# on; it is configured with its own 'templates' and 'static' folders.
admin = Blueprint(
    'admin',
    __name__,
    static_folder='static',
    template_folder='templates',
)
@admin.route('/')
@admin.route('/home/')
@admin.route('/dashboard/')
@login_required
def _home():
    """Render the admin dashboard.

    Loads every test and entry, then derives three lists for the template:

    * current tests: not yet ended and starting today or earlier, newest
      end date first;
    * upcoming tests: starting on a later calendar day, earliest first;
    * recent results: entries whose status is not 'started', latest end
      time first.

    Aborts with HTTP 500 (after logging) if the database queries fail.
    """
    try:
        tests = Test.query.all()
        results = Entry.query.all()
    except Exception as exception:
        write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
        return abort(500)

    # Take a single clock snapshot so that all filters agree with each other
    # even if the request straddles midnight.
    now = datetime.now()
    today = now.date()
    # Sort fallback for rows whose date/time column is empty.
    earliest = datetime(MINYEAR, 1, 1)

    current_tests = [
        test for test in tests
        if test.end_date >= now and test.start_date.date() <= today
    ]
    current_tests.sort(key=lambda x: x.end_date or earliest, reverse=True)

    upcoming_tests = [test for test in tests if test.start_date.date() > today]
    upcoming_tests.sort(key=lambda x: x.start_date or earliest)

    recent_results = [result for result in results if result.status != 'started']
    recent_results.sort(key=lambda x: x.end_time or earliest, reverse=True)

    return render_template(
        '/admin/index.html',
        current_tests=current_tests,
        upcoming_tests=upcoming_tests,
        # Misspelled legacy name, kept so a template that still reads
        # `upcomimg_tests` keeps rendering the same data.
        upcomimg_tests=upcoming_tests,
        recent_results=recent_results,
    )
@admin.route('/settings/')
@login_required
def _settings():
    """Render the settings overview listing all users and all datasets.

    Aborts with HTTP 500 (after logging) if either query fails.
    """
    try:
        context = {
            'users': User.query.all(),
            'datasets': Dataset.query.all(),
        }
    except Exception as exception:
        write('system.log', f"Database error when processing request '{request.url}': {exception}")
        return abort(500)
    return render_template('/admin/settings/index.html', **context)
@admin.route('/login/', methods=['GET','POST'])
@disable_if_logged_in
@require_account_creation
def _login():
    """Show the login form (GET) or authenticate a user (POST).

    POST responses are JSON: 200 on success, 401 for an unknown username or
    a wrong password, and whatever `send_errors_to_client` produces when the
    form does not validate. A database failure is logged and answered with
    HTTP 500.
    """
    form = Login()

    if request.method != 'POST':
        # Pre-fill the username left in the session by an earlier step
        # (registration or password reset).
        if 'remembered_username' in session:
            form.username.data = session.pop('remembered_username')
        return render_template('/admin/auth/login.html', form=form, next=request.args.get('next'))

    if not form.validate_on_submit():
        return send_errors_to_client(form=form)

    try:
        candidates = User.query.all()
    except Exception as exception:
        write('system.log', f"Database error when processing request '{request.url}': {exception}")
        return abort(500)

    # Usernames are compared through get_username(), so the match is done in
    # Python rather than in the query.
    account = next(
        (
            candidate for candidate in candidates
            if candidate.get_username() == request.form.get('username').lower()
        ),
        None,
    )
    if not account:
        return jsonify({'error': 'The username you entered does not exist.'}), 401
    if not account.verify_password(request.form.get('password')):
        return jsonify({'error': 'The password you entered is incorrect.'}), 401

    account.login(remember=request.form.get('remember'))
    return jsonify({'success': 'Successfully logged in.'}), 200
@admin.route('/logout/')
@login_required
def _logout():
    """Log the current user out and redirect to the login page."""
    current_user.logout()
    login_url = url_for('admin._login')
    return redirect(login_url)
@admin.route('/register/', methods=['GET','POST'])
@disable_if_logged_in
def _register():
    """Show the registration form (GET) or create a new account (POST).

    On success the lower-cased username is stored in the session under
    'remembered_username' and a JSON 200 is returned; a failed registration
    yields a JSON 401 carrying the message from `User.register`.
    """
    from ..models.user import User

    form = Register()
    if request.method != 'POST':
        return render_template('/admin/auth/register.html', form=form)
    if not form.validate_on_submit():
        return send_errors_to_client(form=form)

    username = request.form.get('username').lower()
    account = User()
    account.set_username(username)
    account.set_email(request.form.get('email').lower())
    success, message = account.register(password=request.form.get('password'))

    if not success:
        flash(message=message, category='error')
        return jsonify({'error': message}), 401

    flash(message=f'{message} Please log in to continue.', category='success')
    session['remembered_username'] = username
    return jsonify({'success': message}), 200
@admin.route('/reset/', methods=['GET','POST'])
def _reset():
    """Handle password-reset requests and reset-link verification.

    POST: validates the form, looks the account up by username, checks that
    the submitted email belongs to it and returns `user.reset_password()`.
    Unknown user or mismatching email yields a JSON 400.

    GET with ``?token=...``: looks the account up by reset token, clears its
    reset tokens, and - if the ``verification`` query argument matches the
    stored verification token - stores the user id in the session and renders
    the update-password form. Otherwise an error is flashed and the reset
    form is rendered.

    GET without a token: renders the reset form.

    Database failures are logged and answered with HTTP 500.
    """
    form = ResetPassword()

    if request.method == 'POST':
        if not form.validate_on_submit():
            return send_errors_to_client(form=form)
        try:
            users = User.query.all()
        except Exception as exception:
            write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
            return abort(500)

        # `_login` and `_register` lower-case the username before using it,
        # so do the same here; otherwise "Admin" would never match "admin".
        username = (request.form.get('username') or '').lower()
        user = next((_user for _user in users if _user.get_username() == username), None)
        if not user:
            return jsonify({'error': 'The user account does not exist.'}), 400

        # Compare emails case-insensitively: `_register` stores them
        # lower-cased, so an exact comparison rejects differently-cased input.
        submitted_email = (request.form.get('email') or '').lower()
        stored_email = (user.get_email() or '').lower()
        if not submitted_email or stored_email != submitted_email:
            return jsonify({'error': 'The email address does not match the user account.'}), 400
        return user.reset_password()

    token = request.args.get('token')
    if token:
        try:
            user = User.query.filter_by(reset_token=token).first()
        except Exception as exception:
            write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
            return abort(500)
        if not user:
            return redirect(url_for('admin._reset'))

        # Read the verification token before clearing: the link is
        # invalidated on first use whether or not verification succeeds.
        verification_token = user.verification_token
        user.clear_reset_tokens()

        supplied = request.args.get('verification')
        # Require both values to be present so that a missing query argument
        # can never "match" an unset stored token (None == None).
        if supplied and verification_token and supplied == verification_token:
            form = UpdatePassword()
            session['user'] = user.id
            return render_template('/admin/auth/update-password.html', form=form)
        flash('The verification of your password reset request failed and the token has been invalidated. Please make a new reset password request.', 'error')

    return render_template('/admin/auth/reset.html', form=form)
@admin.route('/update_password/', methods=['POST'])
def _update_password():
    """Set a new password for the account stored in the session by `_reset`.

    Returns JSON 200 on success. Returns JSON 400 when no reset is in
    progress for this session or the referenced account no longer exists,
    and the form errors when validation fails. Database failures are logged
    and answered with HTTP 500.
    """
    form = UpdatePassword()
    if not form.validate_on_submit():
        return send_errors_to_client(form=form)

    # The id is consumed so it cannot be replayed. A default is supplied
    # because this endpoint can be called without a preceding verified reset
    # link, in which case a bare pop() would raise KeyError (HTTP 500).
    user_id = session.pop('user', None)
    if user_id is None:
        return jsonify({'error': 'Your password reset request is no longer valid. Please make a new reset password request.'}), 400

    try:
        user = User.query.filter_by(id=user_id).first()
    except Exception as exception:
        write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
        return abort(500)
    if not user:
        return jsonify({'error': 'The user account does not exist.'}), 400

    user.update(password=request.form.get('password'))
    session['remembered_username'] = user.get_username()
    flash('Your password has been reset.', 'success')
    return jsonify({'success':'Your password has been reset'}), 200
@admin.route('/settings/users/', methods=['GET', 'POST'])
@login_required
def _users():
    """List user accounts (GET) or create a new one (POST).

    POST returns JSON 200 when `User.register` reports success and JSON 401
    otherwise. A database failure while loading users is logged and answered
    with HTTP 500.
    """
    form = CreateUser()
    try:
        users = User.query.all()
    except Exception as exception:
        write('system.log', f"Database error when processing request '{request.url}': {exception}")
        return abort(500)

    if request.method != 'POST':
        return render_template('/admin/settings/users.html', form=form, users=users)
    if not form.validate_on_submit():
        return send_errors_to_client(form=form)

    # An empty password field means "generate one": 12 random bytes as hex.
    password = request.form.get('password') or secrets.token_hex(12)

    account = User()
    account.set_username(request.form.get('username').lower())
    account.set_email(request.form.get('email'))
    success, message = account.register(notify=request.form.get('notify'), password=password)

    if not success:
        return jsonify({'error': message}), 401
    return jsonify({'success': message}), 200
@admin.route('/settings/users/delete/<string:id>', methods=['GET', 'POST'])
@login_required
def _delete_user(id:str):
    """Confirm (GET) or perform (POST) the deletion of a user account.

    Deleting one's own account and deleting an unknown user are refused in
    both modes. POST additionally requires the current user's password and
    answers in JSON; GET flashes an error and redirects to the user list.
    """
    try:
        user = User.query.filter_by(id=id).first()
    except Exception as exception:
        write('system.log', f"Database error when processing request '{request.url}': {exception}")
        return abort(500)
    form = DeleteUser()

    if request.method != 'POST':
        refusal = None
        if id == current_user.id:
            refusal = 'Cannot delete your own user account.'
        elif not user:
            refusal = 'User not found.'
        if refusal is not None:
            flash(refusal, 'error')
            return redirect(url_for('admin._users'))
        return render_template('/admin/settings/delete_user.html', form=form, id=id, user=user)

    if not user:
        return jsonify({'error': 'User does not exist.'}), 400
    if id == current_user.id:
        return jsonify({'error': 'Cannot delete your own account.'}), 400
    if not form.validate_on_submit():
        return send_errors_to_client(form=form)

    # The acting admin must re-enter their own password before a deletion.
    if not current_user.verify_password(request.form.get('password')):
        return jsonify({'error': 'The password you entered is incorrect.'}), 401

    success, message = user.delete(notify=request.form.get('notify'))
    if not success:
        return jsonify({'error': message}), 400
    return jsonify({'success': message}), 200
@admin.route('/settings/users/update/<string:id>', methods=['GET', 'POST'])
@login_required
def _update_user(id:str):
    """Show the edit form (GET) or apply changes to a user account (POST).

    POST requires the current user's own password in 'confirm_password' and
    forwards 'password', 'email' and 'notify' to `User.update`. Responses
    are JSON: 200 on success, 400 for an unknown user or failed update, 401
    for a wrong confirmation password.
    """
    try:
        user = User.query.filter_by(id=id).first()
    except Exception as exception:
        write('system.log', f"Database error when processing request '{request.url}': {exception}")
        return abort(500)
    form = UpdateUser()

    if request.method != 'POST':
        if user:
            return render_template('/admin/settings/update_user.html', form=form, id=id, user=user)
        flash('User not found.', 'error')
        return redirect(url_for('admin._users'))

    if not user:
        return jsonify({'error': 'User does not exist.'}), 400
    if not form.validate_on_submit():
        return send_errors_to_client(form=form)
    if not current_user.verify_password(request.form.get('confirm_password')):
        return jsonify({'error': 'Invalid password for your account.'}), 401

    changes = {
        'password': request.form.get('password'),
        'email': request.form.get('email'),
        'notify': request.form.get('notify'),
    }
    success, message = user.update(**changes)
    if not success:
        return jsonify({'error': message}), 400
    flash(message, 'success')
    return jsonify({'success': message}), 200
@admin.route('/settings/questions/', methods=['GET', 'POST'])
@login_required
def _questions():
    """List question datasets (GET) or upload a new one (POST).

    The uploaded file must pass `check_is_json` and its parsed content must
    pass `validate_json`; either failure is answered with JSON 400. The
    outcome of `Dataset.create` is returned as JSON 200 or 400.
    """
    form = UploadData()

    if request.method == 'POST':
        if not form.validate_on_submit():
            return send_errors_to_client(form=form)

        uploaded_file = form.data_file.data
        if not check_is_json(uploaded_file):
            return jsonify({'error': 'Invalid file. Please upload a JSON file.'}), 400

        # Rewind before reading the payload.
        uploaded_file.stream.seek(0)
        parsed = loads(uploaded_file.read())
        if not validate_json(data=parsed):
            return jsonify({'error': 'The data in the file is invalid.'}), 400

        dataset = Dataset()
        dataset.set_name(request.form.get('name'))
        success, message = dataset.create(data=parsed, default=request.form.get('default'))
        if not success:
            return jsonify({'error': message}), 400
        return jsonify({'success': message}), 200

    try:
        existing = Dataset.query.all()
    except Exception as exception:
        write('system.log', f"Database error when processing request '{request.url}': {exception}")
        return abort(500)
    return render_template('/admin/settings/questions.html', form=form, data=existing)
@admin.route('/settings/questions/delete/', methods=['POST'])
@login_required
def _edit_questions():
    """Delete a question dataset identified by a JSON request body.

    Expects a JSON object with 'id' and 'action'; the only accepted action
    is 'delete'. Returns JSON 200 with the message from `Dataset.delete` on
    success, 400 for an invalid action or failed deletion, and 404 when no
    dataset has the given id. Database failures are logged and answered
    with HTTP 500.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # body; anything that is not a JSON object is treated as empty so that
    # the lookups below cannot raise.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    dataset_id = payload.get('id')
    action = payload.get('action')

    if action != 'delete':
        return jsonify({'error': 'Invalid action.'}), 400

    try:
        dataset = Dataset.query.filter_by(id=dataset_id).first()
    except Exception as exception:
        write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
        return abort(500)
    # .first() returns None for an unknown id; calling delete() on it would
    # raise AttributeError.
    if not dataset:
        return jsonify({'error': 'Dataset does not exist.'}), 404

    success, message = dataset.delete()
    if success:
        return jsonify({'success': message}), 200
    return jsonify({'error': message}), 400
@admin.route('/settings/questions/download/<string:id>/')
@login_required
def _download(id:str):
    """Send a dataset's file to the client as a '<dataset name>.json' attachment.

    Aborts with 404 when no dataset has the given id and with 500 (after
    logging) when the database query fails.
    """
    try:
        dataset = Dataset.query.filter_by(id=id).first()
    except Exception as exception:
        write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
        return abort(500)
    if not dataset:
        return abort(404)

    data_path = path.abspath(dataset.get_file())
    # `attachment_filename` was renamed to `download_name` in Flask 2.0 and
    # the old keyword was later removed. This module imports `abort` and
    # `redirect` from `flask.helpers`, which needs a recent Flask, so the new
    # keyword is the one that works here.
    return send_file(data_path, as_attachment=True, download_name=f'{dataset.get_name()}.json')
@admin.route('/tests/<string:filter>/', methods=['GET'])
@admin.route('/tests/', methods=['GET'])
@login_required
@check_dataset_exists
def _tests(filter:str=None):
    """Render the exam list for one filter, or the create-exam form.

    Accepted filters are 'create', 'active', 'scheduled', 'expired' and
    'all'; any other value (including none at all) redirects to 'active'.
    For 'create' no list is built and a pre-populated `CreateTest` form is
    passed to the template instead.
    """
    tests = None
    try:
        every_test = Test.query.all()
    except Exception as exception:
        write('system.log', f"Database error when processing request '{request.url}': {exception}")
        return abort(500)

    form = None
    now = datetime.now()

    if filter not in ('create', 'active', 'scheduled', 'expired', 'all'):
        return redirect(url_for('admin._tests', filter='active'))

    display_title = ''
    error_none = ''

    if filter == 'create':
        form = CreateTest()
        form.start_date.default = datetime.now()
        form.expiry_date.default = date.today() + timedelta(days=1)
        form.time_limit.choices = get_time_options()
        form.dataset.choices = get_dataset_choices()
        form.time_limit.default='none'
        # Apply the defaults assigned above to the form fields.
        form.process()
    elif filter == 'active':
        tests = [exam for exam in every_test if exam.end_date >= now and exam.start_date <= now]
        display_title = 'Active Exams'
        error_none = 'There are no exams that are currently active. You can create one using the Create Exam form.'
    elif filter == 'expired':
        tests = [exam for exam in every_test if exam.end_date < now]
        display_title = 'Expired Exams'
        error_none = 'There are no expired exams. Exams will appear in this category after their expiration date has passed.'
    elif filter == 'scheduled':
        tests = [exam for exam in every_test if exam.start_date > now]
        display_title = 'Scheduled Exams'
        error_none = 'There are no scheduled exams pending. You can schedule an exam for the future using the Create Exam form.'
    else:
        # Only 'all' remains after the guard above.
        tests = every_test
        display_title = 'All Exams'
        error_none = 'There are no exams set up. You can create one using the Create Exam form.'

    return render_template(
        '/admin/tests.html',
        form=form,
        tests=tests,
        display_title=display_title,
        error_none=error_none,
        filter=filter,
    )
@admin.route('/tests/create/', methods=['POST'])
@login_required
def _create_test():
    """Create an exam from the submitted `CreateTest` form.

    'start_date' and 'expiry_date' are parsed as '%Y-%m-%dT%H:%M';
    'time_limit' is stored as an int, or None when the value is 'none'.
    Returns JSON 200 on success and JSON 400 when `Test.create` fails.
    """
    form = CreateTest()
    form.dataset.choices = get_dataset_choices()
    form.time_limit.choices = get_time_options()
    if not form.validate_on_submit():
        return send_errors_to_client(form=form)

    timestamp_format = '%Y-%m-%dT%H:%M'
    exam = Test()
    exam.start_date = datetime.strptime(request.form.get('start_date'), timestamp_format)
    exam.end_date = datetime.strptime(request.form.get('expiry_date'), timestamp_format)

    if request.form.get('time_limit') == 'none':
        exam.time_limit = None
    else:
        exam.time_limit = int(request.form.get('time_limit'))

    dataset_id = request.form.get('dataset')
    try:
        exam.dataset = Dataset.query.filter_by(id=dataset_id).first()
    except Exception as exception:
        write('system.log', f"Database error when processing request '{request.url}': {exception}")
        return abort(500)

    success, message = exam.create()
    if not success:
        return jsonify({'error': message}), 400
    flash(message=message, category='success')
    return jsonify({'success': message}), 200
@admin.route('/tests/edit/', methods=['POST'])
@login_required
def _edit_test():
    """Start, end or delete an exam identified by a JSON request body.

    Expects a JSON object with 'id' and 'action' ('start', 'delete' or
    'end'). Returns JSON 200 with the message from the invoked `Test`
    method on success, 400 for an invalid action or failed operation, and
    404 when no exam has the given id. Database failures are logged and
    answered with HTTP 500.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # body; a non-object body is treated as empty so lookups cannot raise.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    test_id = payload.get('id')
    action = payload.get('action')

    if action not in ['start', 'delete', 'end']:
        return jsonify({'error': 'Invalid action.'}), 400

    try:
        test = Test.query.filter_by(id=test_id).first()
    except Exception as exception:
        write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
        return abort(500)
    if not test:
        # Name the requested action so the message is accurate for
        # start/end as well as delete.
        return jsonify({'error': f'Could not find the corresponding test to {action}.'}), 404

    handlers = {'delete': test.delete, 'start': test.start, 'end': test.end}
    success, message = handlers[action]()
    if success:
        flash(message=message, category='success')
        return jsonify({'success': message}), 200
    return jsonify({'error': message}), 400
@admin.route('/test/<string:id>/', methods=['GET','POST'])
@login_required
def _view_test(id:str=None):
    """Show one exam (GET) or add a time adjustment to it (POST).

    POST passes the integer value of the 'time' form field to
    `Test.add_adjustment` and answers in JSON: 200 on success, 400 on
    failure or invalid form input, 404 for an unknown exam id. GET with an
    unknown id flashes an error and redirects to the active exam list.
    """
    form = AddTimeAdjustment()
    try:
        test = Test.query.filter_by(id=id).first()
    except Exception as exception:
        write('system.log', f"Database error when processing request '{request.url}': {exception}")
        return abort(500)

    if request.method != 'POST':
        if test:
            return render_template('/admin/test.html', test=test, form=form)
        flash('Invalid test ID.', 'error')
        return redirect(url_for('admin._tests', filter='active'))

    if not test:
        return jsonify({'error': 'Invalid test ID.'}), 404
    if not form.validate_on_submit():
        return jsonify({'error': form.time.errors }), 400

    adjustment = int(request.form.get('time'))
    success, message = test.add_adjustment(adjustment)
    if not success:
        return jsonify({'error': message}), 400
    return jsonify({'success': message}), 200
@admin.route('/test/<string:id>/delete-adjustment/', methods=['POST'])
@login_required
def _delete_adjustment(id:str=None):
    """Remove a time adjustment from an exam.

    Expects a JSON object with a string 'user_code', which is lower-cased
    and passed to `Test.remove_adjustment`. Returns JSON 200 on success,
    400 for a missing/invalid user code or a failed removal, and 404 for an
    unknown exam id. Database failures are logged and answered with
    HTTP 500.
    """
    try:
        test = Test.query.filter_by(id=id).first()
    except Exception as exception:
        write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
        return abort(500)
    if not test:
        return jsonify({'error': 'Invalid test ID.'}), 404

    # silent=True yields None instead of raising on a missing or malformed
    # body; validate the shape before calling .lower() so bad input gives a
    # 400 rather than an unhandled exception.
    payload = request.get_json(silent=True)
    user_code = payload.get('user_code') if isinstance(payload, dict) else None
    if not isinstance(user_code, str) or not user_code:
        return jsonify({'error': 'Invalid user code.'}), 400

    success, message = test.remove_adjustment(user_code.lower())
    if success:
        return jsonify({'success': message}), 200
    return jsonify({'error': message}), 400
@admin.route('/results/')
@login_required
def _view_entries():
    """Render the results page with every entry in the database.

    Aborts with HTTP 500 (after logging) if the query fails.
    """
    try:
        all_entries = Entry.query.all()
    except Exception as exception:
        write('system.log', f"Database error when processing request '{request.url}': {exception}")
        return abort(500)
    return render_template('/admin/results.html', entries=all_entries)
@admin.route('/results/<string:id>/', methods = ['GET', 'POST'])
@login_required
def _view_entry(id:str=None):
    """Show one result entry (GET) or validate/delete it (POST).

    POST expects a JSON object with 'action' set to 'validate' or 'delete'
    and answers in JSON: 200 on success, 400 for an invalid action or a
    failed operation, 404 for an unknown entry id.

    GET loads the dataset file of the entry's test and passes the correct
    answers and answer options to the detail template; an unknown id
    flashes an error and redirects to the results list.

    Database failures are logged and answered with HTTP 500.
    """
    try:
        entry = Entry.query.filter_by(id=id).first()
    except Exception as exception:
        write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
        return abort(500)

    if request.method == 'POST':
        if not entry:
            return jsonify({'error': 'Invalid entry ID.'}), 404

        # silent=True yields None instead of raising on a missing or
        # malformed body; a non-object body simply has no action.
        payload = request.get_json(silent=True)
        action = payload.get('action') if isinstance(payload, dict) else None
        if action not in ['validate', 'delete']:
            return jsonify({'error': 'Invalid action.'}), 400

        if action == 'validate':
            success, message = entry.validate()
        else:
            success, message = entry.delete()

        if success:
            flash(message, 'success')
            # NOTE(review): this also runs after a successful delete, exactly
            # as before - confirm that notifying for a deleted entry is
            # intended.
            entry.notify_result()
            return jsonify({'success': message}), 200
        return jsonify({'error': message}),400

    if not entry:
        flash('Invalid entry ID.', 'error')
        return redirect(url_for('admin._view_entries'))

    test = entry.test
    dataset = test.dataset
    dataset_path = dataset.get_file()
    with open(dataset_path, 'r') as _dataset:
        data = loads(_dataset.read())
    correct = get_correct_answers(dataset=data)
    answers = answer_options(dataset=data)
    return render_template('/admin/result-detail.html', entry = entry, correct = correct, answers = answers)
@admin.route('/certificate/',methods=['POST'])
@login_required
def _generate_certificate():
    """Render the certificate component for the entry named in the JSON body.

    Expects a JSON object with 'id'. Returns the rendered template, or JSON
    404 when no entry has that id (including when the body is missing or
    malformed). Database failures are logged and answered with HTTP 500.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # body; a non-object body is treated as carrying no id.
    payload = request.get_json(silent=True)
    entry_id = payload.get('id') if isinstance(payload, dict) else None

    try:
        entry = Entry.query.filter_by(id=entry_id).first()
    except Exception as exception:
        write('system.log', f'Database error when processing request \'{request.url}\': {exception}')
        return abort(500)
    if not entry:
        return jsonify({'error': 'Invalid entry ID.'}), 404
    return render_template('/admin/components/certificate.html', entry = entry)