viveksantayana
0ab06af8be
Refactored to move security package inside common Moved data folder to process root.
338 lines
14 KiB
Python
338 lines
14 KiB
Python
from flask import Blueprint, render_template, flash, redirect, request, jsonify, abort
|
|
from flask.helpers import url_for
|
|
from functools import wraps
|
|
from datetime import datetime
|
|
|
|
from werkzeug.security import check_password_hash
|
|
from common.security.database import decrypt_find, decrypt_find_one
|
|
from .models.users import User
|
|
from flask_mail import Message
|
|
from main import db
|
|
from uuid import uuid4
|
|
import secrets
|
|
from main import mail
|
|
from datetime import datetime, date, timedelta
|
|
from .models.tests import Test
|
|
|
|
# Blueprint collecting the admin views defined in this module. It is registered
# under the name 'admin_views' and given its own template and static folders.
views = Blueprint('admin_views', __name__, template_folder='templates', static_folder='static')
|
|
|
|
def admin_account_required(function):
    """Decorator: only run the view once at least one user exists.

    When ``db.users`` holds no documents, an alert is flashed and the
    request is redirected to the ``admin_auth.register`` endpoint instead
    of calling the wrapped view.
    """
    @wraps(function)
    def wrapper(*args, **kwargs):
        has_any_user = db.users.find_one({})
        if has_any_user:
            return function(*args, **kwargs)
        flash('No administrator accounts have been registered. Please register an administrator account.', 'alert')
        return redirect(url_for('admin_auth.register'))
    return wrapper
|
|
|
|
def disable_on_registration(function):
    """Decorator: make the view unavailable once any user exists.

    If ``db.users`` contains at least one document the request is aborted
    with a 404; otherwise the wrapped view runs normally.
    """
    @wraps(function)
    def wrapper(*args, **kwargs):
        already_registered = db.users.find_one({})
        if not already_registered:
            return function(*args, **kwargs)
        return abort(404)
    return wrapper
|
|
|
|
def get_id_from_cookie():
    """Return the value of the ``_id`` cookie on the current request, or None if absent."""
    cookies = request.cookies
    return cookies.get('_id')
|
|
|
|
def get_user_from_db(_id):
    """Look up a single document in ``db.users`` by its ``_id``.

    Returns whatever ``find_one`` yields for the query (presumably None
    when nothing matches -- confirm against the database driver in use).
    """
    query = {'_id': _id}
    return db.users.find_one(query)
|
|
|
|
def check_login():
    """Return True when the request's ``_id`` cookie matches a stored user, else False."""
    return bool(get_user_from_db(get_id_from_cookie()))
|
|
|
|
def login_required(function):
    """Decorator: require a logged-in user before running the view.

    When ``check_login()`` is false, an alert is flashed and the request
    is redirected to the ``admin_auth.login`` endpoint.
    """
    @wraps(function)
    def wrapper(*args, **kwargs):
        if check_login():
            return function(*args, **kwargs)
        flash('Please log in to view this page.', 'alert')
        return redirect(url_for('admin_auth.login'))
    return wrapper
|
|
|
|
def disable_if_logged_in(function):
    """Decorator: hide the view (404) from requests that are already logged in."""
    @wraps(function)
    def wrapper(*args, **kwargs):
        if not check_login():
            return function(*args, **kwargs)
        return abort(404)
    return wrapper
|
|
|
|
@views.route('/')
@views.route('/home/')
@views.route('/dashboard/')
@admin_account_required
@login_required
def home():
    """Render the admin index template; served at '/', '/home/' and '/dashboard/'."""
    page = render_template('/admin/index.html')
    return page
|
|
|
|
@views.route('/settings/')
@admin_account_required
@login_required
def settings():
    """Render the settings index template."""
    page = render_template('/admin/settings/index.html')
    return page
