Refactor to have all models in the models package.
This commit is contained in:
		| @@ -1,7 +1,7 @@ | |||||||
| from flask import Blueprint, render_template, request, session, redirect | from flask import Blueprint, render_template, request, session, redirect | ||||||
| from flask.helpers import flash, url_for | from flask.helpers import flash, url_for | ||||||
| from flask.json import jsonify | from flask.json import jsonify | ||||||
| from .user.models import User | from .models.users import User | ||||||
| from uuid import uuid4 | from uuid import uuid4 | ||||||
| from security.database import decrypt_find_one, encrypted_update | from security.database import decrypt_find_one, encrypted_update | ||||||
| from werkzeug.security import check_password_hash | from werkzeug.security import check_password_hash | ||||||
| @@ -21,7 +21,7 @@ auth = Blueprint( | |||||||
| @admin_account_required | @admin_account_required | ||||||
| @login_required | @login_required | ||||||
| def account(): | def account(): | ||||||
|     from .forms import UpdateAccountForm |     from .models.forms import UpdateAccountForm | ||||||
|     form = UpdateAccountForm() |     form = UpdateAccountForm() | ||||||
|     _id = get_id_from_cookie() |     _id = get_id_from_cookie() | ||||||
|     user = decrypt_find_one(db.users, {'_id': _id}) |     user = decrypt_find_one(db.users, {'_id': _id}) | ||||||
| @@ -46,7 +46,7 @@ def account(): | |||||||
| @admin_account_required | @admin_account_required | ||||||
| @disable_if_logged_in | @disable_if_logged_in | ||||||
| def login(): | def login(): | ||||||
|     from .forms import LoginForm |     from .models.forms import LoginForm | ||||||
|     form = LoginForm() |     form = LoginForm() | ||||||
|     if request.method == 'GET': |     if request.method == 'GET': | ||||||
|         return render_template('/admin/auth/login.html', form=form) |         return render_template('/admin/auth/login.html', form=form) | ||||||
| @@ -72,7 +72,7 @@ def logout(): | |||||||
| @auth.route('/register/', methods=['GET','POST']) | @auth.route('/register/', methods=['GET','POST']) | ||||||
| @disable_on_registration | @disable_on_registration | ||||||
| def register(): | def register(): | ||||||
|     from .forms import RegistrationForm |     from .models.forms import RegistrationForm | ||||||
|     form = RegistrationForm() |     form = RegistrationForm() | ||||||
|     if request.method == 'GET': |     if request.method == 'GET': | ||||||
|         return render_template('/admin/auth/register.html', form=form) |         return render_template('/admin/auth/register.html', form=form) | ||||||
| @@ -93,7 +93,7 @@ def register(): | |||||||
| @admin_account_required | @admin_account_required | ||||||
| @disable_if_logged_in | @disable_if_logged_in | ||||||
| def reset(): | def reset(): | ||||||
|     from .forms import ResetPasswordForm |     from .models.forms import ResetPasswordForm | ||||||
|     form = ResetPasswordForm() |     form = ResetPasswordForm() | ||||||
|     if request.method == 'GET': |     if request.method == 'GET': | ||||||
|         return render_template('/admin/auth/reset.html', form=form) |         return render_template('/admin/auth/reset.html', form=form) | ||||||
| @@ -128,7 +128,7 @@ def reset_gateway(token1,token2): | |||||||
| @admin_account_required | @admin_account_required | ||||||
| @disable_if_logged_in | @disable_if_logged_in | ||||||
| def update_password_(): | def update_password_(): | ||||||
|     from .forms import UpdatePasswordForm |     from .models.forms import UpdatePasswordForm | ||||||
|     form = UpdatePasswordForm() |     form = UpdatePasswordForm() | ||||||
|     if request.method == 'GET': |     if request.method == 'GET': | ||||||
|         if 'reset_validated' not in session: |         if 'reset_validated' not in session: | ||||||
|   | |||||||
| @@ -1,185 +0,0 @@ | |||||||
| from flask import flash, make_response, Response |  | ||||||
| from flask.helpers import url_for |  | ||||||
| from flask.json import jsonify |  | ||||||
| from werkzeug.security import generate_password_hash, check_password_hash |  | ||||||
| from werkzeug.utils import redirect |  | ||||||
| from flask_mail import Message |  | ||||||
| import secrets |  | ||||||
|  |  | ||||||
| from security import encrypt, decrypt |  | ||||||
| from security.database import decrypt_find_one, encrypted_update |  | ||||||
| from datetime import datetime, timedelta |  | ||||||
| from main import db, mail |  | ||||||
|  |  | ||||||
| class User: |  | ||||||
|  |  | ||||||
|     def __init__(self, _id=None, username=None, password=None, email=None, remember=False): |  | ||||||
|         self._id = _id |  | ||||||
|         self.username = username |  | ||||||
|         self.email = email |  | ||||||
|         self.password = password |  | ||||||
|         self.remember = remember |  | ||||||
|  |  | ||||||
|     def start_session(self, resp:Response): |  | ||||||
|         resp.set_cookie( |  | ||||||
|             key = '_id', |  | ||||||
|             value = self._id, |  | ||||||
|             max_age = timedelta(days=14) if self.remember else 'Session', |  | ||||||
|             path = '/', |  | ||||||
|             expires = datetime.utcnow() + timedelta(days=14) if self.remember else 'Session' |  | ||||||
|         ) |  | ||||||
|         if self.remember: |  | ||||||
|             resp.set_cookie ( |  | ||||||
|                 key = 'remember', |  | ||||||
|                 value = 'True', |  | ||||||
|                 max_age = timedelta(days=14), |  | ||||||
|                 path = '/', |  | ||||||
|                 expires = datetime.utcnow() + timedelta(days=14) |  | ||||||
|             ) |  | ||||||
|  |  | ||||||
|     def register(self): |  | ||||||
|         from ..views import get_id_from_cookie |  | ||||||
|         user = { |  | ||||||
|             '_id': self._id, |  | ||||||
|             'email': encrypt(self.email), |  | ||||||
|             'password': generate_password_hash(self.password, method='sha256'), |  | ||||||
|             'username': encrypt(self.username) |  | ||||||
|         } |  | ||||||
|         if decrypt_find_one(db.users, { 'username': self.username }): |  | ||||||
|             return jsonify({ 'error': f'Username {self.username} is not available.' }), 400 |  | ||||||
|         if db.users.insert_one(user): |  | ||||||
|             flash(f'User {self.username} has been created successfully. You may now use it to log in and administer the tests.', 'success') |  | ||||||
|             resp = make_response(jsonify(user), 200) |  | ||||||
|             if not get_id_from_cookie: |  | ||||||
|                 self.start_session(resp) |  | ||||||
|             return resp |  | ||||||
|         return jsonify({ 'error': f'Registration failed. An error occurred.' }), 400 |  | ||||||
|      |  | ||||||
|     def login(self): |  | ||||||
|         user = decrypt_find_one( db.users, { 'username': self.username }) |  | ||||||
|         if not user: |  | ||||||
|             return jsonify({ 'error': f'Username {self.username} does not exist.' }), 401 |  | ||||||
|         if not check_password_hash( user['password'], self.password ): |  | ||||||
|             return jsonify({ 'error': f'The password you entered is incorrect.' }), 401 |  | ||||||
|         resp = make_response(jsonify({ 'success': f'Successfully logged in user {self.username}.' }), 200) |  | ||||||
|         self._id = user['_id'] |  | ||||||
|         self.start_session(resp) |  | ||||||
|         return resp |  | ||||||
|  |  | ||||||
|     def logout(self): |  | ||||||
|         resp = make_response(redirect(url_for('admin_auth.login'))) |  | ||||||
|         resp.set_cookie( |  | ||||||
|             key = '_id', |  | ||||||
|             value = '', |  | ||||||
|             max_age = timedelta(days=-1), |  | ||||||
|             path = '/', |  | ||||||
|             expires= datetime.utcnow() + timedelta(days=-1) |  | ||||||
|         ) |  | ||||||
|         resp.set_cookie ( |  | ||||||
|             key = 'cookie_consent', |  | ||||||
|             value = 'True', |  | ||||||
|             max_age = 'Session', |  | ||||||
|             path = '/', |  | ||||||
|             expires = 'Session' |  | ||||||
|         ) |  | ||||||
|         resp.set_cookie ( |  | ||||||
|             key = 'remember', |  | ||||||
|             value = 'True', |  | ||||||
|             max_age = timedelta(days=-1), |  | ||||||
|             path = '/', |  | ||||||
|             expires = datetime.utcnow() + timedelta(days=-1) |  | ||||||
|         ) |  | ||||||
|         flash('You have been logged out. All cookies pertaining to your account have been deleted. Have a nice day.', 'alert') |  | ||||||
|         return resp |  | ||||||
|  |  | ||||||
|     def reset_password(self): |  | ||||||
|         user = decrypt_find_one(db.users, { 'username': self.username }) |  | ||||||
|         if not user: |  | ||||||
|             return jsonify({ 'error': f'Username {self.username} does not exist.' }), 401 |  | ||||||
|         if not decrypt(user['email']) == self.email: |  | ||||||
|             return jsonify({ 'error': f'The email address {self.email} does not match the user account {self.username}.' }), 401 |  | ||||||
|         new_password = secrets.token_hex(12) |  | ||||||
|         reset_token = secrets.token_urlsafe(16) |  | ||||||
|         verification_token = secrets.token_urlsafe(16) |  | ||||||
|         user['password'] = generate_password_hash(new_password, method='sha256') |  | ||||||
|         if encrypted_update( { 'username': self.username }, { '$set': {'password': user['password'], 'reset_token': reset_token, 'verification_token': verification_token } } ): |  | ||||||
|             flash(f'Your password has been reset. Instructions to recover your account have been sent to {self.email}. Please be sure to check your spam folder in case you have not received the email.', 'alert') |  | ||||||
|             email = Message( |  | ||||||
|                 subject = 'RefTest | Password Reset', |  | ||||||
|                 recipients = [self.email], |  | ||||||
|                 body = f""" |  | ||||||
|                 Hello {user['username']}, \n\n |  | ||||||
|                 This email was generated because we received a request to reset the password for your administrator account for the SKA RefTest app.\n\n |  | ||||||
|                 If you did not make this request, please ignore this email.\n\n |  | ||||||
|                 If you did make this request, then you have two options to recover your account.\n\n |  | ||||||
|                 For the time being, your password has been reset to the following:\n\n |  | ||||||
|                 {new_password}\n\n |  | ||||||
|                 You may use this to log back in to your account, and subsequently change your password to something more suitable.\n\n |  | ||||||
|                 Alternatively, you may visit the following private link using your unique token to override your password. Copy and paste the following link in a web browser. Please note that this token is only valid once:\n\n |  | ||||||
|                 {url_for('admin_auth.reset_gateway', token1 = reset_token, token2 = verification_token, _external = True)}\n\n |  | ||||||
|                 Have a nice day. |  | ||||||
|                 """, |  | ||||||
|                 html = f""" |  | ||||||
|                 <p>Hello {user['username']},</p> |  | ||||||
|                 <p>This email was generated because we received a request to reset the password for your administrator account for the SKA RefTest app. </p> |  | ||||||
|                 <p>If you did not make this request, please ignore this email.</p> |  | ||||||
|                 <p>If you did make this request, then you have two options to recover your account.</p> |  | ||||||
|                 <p>For the time being, your password has been reset to the following:</p> |  | ||||||
|                 <strong>{new_password}</strong> |  | ||||||
|                 <p>You may use this to log back in to your account, and subsequently change your password to something more suitable.</p> |  | ||||||
|                 <p>Alternatively, you may visit the following private link using your unique token to override your password. Click on the following link, or copy and paste it into a browser. <strong>Please note that this token is only valid once</strong>:</p> |  | ||||||
|                 <p><a href='{url_for('admin_auth.reset_gateway', token1 = reset_token, token2 = verification_token, _external = True)}'>{url_for('admin_auth.reset_gateway', token1 = reset_token, token2 = verification_token, _external = True)}</a></p> |  | ||||||
|                 <p>Have a nice day.</p> |  | ||||||
|                 """ |  | ||||||
|             ) |  | ||||||
|             mail.send(email) |  | ||||||
|             return jsonify({ 'success': 'Password reset request has been processed.'}), 200 |  | ||||||
|      |  | ||||||
|     def update(self): |  | ||||||
|         from ..views import get_id_from_cookie |  | ||||||
|         retrieved_user = decrypt_find_one(db.users, { '_id': self._id }) |  | ||||||
|         if not retrieved_user: |  | ||||||
|             return jsonify({ 'error': f'User {retrieved_user["username"]} does not exist.' }), 401 |  | ||||||
|         user = {} |  | ||||||
|         updated = [] |  | ||||||
|         if not self.email == '' and self.email is not None: |  | ||||||
|             user['email'] = self.email |  | ||||||
|             updated.append('email') |  | ||||||
|         if not self.password == '' and self.password is not None: |  | ||||||
|             user['password'] = generate_password_hash(self.password, method='sha256') |  | ||||||
|             updated.append('password') |  | ||||||
|         output = '' |  | ||||||
|         if len(updated) == 0: |  | ||||||
|             flash(f'There were no changes requested for your account.', 'alert'), 200 |  | ||||||
|             return jsonify({'success': 'There were no changes requested for your account.'}), 200 |  | ||||||
|         elif len(updated) == 1: |  | ||||||
|             output = updated[0] |  | ||||||
|         elif len(updated) == 2: |  | ||||||
|             output = ' and '.join(updated) |  | ||||||
|         elif len(updated) > 2: |  | ||||||
|             output = updated[0] |  | ||||||
|             for index in range(1,len(updated)): |  | ||||||
|                 if index < len(updated) - 2: |  | ||||||
|                     output = ', '.join([output, updated[index]]) |  | ||||||
|                 elif index == len(updated) - 2: |  | ||||||
|                     output = ', and '.join([output, updated[index]]) |  | ||||||
|                 else: |  | ||||||
|                     output = ''.join([output, updated[index]]) |  | ||||||
|         encrypted_update(db.users, {'_id': self._id}, { '$set': user }) |  | ||||||
|         if self._id == get_id_from_cookie(): |  | ||||||
|             _output = 'Your ' |  | ||||||
|         elif retrieved_user['username'][-1] == 's': |  | ||||||
|             _output = '’'.join([retrieved_user['username'], '']) |  | ||||||
|         else: |  | ||||||
|             _output = '’'.join([retrieved_user['username'], 's']) |  | ||||||
|         _output = f'{_output} {output} {"has" if len(updated) == 1 else "have"} been updated.' |  | ||||||
|         flash(_output) |  | ||||||
|         return jsonify({'success': _output}), 200 |  | ||||||
|      |  | ||||||
|     def delete(self): |  | ||||||
|         retrieved_user = decrypt_find_one(db.users, { '_id': self._id }) |  | ||||||
|         if not retrieved_user: |  | ||||||
|             return jsonify({ 'error': f'User does not exist.' }), 401 |  | ||||||
|         db.users.find_one_and_delete({'_id': self._id}) |  | ||||||
|         flash(f'User {retrieved_user["username"]} has been deleted.') |  | ||||||
|         return jsonify({'success': f'User {retrieved_user["username"]} has been deleted.'}), 200 |  | ||||||
| @@ -4,14 +4,14 @@ from functools import wraps | |||||||
|  |  | ||||||
| from werkzeug.security import check_password_hash | from werkzeug.security import check_password_hash | ||||||
| from security.database import decrypt_find, decrypt_find_one | from security.database import decrypt_find, decrypt_find_one | ||||||
| from .user.models import User | from .models.users import User | ||||||
| from flask_mail import Message | from flask_mail import Message | ||||||
| from main import db | from main import db | ||||||
| from uuid import uuid4 | from uuid import uuid4 | ||||||
| import secrets | import secrets | ||||||
| from main import mail | from main import mail | ||||||
| from datetime import datetime, date, timedelta | from datetime import datetime, date, timedelta | ||||||
| from .models import Test | from .models.tests import Test | ||||||
|  |  | ||||||
| views = Blueprint( | views = Blueprint( | ||||||
|     'admin_views', |     'admin_views', | ||||||
| @@ -82,7 +82,7 @@ def settings(): | |||||||
| @admin_account_required | @admin_account_required | ||||||
| @login_required | @login_required | ||||||
| def users(): | def users(): | ||||||
|     from .forms import CreateUserForm |     from .models.forms import CreateUserForm | ||||||
|     form = CreateUserForm() |     form = CreateUserForm() | ||||||
|     if request.method == 'GET': |     if request.method == 'GET': | ||||||
|         users_list = decrypt_find(db.users, {}) |         users_list = decrypt_find(db.users, {}) | ||||||
| @@ -132,7 +132,7 @@ def delete_user(_id:str): | |||||||
|     if _id == get_id_from_cookie(): |     if _id == get_id_from_cookie(): | ||||||
|         flash('Cannot delete your own user account.', 'error') |         flash('Cannot delete your own user account.', 'error') | ||||||
|         return redirect(url_for('admin_views.users')) |         return redirect(url_for('admin_views.users')) | ||||||
|     from .forms import DeleteUserForm |     from .models.forms import DeleteUserForm | ||||||
|     form = DeleteUserForm() |     form = DeleteUserForm() | ||||||
|     user = decrypt_find_one(db.users, {'_id': _id}) |     user = decrypt_find_one(db.users, {'_id': _id}) | ||||||
|     if request.method == 'GET': |     if request.method == 'GET': | ||||||
| @@ -181,7 +181,7 @@ def update_user(_id:str): | |||||||
|     if _id == get_id_from_cookie(): |     if _id == get_id_from_cookie(): | ||||||
|         flash('Cannot delete your own user account.', 'error') |         flash('Cannot delete your own user account.', 'error') | ||||||
|         return redirect(url_for('admin_views.users')) |         return redirect(url_for('admin_views.users')) | ||||||
|     from .forms import UpdateUserForm |     from .models.forms import UpdateUserForm | ||||||
|     form = UpdateUserForm() |     form = UpdateUserForm() | ||||||
|     user = decrypt_find_one( db.users, {'_id': _id}) |     user = decrypt_find_one( db.users, {'_id': _id}) | ||||||
|     if request.method == 'GET': |     if request.method == 'GET': | ||||||
| @@ -251,7 +251,7 @@ def tests(filter=''): | |||||||
|     if filter not in ['', 'create', 'active', 'scheduled', 'expired', 'all']: |     if filter not in ['', 'create', 'active', 'scheduled', 'expired', 'all']: | ||||||
|         return abort(404) |         return abort(404) | ||||||
|     if filter == 'create': |     if filter == 'create': | ||||||
|         from .forms import CreateTest |         from .models.forms import CreateTest | ||||||
|         form = CreateTest() |         form = CreateTest() | ||||||
|         form.time_limit.default='none' |         form.time_limit.default='none' | ||||||
|         form.process() |         form.process() | ||||||
| @@ -281,7 +281,7 @@ def tests(filter=''): | |||||||
| @admin_account_required | @admin_account_required | ||||||
| @login_required | @login_required | ||||||
| def _tests(): | def _tests(): | ||||||
|     from .forms import CreateTest |     from .models.forms import CreateTest | ||||||
|     form = CreateTest() |     form = CreateTest() | ||||||
|     form.time_limit.default='none' |     form.time_limit.default='none' | ||||||
|     form.process() |     form.process() | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user