Refactor to have all models in the models package.
This commit is contained in:
		
							
								
								
									
										0
									
								
								ref-test/admin/models/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										0
									
								
								ref-test/admin/models/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
								
								
									
										56
									
								
								ref-test/admin/models/forms.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										56
									
								
								ref-test/admin/models/forms.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,56 @@
 | 
			
		||||
from flask_wtf import FlaskForm
 | 
			
		||||
from wtforms import StringField, PasswordField, BooleanField, DateField, SelectField
 | 
			
		||||
from wtforms.validators import InputRequired, Email, Length, EqualTo, Optional
 | 
			
		||||
from datetime import date, timedelta
 | 
			
		||||
 | 
			
		||||
class LoginForm(FlaskForm):
 | 
			
		||||
    username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
 | 
			
		||||
    password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
 | 
			
		||||
    remember = BooleanField('Remember Log In', render_kw={'checked': True})
 | 
			
		||||
 | 
			
		||||
class RegistrationForm(FlaskForm):
 | 
			
		||||
    username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
 | 
			
		||||
    email = StringField('Email Address', validators=[InputRequired(), Email(message='You must enter a valid email address.'), Length(max=50)])
 | 
			
		||||
    password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
 | 
			
		||||
    password_reenter = PasswordField('Re-Enter Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.'), EqualTo('password', message='Passwords do not match.')])
 | 
			
		||||
 | 
			
		||||
class ResetPasswordForm(FlaskForm):
 | 
			
		||||
    username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
 | 
			
		||||
    email = StringField('Email Address', validators=[InputRequired(), Email(message='You must enter a valid email address.'), Length(max=50)])
 | 
			
		||||
 | 
			
		||||
class UpdatePasswordForm(FlaskForm):
 | 
			
		||||
    password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
 | 
			
		||||
    password_reenter = PasswordField('Re-Enter Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.'), EqualTo('password', message='Passwords do not match.')])
 | 
			
		||||
 | 
			
		||||