|
|
|
|
@views.route('/settings/users/', methods=['GET','POST'])
@admin_account_required
@login_required
def users():
    """List existing users or create a new one.

    GET (and HEAD): render the users page with the decrypted user list and
    an empty ``CreateUserForm``.

    POST: validate the form, build a ``User`` from the submitted fields,
    email the credentials to the new user and return the result of
    ``User.register()``. On validation failure a JSON body
    ``{'error': [...]}`` is returned with status 400.
    """
    from .models.forms import CreateUserForm
    form = CreateUserForm()

    if request.method == 'POST':
        if not form.validate_on_submit():
            errors = [*form.username.errors, *form.email.errors, *form.password.errors]
            return jsonify({ 'error': errors}), 400

        entry = User(
            _id = uuid4().hex,
            username = request.form.get('username').lower(),
            email = request.form.get('email'),
            # A blank OR missing password means "generate one"; the original
            # only handled the empty string and passed None through.
            password = request.form.get('password') or secrets.token_hex(12),
        )
        console_url = url_for('admin_views.home', _external = True)
        # NOTE(review): the message is built and sent before register() runs,
        # so the email goes out even if registration subsequently fails.
        message = Message(
            subject = 'RefTest | Registration Confirmation',
            recipients = [entry.email],
            body = f"""
            Hello {entry.username}, \n\n
            You have been registered as an administrator for the SKA RefTest App!\n\n
            You can access your account using the username '{entry.username}'.\n\n
            Your password is as follows:\n\n
            {entry.password}\n\n
            You can change your password by logging in to the admin console by copying the following URL into a web browser:\n\n
            {console_url}\n\n
            Have a nice day.
            """,
            html = f"""
            <p>Hello {entry.username},</p>
            <p>You have been registered as an administrator for the SKA RefTest App!</p>
            <p>You can access your account using the username '{entry.username}'.</p>
            <p>Your password is as follows:</p>
            <strong>{entry.password}</strong>
            <p>You can change your password by logging in to the admin console at the link below:</p>
            <p><a href='{console_url}'>{console_url}</a></p>
            <p>Have a nice day.</p>
            """
        )
        mail.send(message)
        return entry.register()

    # Any non-POST method reaching this view (GET, and the HEAD that Flask
    # adds automatically for GET routes) renders the listing. Previously a
    # HEAD request matched neither branch and the view returned None.
    users_list = decrypt_find(db.users, {})
    return render_template('/admin/settings/users.html', users = users_list, form = form)
|
|
|
|
@views.route('/settings/users/delete/<string:_id>', methods = ['GET', 'POST'])
@admin_account_required
@login_required
def delete_user(_id:str):
    """Confirm and perform deletion of another user's account.

    Requests targeting the caller's own ``_id`` (from the cookie) are
    refused with a flash message and a redirect to the users page.

    GET (and HEAD): render the confirmation page, or 404 if the target
    user does not exist.

    POST: return JSON 404 if the target does not exist, abort with 400 if
    the form is invalid, JSON 401 if the submitted password does not match
    the acting user's stored hash; otherwise optionally email a
    notification and return the result of ``User.delete()``.
    """
    if _id == get_id_from_cookie():
        flash('Cannot delete your own user account.', 'error')
        return redirect(url_for('admin_views.users'))

    from .models.forms import DeleteUserForm
    form = DeleteUserForm()
    user = decrypt_find_one(db.users, {'_id': _id})

    if request.method == 'POST':
        if not user:
            return jsonify({ 'error': 'User does not exist.' }), 404
        if not form.validate_on_submit():
            return abort(400)

        # The acting administrator must re-enter their own password.
        current_admin = decrypt_find_one(db.users, {'_id': get_id_from_cookie()})
        password = request.form.get('password')
        if not check_password_hash(current_admin['password'], password):
            return jsonify({ 'error': 'The password you entered is incorrect.' }), 401

        if request.form.get('notify'):
            message = Message(
                subject = 'RefTest | Account Deletion',
                recipients = [user['email']],
                bcc = [current_admin['email']],
                body = f"""
                Hello {user['username']}, \n\n
                Your administrator account for the SKA RefTest App has been deleted by {current_admin['username']}. All data about your account has been deleted.\n\n
                If you believe this was done in error, please contact them immediately.\n\n
                If you would like to register to administer the app, please ask an existing administrator to create a new account.\n\n
                Have a nice day.
                """,
                html = f"""
                <p>Hello {user['username']},</p>
                <p>Your administrator account for the SKA RefTest App has been deleted by {current_admin['username']}. All data about your account has been deleted.</p>
                <p>If you believe this was done in error, please contact them immediately.</p>
                <p>If you would like to register to administer the app, please ask an existing administrator to create a new account.</p>
                <p>Have a nice day.</p>
                """
            )
            mail.send(message)

        return User(_id = user['_id']).delete()

    # GET and the automatically-added HEAD both land here; previously HEAD
    # matched neither method check and the view returned None.
    if not user:
        return abort(404)
    return render_template('/admin/settings/delete-user.html', form = form, _id = _id, user = user)
