From 5b2e6dda67cb11ed573bd6619f0da05b12d95f0f Mon Sep 17 00:00:00 2001 From: viveksantayana Date: Thu, 25 Nov 2021 23:21:48 +0000 Subject: [PATCH] Refactor to have all models in the models package. --- ref-test/admin/auth.py | 12 +- ref-test/admin/{user => models}/__init__.py | 0 ref-test/admin/{ => models}/forms.py | 0 ref-test/admin/{models.py => models/tests.py} | 0 ref-test/admin/user/models.py | 185 ------------------ ref-test/admin/views.py | 14 +- 6 files changed, 13 insertions(+), 198 deletions(-) rename ref-test/admin/{user => models}/__init__.py (100%) rename ref-test/admin/{ => models}/forms.py (100%) rename ref-test/admin/{models.py => models/tests.py} (100%) delete mode 100644 ref-test/admin/user/models.py diff --git a/ref-test/admin/auth.py b/ref-test/admin/auth.py index fd9e854..2c6e259 100644 --- a/ref-test/admin/auth.py +++ b/ref-test/admin/auth.py @@ -1,7 +1,7 @@ from flask import Blueprint, render_template, request, session, redirect from flask.helpers import flash, url_for from flask.json import jsonify -from .user.models import User +from .models.users import User from uuid import uuid4 from security.database import decrypt_find_one, encrypted_update from werkzeug.security import check_password_hash @@ -21,7 +21,7 @@ auth = Blueprint( @admin_account_required @login_required def account(): - from .forms import UpdateAccountForm + from .models.forms import UpdateAccountForm form = UpdateAccountForm() _id = get_id_from_cookie() user = decrypt_find_one(db.users, {'_id': _id}) @@ -46,7 +46,7 @@ def account(): @admin_account_required @disable_if_logged_in def login(): - from .forms import LoginForm + from .models.forms import LoginForm form = LoginForm() if request.method == 'GET': return render_template('/admin/auth/login.html', form=form) @@ -72,7 +72,7 @@ def logout(): @auth.route('/register/', methods=['GET','POST']) @disable_on_registration def register(): - from .forms import RegistrationForm + from .models.forms import RegistrationForm form = RegistrationForm() if request.method == 'GET': return render_template('/admin/auth/register.html', form=form) @@ -93,7 +93,7 @@ def register(): @admin_account_required @disable_if_logged_in def reset(): - from .forms import ResetPasswordForm + from .models.forms import ResetPasswordForm form = ResetPasswordForm() if request.method == 'GET': return render_template('/admin/auth/reset.html', form=form) @@ -128,7 +128,7 @@ def reset_gateway(token1,token2): @admin_account_required @disable_if_logged_in def update_password_(): - from .forms import UpdatePasswordForm + from .models.forms import UpdatePasswordForm form = UpdatePasswordForm() if request.method == 'GET': if 'reset_validated' not in session: diff --git a/ref-test/admin/user/__init__.py b/ref-test/admin/models/__init__.py similarity index 100% rename from ref-test/admin/user/__init__.py rename to ref-test/admin/models/__init__.py diff --git a/ref-test/admin/forms.py b/ref-test/admin/models/forms.py similarity index 100% rename from ref-test/admin/forms.py rename to ref-test/admin/models/forms.py diff --git a/ref-test/admin/models.py b/ref-test/admin/models/tests.py similarity index 100% rename from ref-test/admin/models.py rename to ref-test/admin/models/tests.py diff --git a/ref-test/admin/user/models.py b/ref-test/admin/user/models.py deleted file mode 100644 index a983975..0000000 --- a/ref-test/admin/user/models.py +++ /dev/null @@ -1,185 +0,0 @@ -from flask import flash, make_response, Response -from flask.helpers import url_for -from flask.json import jsonify -from werkzeug.security import generate_password_hash, check_password_hash -from werkzeug.utils import redirect -from flask_mail import Message -import secrets - -from security import encrypt, decrypt -from security.database import decrypt_find_one, encrypted_update -from datetime import datetime, timedelta -from main import db, mail - -class User: - - def __init__(self, _id=None, username=None, password=None, email=None, remember=False): - self._id = _id - self.username = username - self.email = email - self.password = password - self.remember = remember - - def start_session(self, resp:Response): - resp.set_cookie( - key = '_id', - value = self._id, - max_age = timedelta(days=14) if self.remember else 'Session', - path = '/', - expires = datetime.utcnow() + timedelta(days=14) if self.remember else 'Session' - ) - if self.remember: - resp.set_cookie ( - key = 'remember', - value = 'True', - max_age = timedelta(days=14), - path = '/', - expires = datetime.utcnow() + timedelta(days=14) - ) - - def register(self): - from ..views import get_id_from_cookie - user = { - '_id': self._id, - 'email': encrypt(self.email), - 'password': generate_password_hash(self.password, method='sha256'), - 'username': encrypt(self.username) - } - if decrypt_find_one(db.users, { 'username': self.username }): - return jsonify({ 'error': f'Username {self.username} is not available.' }), 400 - if db.users.insert_one(user): - flash(f'User {self.username} has been created successfully. You may now use it to log in and administer the tests.', 'success') - resp = make_response(jsonify(user), 200) - if not get_id_from_cookie: - self.start_session(resp) - return resp - return jsonify({ 'error': f'Registration failed. An error occurred.' }), 400 - - def login(self): - user = decrypt_find_one( db.users, { 'username': self.username }) - if not user: - return jsonify({ 'error': f'Username {self.username} does not exist.' }), 401 - if not check_password_hash( user['password'], self.password ): - return jsonify({ 'error': f'The password you entered is incorrect.' }), 401 - resp = make_response(jsonify({ 'success': f'Successfully logged in user {self.username}.' }), 200) - self._id = user['_id'] - self.start_session(resp) - return resp - - def logout(self): - resp = make_response(redirect(url_for('admin_auth.login'))) - resp.set_cookie( - key = '_id', - value = '', - max_age = timedelta(days=-1), - path = '/', - expires= datetime.utcnow() + timedelta(days=-1) - ) - resp.set_cookie ( - key = 'cookie_consent', - value = 'True', - max_age = 'Session', - path = '/', - expires = 'Session' - ) - resp.set_cookie ( - key = 'remember', - value = 'True', - max_age = timedelta(days=-1), - path = '/', - expires = datetime.utcnow() + timedelta(days=-1) - ) - flash('You have been logged out. All cookies pertaining to your account have been deleted. Have a nice day.', 'alert') - return resp - - def reset_password(self): - user = decrypt_find_one(db.users, { 'username': self.username }) - if not user: - return jsonify({ 'error': f'Username {self.username} does not exist.' }), 401 - if not decrypt(user['email']) == self.email: - return jsonify({ 'error': f'The email address {self.email} does not match the user account {self.username}.' }), 401 - new_password = secrets.token_hex(12) - reset_token = secrets.token_urlsafe(16) - verification_token = secrets.token_urlsafe(16) - user['password'] = generate_password_hash(new_password, method='sha256') - if encrypted_update( { 'username': self.username }, { '$set': {'password': user['password'], 'reset_token': reset_token, 'verification_token': verification_token } } ): - flash(f'Your password has been reset. Instructions to recover your account have been sent to {self.email}. Please be sure to check your spam folder in case you have not received the email.', 'alert') - email = Message( - subject = 'RefTest | Password Reset', - recipients = [self.email], - body = f""" - Hello {user['username']}, \n\n - This email was generated because we received a request to reset the password for your administrator account for the SKA RefTest app.\n\n - If you did not make this request, please ignore this email.\n\n - If you did make this request, then you have two options to recover your account.\n\n - For the time being, your password has been reset to the following:\n\n - {new_password}\n\n - You may use this to log back in to your account, and subsequently change your password to something more suitable.\n\n - Alternatively, you may visit the following private link using your unique token to override your password. Copy and paste the following link in a web browser. Please note that this token is only valid once:\n\n - {url_for('admin_auth.reset_gateway', token1 = reset_token, token2 = verification_token, _external = True)}\n\n - Have a nice day. - """, - html = f""" -

Hello {user['username']},

-

This email was generated because we received a request to reset the password for your administrator account for the SKA RefTest app.

-

If you did not make this request, please ignore this email.

-

If you did make this request, then you have two options to recover your account.

-

For the time being, your password has been reset to the following:

- {new_password} -

You may use this to log back in to your account, and subsequently change your password to something more suitable.

-

Alternatively, you may visit the following private link using your unique token to override your password. Click on the following link, or copy and paste it into a browser. Please note that this token is only valid once:

-

{url_for('admin_auth.reset_gateway', token1 = reset_token, token2 = verification_token, _external = True)}

-

Have a nice day.

- """ - ) - mail.send(email) - return jsonify({ 'success': 'Password reset request has been processed.'}), 200 - - def update(self): - from ..views import get_id_from_cookie - retrieved_user = decrypt_find_one(db.users, { '_id': self._id }) - if not retrieved_user: - return jsonify({ 'error': f'User {retrieved_user["username"]} does not exist.' }), 401 - user = {} - updated = [] - if not self.email == '' and self.email is not None: - user['email'] = self.email - updated.append('email') - if not self.password == '' and self.password is not None: - user['password'] = generate_password_hash(self.password, method='sha256') - updated.append('password') - output = '' - if len(updated) == 0: - flash(f'There were no changes requested for your account.', 'alert'), 200 - return jsonify({'success': 'There were no changes requested for your account.'}), 200 - elif len(updated) == 1: - output = updated[0] - elif len(updated) == 2: - output = ' and '.join(updated) - elif len(updated) > 2: - output = updated[0] - for index in range(1,len(updated)): - if index < len(updated) - 2: - output = ', '.join([output, updated[index]]) - elif index == len(updated) - 2: - output = ', and '.join([output, updated[index]]) - else: - output = ''.join([output, updated[index]]) - encrypted_update(db.users, {'_id': self._id}, { '$set': user }) - if self._id == get_id_from_cookie(): - _output = 'Your ' - elif retrieved_user['username'][-1] == 's': - _output = '’'.join([retrieved_user['username'], '']) - else: - _output = '’'.join([retrieved_user['username'], 's']) - _output = f'{_output} {output} {"has" if len(updated) == 1 else "have"} been updated.' - flash(_output) - return jsonify({'success': _output}), 200 - - def delete(self): - retrieved_user = decrypt_find_one(db.users, { '_id': self._id }) - if not retrieved_user: - return jsonify({ 'error': f'User does not exist.' }), 401 - db.users.find_one_and_delete({'_id': self._id}) - flash(f'User {retrieved_user["username"]} has been deleted.') - return jsonify({'success': f'User {retrieved_user["username"]} has been deleted.'}), 200 diff --git a/ref-test/admin/views.py b/ref-test/admin/views.py index c6f4fc2..6bc4ce4 100644 --- a/ref-test/admin/views.py +++ b/ref-test/admin/views.py @@ -4,14 +4,14 @@ from functools import wraps from werkzeug.security import check_password_hash from security.database import decrypt_find, decrypt_find_one -from .user.models import User +from .models.users import User from flask_mail import Message from main import db from uuid import uuid4 import secrets from main import mail from datetime import datetime, date, timedelta -from .models import Test +from .models.tests import Test views = Blueprint( 'admin_views', @@ -82,7 +82,7 @@ def settings(): @admin_account_required @login_required def users(): - from .forms import CreateUserForm + from .models.forms import CreateUserForm form = CreateUserForm() if request.method == 'GET': users_list = decrypt_find(db.users, {}) @@ -132,7 +132,7 @@ def delete_user(_id:str): if _id == get_id_from_cookie(): flash('Cannot delete your own user account.', 'error') return redirect(url_for('admin_views.users')) - from .forms import DeleteUserForm + from .models.forms import DeleteUserForm form = DeleteUserForm() user = decrypt_find_one(db.users, {'_id': _id}) if request.method == 'GET': @@ -181,7 +181,7 @@ def update_user(_id:str): if _id == get_id_from_cookie(): flash('Cannot delete your own user account.', 'error') return redirect(url_for('admin_views.users')) - from .forms import UpdateUserForm + from .models.forms import UpdateUserForm form = UpdateUserForm() user = decrypt_find_one( db.users, {'_id': _id}) if request.method == 'GET': @@ -251,7 +251,7 @@ def tests(filter=''): if filter not in ['', 'create', 'active', 'scheduled', 'expired', 'all']: return abort(404) if filter == 'create': - from .forms import CreateTest + from .models.forms import CreateTest form = CreateTest() form.time_limit.default='none' form.process() @@ -281,7 +281,7 @@ def tests(filter=''): @admin_account_required @login_required def _tests(): - from .forms import CreateTest + from .models.forms import CreateTest form = CreateTest() form.time_limit.default='none' form.process()