Refactor to have all models in the models package.

This commit is contained in:
2021-11-25 23:21:48 +00:00
parent 52019e61c1
commit 0fff71b1fb
6 changed files with 13 additions and 13 deletions

View File

View File

@ -0,0 +1,56 @@
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, DateField, SelectField
from wtforms.validators import InputRequired, Email, Length, EqualTo, Optional
from datetime import date, timedelta
class LoginForm(FlaskForm):
username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
remember = BooleanField('Remember Log In', render_kw={'checked': True})
class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
email = StringField('Email Address', validators=[InputRequired(), Email(message='You must enter a valid email address.'), Length(max=50)])
password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password_reenter = PasswordField('Re-Enter Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.'), EqualTo('password', message='Passwords do not match.')])
class ResetPasswordForm(FlaskForm):
username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
email = StringField('Email Address', validators=[InputRequired(), Email(message='You must enter a valid email address.'), Length(max=50)])
class UpdatePasswordForm(FlaskForm):
password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password_reenter = PasswordField('Re-Enter Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.'), EqualTo('password', message='Passwords do not match.')])
class CreateUserForm(FlaskForm):
username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
email = StringField('Email Address', validators=[InputRequired(), Email(message='You must enter a valid email address.'), Length(max=50)])
password = PasswordField('Password (Optional)', validators=[Optional(),Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
class DeleteUserForm(FlaskForm):
password = PasswordField('Confirm Your Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
notify = BooleanField('Notify deletion by email', render_kw={'checked': True})
class UpdateUserForm(FlaskForm):
user_password = PasswordField('Confirm Your Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
email = StringField('Email Address', validators=[Optional(), Email(message='You must enter a valid email address.'), Length(max=50)])
password = PasswordField('Change Password', validators=[Optional(),Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password_reenter = PasswordField('Re-Enter New Password', validators=[EqualTo('password', message='Passwords do not match.')])
notify = BooleanField('Notify changes by email', render_kw={'checked': True})
class UpdateAccountForm(FlaskForm):
password_confirm = PasswordField('Current Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
email = StringField('Email Address', validators=[Optional(), Email(message='You must enter a valid email address.'), Length(max=50)])
password = PasswordField('Change Password', validators=[Optional(),Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password_reenter = PasswordField('Re-Enter New Password', validators=[EqualTo('password', message='Passwords do not match.')])
class CreateTest(FlaskForm):
start_date = DateField('Start Date', format="%Y-%m-%d", validators=[InputRequired()], default = date.today() )
time_options = [
('none', 'None'),
('60', '1 hour'),
('90', '1 hour 30 minutes'),
('120', '2 hours')
]
expiry_date = DateField('Expiry Date', format="%Y-%m-%d", validators=[InputRequired()], default = date.today() + timedelta(days=1) )
time_limit = SelectField('Time Limit', choices=time_options)

View File

@ -0,0 +1,95 @@
import secrets
from datetime import datetime
from uuid import uuid4
from flask import flash, jsonify
import secrets
from main import db
from security import encrypt
class Test:
def __init__(self, _id=None, start_date=None, expiry_date=None, time_limit=None, creator=None):
self._id = _id
self.start_date = start_date
self.expiry_date = expiry_date
self.time_limit = None if time_limit == 'none' or time_limit == '' else time_limit
self.creator = creator
def create(self):
test = {
'_id': self._id,
'date_created': datetime.today(),
'start_date': self.start_date,
'expiry_date': self.expiry_date,
'time_limit': self.time_limit,
'creator': encrypt(self.creator),
'test_code': secrets.token_hex(6).upper()
}
if db.tests.insert_one(test):
flash(f'Created a new exam with Exam Code <strong>{self.render_test_code(test["test_code"])}</strong>.', 'success')
return jsonify({'success': test}), 200
return jsonify({'error': f'Could not create exam. An error occurred.'}), 400
def add_time_adjustment(self, time_adjustment):
code = {
'_id': uuid4().hex,
'user_code': secrets.token_hex(2).upper(),
'time_adjustment': time_adjustment
}
if db.tests.find_one_and_update({'_id': self._id}, {'$push': {'time_adjustments': code}},upsert=False):
return jsonify({'success': code})
return jsonify({'error': 'Failed to add the time adjustment. An error occurred.'}), 400
def remove_time_adjustment(self, _id):
if db.tests.find_one_and_update({'_id': self._id}, {'$pull': {'time_adjustments': {'_id': _id} }}):
message = 'Time adjustment has been deleted.'
flash(message, 'success')
return jsonify({'success': message})
return jsonify({'error': 'Failed to delete the time adjustment. An error occurred.'}), 400
def render_test_code(self, test_code):
return ''.join([test_code[:4], test_code[4:8], test_code[8:]])
def parse_test_code(self, test_code):
return test_code.replace('', '')
def delete(self):
if db.tests.delete_one({'_id': self._id}):
message = 'Deleted exam.'
flash(message, 'alert')
return jsonify({'success': message}), 200
return jsonify({'error': f'Could not create exam. An error occurred.'}), 400
def update(self):
test = {}
updated = []
if not self.start_date == '' and self.start_date is not None:
test['start_date'] = self.start_date
updated.append('start date')
if not self.expiry_date == '' and self.expiry_date is not None:
test['expiry_date'] = self.expiry_date
updated.append('expiry date')
if not self.time_limit == '' and self.time_limit is not None:
test['time_limit'] = self.time_limit
updated.append('time limit')
output = ''
if len(updated) == 0:
flash(f'There were no changes requested for your account.', 'alert'), 200
return jsonify({'success': 'There were no changes requested for your account.'}), 200
elif len(updated) == 1:
output = updated[0]
elif len(updated) == 2:
output = ' and '.join(updated)
elif len(updated) > 2:
output = updated[0]
for index in range(1,len(updated)):
if index < len(updated) - 2:
output = ', '.join([output, updated[index]])
elif index == len(updated) - 2:
output = ', and '.join([output, updated[index]])
else:
output = ''.join([output, updated[index]])
db.tests.find_one_and_update({'_id': self._id}, {'$set': test})
_output = f'The {output} of the test {"has" if len(updated) == 1 else "have"} been updated.'
flash(_output)
return jsonify({'success': _output}), 200

View File

@ -0,0 +1,185 @@
from flask import flash, make_response, Response
from flask.helpers import url_for
from flask.json import jsonify
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import redirect
from flask_mail import Message
import secrets
from security import encrypt, decrypt
from security.database import decrypt_find_one, encrypted_update
from datetime import datetime, timedelta
from main import db, mail
class User:
def __init__(self, _id=None, username=None, password=None, email=None, remember=False):
self._id = _id
self.username = username
self.email = email
self.password = password
self.remember = remember
def start_session(self, resp:Response):
resp.set_cookie(
key = '_id',
value = self._id,
max_age = timedelta(days=14) if self.remember else 'Session',
path = '/',
expires = datetime.utcnow() + timedelta(days=14) if self.remember else 'Session'
)
if self.remember:
resp.set_cookie (
key = 'remember',
value = 'True',
max_age = timedelta(days=14),
path = '/',
expires = datetime.utcnow() + timedelta(days=14)
)
def register(self):
from ..views import get_id_from_cookie
user = {
'_id': self._id,
'email': encrypt(self.email),
'password': generate_password_hash(self.password, method='sha256'),
'username': encrypt(self.username)
}
if decrypt_find_one(db.users, { 'username': self.username }):
return jsonify({ 'error': f'Username {self.username} is not available.' }), 400
if db.users.insert_one(user):
flash(f'User {self.username} has been created successfully. You may now use it to log in and administer the tests.', 'success')
resp = make_response(jsonify(user), 200)
if not get_id_from_cookie:
self.start_session(resp)
return resp
return jsonify({ 'error': f'Registration failed. An error occurred.' }), 400
def login(self):
user = decrypt_find_one( db.users, { 'username': self.username })
if not user:
return jsonify({ 'error': f'Username {self.username} does not exist.' }), 401
if not check_password_hash( user['password'], self.password ):
return jsonify({ 'error': f'The password you entered is incorrect.' }), 401
resp = make_response(jsonify({ 'success': f'Successfully logged in user {self.username}.' }), 200)
self._id = user['_id']
self.start_session(resp)
return resp
def logout(self):
resp = make_response(redirect(url_for('admin_auth.login')))
resp.set_cookie(
key = '_id',
value = '',
max_age = timedelta(days=-1),
path = '/',
expires= datetime.utcnow() + timedelta(days=-1)
)
resp.set_cookie (
key = 'cookie_consent',
value = 'True',
max_age = 'Session',
path = '/',
expires = 'Session'
)
resp.set_cookie (
key = 'remember',
value = 'True',
max_age = timedelta(days=-1),
path = '/',
expires = datetime.utcnow() + timedelta(days=-1)
)
flash('You have been logged out. All cookies pertaining to your account have been deleted. Have a nice day.', 'alert')
return resp
def reset_password(self):
user = decrypt_find_one(db.users, { 'username': self.username })
if not user:
return jsonify({ 'error': f'Username {self.username} does not exist.' }), 401
if not decrypt(user['email']) == self.email:
return jsonify({ 'error': f'The email address {self.email} does not match the user account {self.username}.' }), 401
new_password = secrets.token_hex(12)
reset_token = secrets.token_urlsafe(16)
verification_token = secrets.token_urlsafe(16)
user['password'] = generate_password_hash(new_password, method='sha256')
if encrypted_update( { 'username': self.username }, { '$set': {'password': user['password'], 'reset_token': reset_token, 'verification_token': verification_token } } ):
flash(f'Your password has been reset. Instructions to recover your account have been sent to {self.email}. Please be sure to check your spam folder in case you have not received the email.', 'alert')
email = Message(
subject = 'RefTest | Password Reset',
recipients = [self.email],
body = f"""
Hello {user['username']}, \n\n
This email was generated because we received a request to reset the password for your administrator account for the SKA RefTest app.\n\n
If you did not make this request, please ignore this email.\n\n
If you did make this request, then you have two options to recover your account.\n\n
For the time being, your password has been reset to the following:\n\n
{new_password}\n\n
You may use this to log back in to your account, and subsequently change your password to something more suitable.\n\n
Alternatively, you may visit the following private link using your unique token to override your password. Copy and paste the following link in a web browser. Please note that this token is only valid once:\n\n
{url_for('admin_auth.reset_gateway', token1 = reset_token, token2 = verification_token, _external = True)}\n\n
Have a nice day.
""",
html = f"""
<p>Hello {user['username']},</p>
<p>This email was generated because we received a request to reset the password for your administrator account for the SKA RefTest app. </p>
<p>If you did not make this request, please ignore this email.</p>
<p>If you did make this request, then you have two options to recover your account.</p>
<p>For the time being, your password has been reset to the following:</p>
<strong>{new_password}</strong>
<p>You may use this to log back in to your account, and subsequently change your password to something more suitable.</p>
<p>Alternatively, you may visit the following private link using your unique token to override your password. Click on the following link, or copy and paste it into a browser. <strong>Please note that this token is only valid once</strong>:</p>
<p><a href='{url_for('admin_auth.reset_gateway', token1 = reset_token, token2 = verification_token, _external = True)}'>{url_for('admin_auth.reset_gateway', token1 = reset_token, token2 = verification_token, _external = True)}</a></p>
<p>Have a nice day.</p>
"""
)
mail.send(email)
return jsonify({ 'success': 'Password reset request has been processed.'}), 200
def update(self):
from ..views import get_id_from_cookie
retrieved_user = decrypt_find_one(db.users, { '_id': self._id })
if not retrieved_user:
return jsonify({ 'error': f'User {retrieved_user["username"]} does not exist.' }), 401
user = {}
updated = []
if not self.email == '' and self.email is not None:
user['email'] = self.email
updated.append('email')
if not self.password == '' and self.password is not None:
user['password'] = generate_password_hash(self.password, method='sha256')
updated.append('password')
output = ''
if len(updated) == 0:
flash(f'There were no changes requested for your account.', 'alert'), 200
return jsonify({'success': 'There were no changes requested for your account.'}), 200
elif len(updated) == 1:
output = updated[0]
elif len(updated) == 2:
output = ' and '.join(updated)
elif len(updated) > 2:
output = updated[0]
for index in range(1,len(updated)):
if index < len(updated) - 2:
output = ', '.join([output, updated[index]])
elif index == len(updated) - 2:
output = ', and '.join([output, updated[index]])
else:
output = ''.join([output, updated[index]])
encrypted_update(db.users, {'_id': self._id}, { '$set': user })
if self._id == get_id_from_cookie():
_output = 'Your '
elif retrieved_user['username'][-1] == 's':
_output = '&rsquo;'.join([retrieved_user['username'], ''])
else:
_output = '&rsquo;'.join([retrieved_user['username'], 's'])
_output = f'{_output} {output} {"has" if len(updated) == 1 else "have"} been updated.'
flash(_output)
return jsonify({'success': _output}), 200
def delete(self):
retrieved_user = decrypt_find_one(db.users, { '_id': self._id })
if not retrieved_user:
return jsonify({ 'error': f'User does not exist.' }), 401
db.users.find_one_and_delete({'_id': self._id})
flash(f'User {retrieved_user["username"]} has been deleted.')
return jsonify({'success': f'User {retrieved_user["username"]} has been deleted.'}), 200