|
|
|
|
@views.route('/settings/users/update/<string:_id>', methods = ['GET', 'POST'])
@admin_account_required
@login_required
def update_user(_id:str):
    """Update another user's email and/or password.

    Requests targeting the caller's own ``_id`` (from the cookie) are
    refused with a flash message and a redirect to the users page.

    GET (and HEAD): render the update page, or 404 if the target user does
    not exist.

    POST: return JSON 404 if the target does not exist, JSON 400 with the
    form errors if validation fails, JSON 401 if the acting user's
    confirmation password is wrong; otherwise optionally email the new
    details and return the result of ``User.update()``.
    """
    if _id == get_id_from_cookie():
        # Message previously said "delete", copied from delete_user.
        flash('Cannot update your own user account.', 'error')
        return redirect(url_for('admin_views.users'))

    from .models.forms import UpdateUserForm
    form = UpdateUserForm()
    user = decrypt_find_one(db.users, {'_id': _id})

    if request.method == 'POST':
        if not user:
            return jsonify({ 'error': 'User does not exist.' }), 404
        if not form.validate_on_submit():
            errors = [*form.user_password.errors, *form.email.errors, *form.password.errors, *form.password_reenter.errors]
            return jsonify({ 'error': errors}), 400

        current_admin = decrypt_find_one(db.users, {'_id': get_id_from_cookie()})
        # The form has separate fields: 'user_password' is the acting
        # administrator's own password (the confirmation), 'password' is the
        # NEW password for the target account. The confirmation must be
        # checked against 'user_password'; the original compared the new
        # password to the administrator's hash.
        admin_password = request.form.get('user_password') or ''
        if not check_password_hash(current_admin['password'], admin_password):
            return jsonify({ 'error': 'The password you entered is incorrect.' }), 401

        new_email = request.form.get('email')
        new_password = request.form.get('password')

        if request.form.get('notify'):
            # Fall back to the stored address when no new one was submitted
            # (blank or missing field).
            recipient = new_email or user['email']
            message = Message(
                subject = 'RefTest | Account Update',
                recipients = [recipient],
                bcc = [current_admin['email']],
                body = f"""
                Hello {user['username']}, \n\n
                Your administrator account for the SKA RefTest App has been updated by {current_admin['username']}.\n\n
                Your new account details are as follows:\n\n
                Email: {recipient}\n
                Password: {new_password}\n\n
                You can update your email and password by logging in to the app.\n\n
                Have a nice day.
                """,
                html = f"""
                <p>Hello {user['username']},</p>
                <p>Your administrator account for the SKA RefTest App has been updated by {current_admin['username']}.</p>
                <p>Your new account details are as follows:</p>
                <p>Email: {recipient} <br/>Password: {new_password}</p>
                <p>You can update your email and password by logging in to the app.</p>
                <p>Have a nice day.</p>
                """
            )
            mail.send(message)

        entry = User(
            _id = _id,
            email = new_email,
            password = new_password
        )
        return entry.update()

    # GET and the automatically-added HEAD both land here; previously HEAD
    # matched neither method check and the view returned None.
    if not user:
        return abort(404)
    return render_template('/admin/settings/update-user.html', form = form, _id = _id, user = user)
|
|
|
|
@views.route('/settings/questions/', methods=['GET', 'POST'])
@admin_account_required
@login_required
def questions():
    """Show the question-upload form or accept an uploaded data file.

    GET (and HEAD): render the questions settings page with an
    ``UploadDataForm``.

    POST: validate the form, then check the uploaded file with
    ``check_json_format`` and ``validate_json_contents`` before handing it
    to ``store_data_file``. Every failure returns JSON ``{'error': ...}``
    with status 400; success returns JSON ``{'success': ...}`` with 200.
    """
    from .models.forms import UploadDataForm
    from common.data_tools import check_json_format, validate_json_contents, store_data_file
    form = UploadDataForm()

    if request.method == 'POST':
        if not form.validate_on_submit():
            # form.errors maps field name -> list of messages. Unpacking the
            # dict directly yields only the field names, so flatten the
            # values to report the actual messages like the other views do.
            errors = [message for messages in form.errors.values() for message in messages]
            return jsonify({ 'error': errors}), 400

        upload = form.data_file.data
        if not check_json_format(upload):
            return jsonify({ 'error': 'Invalid file selected. Please upload a JSON file.'}), 400
        if not validate_json_contents(upload):
            return jsonify({'error': 'The data in the file is invalid.'}), 400
        store_data_file(upload)
        return jsonify({ 'success': 'File uploaded.'}), 200

    # GET and the automatically-added HEAD render the page; previously a
    # HEAD request fell through to the 400 error response.
    return render_template('/admin/settings/questions.html', form=form)
|
|
|
|
@views.route('/settings/questions/upload/')
@admin_account_required
@login_required
def upload_questions():
    """Render the upload-questions settings template."""
    page = render_template('/admin/settings/upload-questions.html')
    return page