class CreateUserForm(FlaskForm):
 | 
			
		||||
    username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
 | 
			
		||||
    email = StringField('Email Address', validators=[InputRequired(), Email(message='You must enter a valid email address.'), Length(max=50)])
 | 
			
		||||
    password = PasswordField('Password (Optional)', validators=[Optional(),Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
 | 
			
		||||
 | 
			
		||||
class DeleteUserForm(FlaskForm):
 | 
			
		||||
    password = PasswordField('Confirm Your Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
 | 
			
		||||
    notify = BooleanField('Notify deletion by email', render_kw={'checked': True})
 | 
			
		||||
 | 
			
		||||
class UpdateUserForm(FlaskForm):
 | 
			
		||||
    user_password = PasswordField('Confirm Your Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
 | 
			
		||||
    email = StringField('Email Address', validators=[Optional(), Email(message='You must enter a valid email address.'), Length(max=50)])
 | 
			
		||||
    password = PasswordField('Change Password', validators=[Optional(),Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
 | 
			
		||||
    password_reenter = PasswordField('Re-Enter New Password', validators=[EqualTo('password', message='Passwords do not match.')])
 | 
			
		||||
    notify = BooleanField('Notify changes by email', render_kw={'checked': True})
 | 
			
		||||
 | 
			
		||||
class UpdateAccountForm(FlaskForm):
 | 
			
		||||
    password_confirm = PasswordField('Current Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
 | 
			
		||||
    email = StringField('Email Address', validators=[Optional(), Email(message='You must enter a valid email address.'), Length(max=50)])
 | 
			
		||||
    password = PasswordField('Change Password', validators=[Optional(),Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
 | 
			
		||||
    password_reenter = PasswordField('Re-Enter New Password', validators=[EqualTo('password', message='Passwords do not match.')])
 | 
			
		||||
 | 
			
		||||
class CreateTest(FlaskForm):
 | 
			
		||||
    start_date = DateField('Start Date', format="%Y-%m-%d", validators=[InputRequired()], default = date.today() )
 | 
			
		||||
    time_options = [
 | 
			
		||||
        ('none', 'None'),
 | 
			
		||||
        ('60', '1 hour'),
 | 
			
		||||
        ('90', '1 hour 30 minutes'),
 | 
			
		||||
        ('120', '2 hours')
 | 
			
		||||
    ]
 | 
			
		||||
    expiry_date = DateField('Expiry Date', format="%Y-%m-%d", validators=[InputRequired()], default = date.today() + timedelta(days=1) )
 | 
			
		||||
    time_limit = SelectField('Time Limit', choices=time_options)
 | 
			
		||||
							
								
								
									
										95
									
								
								ref-test/admin/models/tests.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										95
									
								
								ref-test/admin/models/tests.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,95 @@
 | 
			
		||||
import secrets
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
from uuid import uuid4
 | 
			
		||||
from flask import flash, jsonify
 | 
			
		||||
import secrets
 | 
			
		||||
 | 
			
		||||
from main import db
 | 
			
		||||
from security import encrypt
 | 
			
		||||
 | 
			
		||||
class Test:
 | 
			
		||||
    def __init__(self, _id=None, start_date=None, expiry_date=None, time_limit=None, creator=None):
 | 
			
		||||
        self._id = _id
 | 
			
		||||
        self.start_date = start_date
 | 
			
		||||
        self.expiry_date = expiry_date
 | 
			
		||||
        self.time_limit = None if time_limit == 'none' or time_limit == '' else time_limit
 | 
			
		||||
        self.creator = creator
 | 
			
		||||
    
 | 
			
		||||
    def create(self):
 | 
			
		||||
        test = {
 | 
			
		||||
            '_id': self._id,
 | 
			
		||||
            'date_created': datetime.today(),
 | 
			
		||||
            'start_date': self.start_date,
 | 
			
		||||
            'expiry_date': self.expiry_date,
 | 
			
		||||
            'time_limit': self.time_limit,
 | 
			
		||||
            'creator': encrypt(self.creator),
 | 
			
		||||
            'test_code': secrets.token_hex(6).upper()
 | 
			
		||||
        }
 | 
			
		||||
        if db.tests.insert_one(test):
 | 
			
		||||
            flash(f'Created a new exam with Exam Code <strong>{self.render_test_code(test["test_code"])}</strong>.', 'success')
 | 
			
		||||
            return jsonify({'success': test}), 200
 | 
			
		||||
        return jsonify({'error': f'Could not create exam. An error occurred.'}), 400
 | 
			
		||||
 | 
			
		||||
    def add_time_adjustment(self, time_adjustment):
 | 
			
		||||
        code = {
 | 
			
		||||
            '_id': uuid4().hex,
 | 
			
		||||
            'user_code': secrets.token_hex(2).upper(),
 | 
			
		||||
            'time_adjustment': time_adjustment
 | 
			
		||||
        }
 | 
			
		||||
        if db.tests.find_one_and_update({'_id': self._id}, {'$push': {'time_adjustments': code}},upsert=False):
 | 
			
		||||
            return jsonify({'success': code})
 | 
			
		||||
        return jsonify({'error': 'Failed to add the time adjustment. An error occurred.'}), 400
 | 
			
		||||
 | 
			
		||||
    def remove_time_adjustment(self, _id):
 | 
			
		||||
        if db.tests.find_one_and_update({'_id': self._id}, {'$pull': {'time_adjustments': {'_id': _id} }}):
 | 
			
		||||
            message = 'Time adjustment has been deleted.'
 | 
			
		||||
            flash(message, 'success')
 | 
			
		||||
            return jsonify({'success': message})
 | 
			
		||||
        return jsonify({'error': 'Failed to delete the time adjustment. An error occurred.'}), 400
 | 
			
		||||
 | 
			
		||||
    def render_test_code(self, test_code):
 | 
			
		||||
        return '—'.join([test_code[:4], test_code[4:8], test_code[8:]])
 | 
			
		||||
    
 | 
			
		||||
    def parse_test_code(self, test_code):
 | 
			
		||||
        return test_code.replace('—', '')
 | 
			
		||||
    
 | 
			
		||||
    def delete(self):
 | 
			
		||||
        if db.tests.delete_one({'_id': self._id}):
 | 
			
		||||
            message = 'Deleted exam.'
 | 
			
		||||
            flash(message, 'alert')
 | 
			
		||||
            return jsonify({'success': message}), 200
 | 
			
		||||
        return jsonify({'error': f'Could not create exam. An error occurred.'}), 400
 | 
			
		||||
 | 
			
		||||
    def update(self):
 | 
			
		||||
        test = {}
 | 
			
		||||
        updated = []
 | 
			
		||||
        if not self.start_date == '' and self.start_date is not None:
 | 
			
		||||
            test['start_date'] = self.start_date
 | 
			
		||||
            updated.append('start date')
 | 
			
		||||
        if not self.expiry_date == '' and self.expiry_date is not None:
 | 
			
		||||
            test['expiry_date'] = self.expiry_date
 | 
			
		||||
            updated.append('expiry date')
 | 
			
		||||
        if not self.time_limit == '' and self.time_limit is not None:
 | 
			
		||||
            test['time_limit'] = self.time_limit
 | 
			
		||||
            updated.append('time limit')
 | 
			
		||||
        output = ''
 | 
			
		||||
        if len(updated) == 0:
 | 
			
		||||
            flash(f'There were no changes requested for your account.', 'alert'), 200
 | 
			
		||||
            return jsonify({'success': 'There were no changes requested for your account.'}), 200
 | 
			
		||||
        elif len(updated) == 1:
 | 
			
		||||
            output = updated[0]
 | 
			
		||||
        elif len(updated) == 2:
 | 
			
		||||
            output = ' and '.join(updated)
 | 
			
		||||
        elif len(updated) > 2:
 | 
			
		||||
            output = updated[0]
 | 
			
		||||
            for index in range(1,len(updated)):
 | 
			
		||||
                if index < len(updated) - 2:
 | 
			
		||||
                    output = ', '.join([output, updated[index]])
 | 
			
		||||
                elif index == len(updated) - 2:
 | 
			
		||||
                    output = ', and '.join([output, updated[index]])
 | 
			
		||||
                else:
 | 
			
		||||
                    output = ''.join([output, updated[index]])
 | 
			
		||||
        db.tests.find_one_and_update({'_id': self._id}, {'$set': test})
 | 
			
		||||
        _output = f'The {output} of the test {"has" if len(updated) == 1 else "have"} been updated.'
 | 
			
		||||
        flash(_output)
 | 
			
		||||
        return jsonify({'success': _output}), 200
 | 
			
		||||
							
								
								
									
										185
									
								
								ref-test/admin/models/users.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										185
									
								
								ref-test/admin/models/users.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,185 @@
 | 
			
		||||
from flask import flash, make_response, Response
 | 
			
		||||
from flask.helpers import url_for
 | 
			
		||||
from flask.json import jsonify
 | 
			
		||||
from werkzeug.security import generate_password_hash, check_password_hash
 | 
			
		||||
from werkzeug.utils import redirect
 | 
			
		||||
from flask_mail import Message
 | 
			
		||||
import secrets
 | 
			
		||||
 | 
			
		||||
from security import encrypt, decrypt
 | 
			
		||||
from security.database import decrypt_find_one, encrypted_update
 | 
			
		||||
from datetime import datetime, timedelta
 | 
			
		||||
from main import db, mail
 | 
			
		||||
 | 
			
		||||
class User:
 | 
			
		||||
 | 
			
		||||
    def __init__(self, _id=None, username=None, password=None, email=None, remember=False):
 | 
			
		||||
        self._id = _id
 | 
			
		||||
        self.username = username
 | 
			
		||||
        self.email = email
 | 
			
		||||
        self.password = password
 | 
			
		||||
        self.remember = remember
 | 
			
		||||
 | 
			
		||||
    def start_session(self, resp:Response):
 | 
			
		||||
        resp.set_cookie(
 | 
			
		||||
            key = '_id',
 | 
			
		||||
            value = self._id,
 | 
			
		||||
            max_age = timedelta(days=14) if self.remember else 'Session',
 | 
			
		||||
            path = '/',
 | 
			
		||||
            expires = datetime.utcnow() + timedelta(days=14) if self.remember else 'Session'
 | 
			
		||||
        )
 | 
			
		||||
        if self.remember:
 | 
			
		||||
            resp.set_cookie (
 | 
			
		||||
                key = 'remember',
 | 
			
		||||
                value = 'True',
 | 
			
		||||
                max_age = timedelta(days=14),
 | 
			
		||||
                path = '/',
 | 
			
		||||
                expires = datetime.utcnow() + timedelta(days=14)
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def register(self):
 | 
			
		||||
        from ..views import get_id_from_cookie
 | 
			
		||||
        user = {
 | 
			
		||||
            '_id': self._id,
 | 
			
		||||
            'email': encrypt(self.email),
 | 
			
		||||
            'password': generate_password_hash(self.password, method='sha256'),
 | 
			
		||||
            'username': encrypt(self.username)
 | 
			
		||||
        }
 | 
			
		||||
        if decrypt_find_one(db.users, { 'username': self.username }):
 | 
			
		||||
            return jsonify({ 'error': f'Username {self.username} is not available.' }), 400
 | 
			
		||||
        if db.users.insert_one(user):
 | 
			
		||||
            flash(f'User {self.username} has been created successfully. You may now use it to log in and administer the tests.', 'success')
 | 
			
		||||
            resp = make_response(jsonify(user), 200)
 | 
			
		||||
            if not get_id_from_cookie:
 | 
			
		||||
                self.start_session(resp)
 | 
			
		||||
            return resp
 | 
			
		||||
        return jsonify({ 'error': f'Registration failed. An error occurred.' }), 400
 | 
			
		||||
    
 | 
			
		||||
    def login(self):
 | 
			
		||||
        user = decrypt_find_one( db.users, { 'username': self.username })
 | 
			
		||||
        if not user:
 | 
			
		||||
            return jsonify({ 'error': f'Username {self.username} does not exist.' }), 401
 | 
			
		||||
        if not check_password_hash( user['password'], self.password ):
 | 
			
		||||
            return jsonify({ 'error': f'The password you entered is incorrect.' }), 401
 | 
			
		||||
        resp = make_response(jsonify({ 'success': f'Successfully logged in user {self.username}.' }), 200)
 | 
			
		||||
        self._id = user['_id']
 | 
			
		||||
        self.start_session(resp)
 | 
			
		||||
        return resp
 | 
			
		||||
 | 
			
		||||
    def logout(self):
 | 
			
		||||
        resp = make_response(redirect(url_for('admin_auth.login')))
 | 
			
		||||
        resp.set_cookie(
 | 
			
		||||
            key = '_id',
 | 
			
		||||
            value = '',
 | 
			
		||||
            max_age = timedelta(days=-1),
 | 
			
		||||
            path = '/',
 | 
			
		||||
            expires= datetime.utcnow() + timedelta(days=-1)
 | 
			
		||||
        )
 | 
			
		||||
        resp.set_cookie (
 | 
			
		||||
            key = 'cookie_consent',
 | 
			
		||||
            value = 'True',
 | 
			
		||||
            max_age = 'Session',
 | 
			
		||||
            path = '/',
 | 
			
		||||
            expires = 'Session'
 | 
			
		||||
        )
 | 
			
		||||
        resp.set_cookie (
 | 
			
		||||
            key = 'remember',
 | 
			
		||||
            value = 'True',
 | 
			
		||||
            max_age = timedelta(days=-1),
 | 
			
		||||
            path = '/',
 | 
			
		||||
            expires = datetime.utcnow() + timedelta(days=-1)
 | 
			
		||||
        )
 | 
			
		||||
        flash('You have been logged out. All cookies pertaining to your account have been deleted. Have a nice day.', 'alert')
 | 
			
		||||
        return resp
 | 
			
		||||
 | 
			
		||||
    def reset_password(self):
 | 
			
		||||
        user = decrypt_find_one(db.users, { 'username': self.username })
 | 
			
		||||
        if not user:
 | 
			
		||||
            return jsonify({ 'error': f'Username {self.username} does not exist.' }), 401
 | 
			
		||||
        if not decrypt(user['email']) == self.email:
 | 
			
		||||
            return jsonify({ 'error': f'The email address {self.email} does not match the user account {self.username}.' }), 401
 | 
			
		||||
        new_password = secrets.token_hex(12)
 | 
			
		||||
        reset_token = secrets.token_urlsafe(16)
 | 
			
		||||
        verification_token = secrets.token_urlsafe(16)
 | 
			
		||||
        user['password'] = generate_password_hash(new_password, method='sha256')
 | 
			
		||||
        if encrypted_update( { 'username': self.username }, { '$set': {'password': user['password'], 'reset_token': reset_token, 'verification_token': verification_token } } ):
 | 
			
		||||
            flash(f'Your password has been reset. Instructions to recover your account have been sent to {self.email}. Please be sure to check your spam folder in case you have not received the email.', 'alert')
 | 
			
		||||
            email = Message(
 | 
			
		||||
                subject = 'RefTest | Password Reset',
 | 
			
		||||
                recipients = [self.email],
 | 
			
		||||
                body = f"""
 | 
			
		||||
                Hello {user['username']}, \n\n
 | 
			
		||||
                This email was generated because we received a request to reset the password for your administrator account for the SKA RefTest app.\n\n
 | 
			
		||||
                If you did not make this request, please ignore this email.\n\n
 | 
			
		||||
                If you did make this request, then you have two options to recover your account.\n\n
 | 
			
		||||
                For the time being, your password has been reset to the following:\n\n
 | 
			
		||||
                {new_password}\n\n
 | 
			
		||||
                You may use this to log back in to your account, and subsequently change your password to something more suitable.\n\n
 | 
			
		||||
                Alternatively, you may visit the following private link using your unique token to override your password. Copy and paste the following link in a web browser. Please note that this token is only valid once:\n\n
 | 
			
		||||
                {url_for('admin_auth.reset_gateway', token1 = reset_token, token2 = verification_token, _external = True)}\n\n
 | 
			
		||||
                Have a nice day.
 | 
			
		||||
                """,
 | 
			
		||||
                html = f"""
 | 
			
		||||
                <p>Hello {user['username']},</p>
 | 
			
		||||
                <p>This email was generated because we received a request to reset the password for your administrator account for the SKA RefTest app. </p>
 | 
			
		||||
                <p>If you did not make this request, please ignore this email.</p>
 | 
			
		||||
                <p>If you did make this request, then you have two options to recover your account.</p>
 | 
			
		||||
                <p>For the time being, your password has been reset to the following:</p>
 | 
			
		||||
                <strong>{new_password}</strong>
 | 
			
		||||
                <p>You may use this to log back in to your account, and subsequently change your password to something more suitable.</p>
 | 
			
		||||
                <p>Alternatively, you may visit the following private link using your unique token to override your password. Click on the following link, or copy and paste it into a browser. <strong>Please note that this token is only valid once</strong>:</p>
 | 
			
		||||
                <p><a href='{url_for('admin_auth.reset_gateway', token1 = reset_token, token2 = verification_token, _external = True)}'>{url_for('admin_auth.reset_gateway', token1 = reset_token, token2 = verification_token, _external = True)}</a></p>
 | 
			
		||||
                <p>Have a nice day.</p>
 | 
			
		||||
                """
 | 
			
		||||
            )
 | 
			
		||||
            mail.send(email)
 | 
			
		||||
            return jsonify({ 'success': 'Password reset request has been processed.'}), 200
 | 
			
		||||
    
 | 
			
		||||
    def update(self):
 | 
			
		||||
        from ..views import get_id_from_cookie
 | 
			
		||||
        retrieved_user = decrypt_find_one(db.users, { '_id': self._id })
 | 
			
		||||
        if not retrieved_user:
 | 
			
		||||
            return jsonify({ 'error': f'User {retrieved_user["username"]} does not exist.' }), 401
 | 
			
		||||
        user = {}
 | 
			
		||||
        updated = []
 | 
			
		||||
        if not self.email == '' and self.email is not None:
 | 
			
		||||
            user['email'] = self.email
 | 
			
		||||
            updated.append('email')
 | 
			
		||||
        if not self.password == '' and self.password is not None:
 | 
			
		||||
            user['password'] = generate_password_hash(self.password, method='sha256')
 | 
			
		||||
            updated.append('password')
 | 
			
		||||
        output = ''
 | 
			
		||||
        if len(updated) == 0:
 | 
			
		||||
            flash(f'There were no changes requested for your account.', 'alert'), 200
 | 
			
		||||
            return jsonify({'success': 'There were no changes requested for your account.'}), 200
 | 
			
		||||
        elif len(updated) == 1:
 | 
			
		||||
            output = updated[0]
 | 
			
		||||
        elif len(updated) == 2:
 | 
			
		||||
            output = ' and '.join(updated)
 | 
			
		||||
        elif len(updated) > 2:
 | 
			
		||||
            output = updated[0]
 | 
			
		||||
            for index in range(1,len(updated)):
 | 
			
		||||
                if index < len(updated) - 2:
 | 
			
		||||
                    output = ', '.join([output, updated[index]])
 | 
			
		||||
                elif index == len(updated) - 2:
 | 
			
		||||
                    output = ', and '.join([output, updated[index]])
 | 
			
		||||
                else:
 | 
			
		||||
                    output = ''.join([output, updated[index]])
 | 
			
		||||
        encrypted_update(db.users, {'_id': self._id}, { '$set': user })
 | 
			
		||||
        if self._id == get_id_from_cookie():
 | 
			
		||||
            _output = 'Your '
 | 
			
		||||
        elif retrieved_user['username'][-1] == 's':
 | 
			
		||||
            _output = '’'.join([retrieved_user['username'], ''])
 | 
			
		||||
        else:
 | 
			
		||||
            _output = '’'.join([retrieved_user['username'], 's'])
 | 
			
		||||
        _output = f'{_output} {output} {"has" if len(updated) == 1 else "have"} been updated.'
 | 
			
		||||
        flash(_output)
 | 
			
		||||
        return jsonify({'success': _output}), 200
 | 
			
		||||
    
 | 
			
		||||
    def delete(self):
 | 
			
		||||
        retrieved_user = decrypt_find_one(db.users, { '_id': self._id })
 | 
			
		||||
        if not retrieved_user:
 | 
			
		||||
            return jsonify({ 'error': f'User does not exist.' }), 401
 | 
			
		||||
        db.users.find_one_and_delete({'_id': self._id})
 | 
			
		||||
        flash(f'User {retrieved_user["username"]} has been deleted.')
 | 
			
		||||
        return jsonify({'success': f'User {retrieved_user["username"]} has been deleted.'}), 200
 | 
			
		||||
		Reference in New Issue
	
	Block a user