|
|
|
|
@views.route('/tests/<filter>/', methods=['GET'])
@views.route('/tests/', methods=['GET'])
@admin_account_required
@login_required
def tests(filter=''):
    """Render the exams page for one of the supported filters.

    ``filter`` comes from the URL and must be one of '', 'create',
    'active', 'scheduled', 'expired' or 'all'; anything else is a 404.
    '' is treated the same as 'active'. 'create' renders the page with a
    ``CreateTest`` form; every other filter renders a list of documents
    from ``db.tests`` selected by comparing their ``start_date`` /
    ``expiry_date`` with today's date.
    """
    if filter not in ['', 'create', 'active', 'scheduled', 'expired', 'all']:
        return abort(404)

    if filter == 'create':
        from .models.forms import CreateTest
        form = CreateTest()
        # Changing a field default after construction only takes effect once
        # the form is processed again.
        form.time_limit.default='none'
        form.process()
        display_title = ''
        error_none = ''
        return render_template('/admin/tests.html', form = form, display_title=display_title, error_none=error_none, filter=filter)

    # Evaluate "today" once so every document is classified against the same
    # date, instead of calling date.today() for each element.
    today = date.today()
    all_tests = db.tests.find({})

    if filter in ('', 'active'):
        exams = [ test for test in all_tests if test['expiry_date'].date() >= today and test['start_date'].date() <= today ]
        display_title = 'Active Exams'
        error_none = 'There are no exams that are currently active. You can create one using the Create Exam form.'
    elif filter == 'expired':
        exams = [ test for test in all_tests if test['expiry_date'].date() < today ]
        display_title = 'Expired Exams'
        error_none = 'There are no expired exams. Exams will appear in this category after their expiration date has passed.'
    elif filter == 'scheduled':
        exams = [ test for test in all_tests if test['start_date'].date() > today ]
        display_title = 'Scheduled Exams'
        error_none = 'There are no scheduled exams pending. You can schedule an exam for the future using the Create Exam form.'
    else:
        # Only 'all' remains after the whitelist check above; the query
        # result is passed to the template unfiltered.
        exams = all_tests
        display_title = 'All Exams'
        error_none = 'There are no exams set up. You can create one using the Create Exam form.'

    return render_template('/admin/tests.html', tests = exams, display_title=display_title, error_none=error_none, filter=filter)
|
|
|
|
@views.route('/tests/create/', methods=['POST'])
@admin_account_required
@login_required
def _tests():
    """Create a new exam from the submitted ``CreateTest`` form.

    Returns JSON ``{'error': [...]}`` with status 400 when the form fails
    validation, when a date is missing or not in YYYY-MM-DD format, or
    when the dates are in the past / out of order. Otherwise a ``Test`` is
    created with the acting user's username as creator and JSON
    ``{'success': ...}`` is returned with status 200.
    """
    from .models.forms import CreateTest
    form = CreateTest()
    form.time_limit.default='none'
    # Re-process WITH the submitted data. Calling process() with no formdata
    # rebuilds every field from its default and discards the submission, so
    # validation would then run against an empty form.
    form.process(request.form)

    if not form.validate_on_submit():
        # Collect every field's messages from form.errors rather than naming
        # individual fields, so no field's errors are missed.
        errors = [message for messages in form.errors.values() for message in messages]
        return jsonify({ 'error': errors}), 400

    try:
        start_date = datetime.strptime(request.form.get('start_date'), '%Y-%m-%d')
        expiry_date = datetime.strptime(request.form.get('expiry_date'), '%Y-%m-%d')
    except (TypeError, ValueError):
        # TypeError: field missing (None); ValueError: wrong format.
        return jsonify({'error': ['Dates must be provided in YYYY-MM-DD format.']}), 400

    today = date.today()
    errors = []
    if start_date.date() < today:
        errors.append('The start date cannot be in the past.')
    if expiry_date.date() < today:
        errors.append('The expiry date cannot be in the past.')
    if expiry_date < start_date:
        errors.append('The expiry date cannot be before the start date.')
    if errors:
        return jsonify({'error': errors}), 400

    creator_id = get_id_from_cookie()
    creator = decrypt_find_one(db.users, { '_id': creator_id } )['username']
    test = Test(
        _id = uuid4().hex,
        start_date = start_date,
        expiry_date = expiry_date,
        time_limit = request.form.get('time_limit'),
        creator = creator
    )
    test.create()
    return jsonify({'success': 'New exam created.'}), 200
|
|
|
|
@views.route('/tests/delete/<_id>/')
@admin_account_required
@login_required
def delete_test(_id):
    """Delete the exam with the given ``_id``.

    Returns the result of ``Test.delete()`` when a matching document
    exists in ``db.tests``; otherwise aborts with 404. The route is guarded
    by the same account/login decorators as every other admin view, so an
    unauthenticated request can no longer delete exams.
    """
    # NOTE(review): this destructive action is reachable via GET; consider
    # restricting it to POST with CSRF protection.
    if not db.tests.find_one({'_id': _id}):
        return abort(404)
    return Test(_id = _id).delete()