viveksantayana
ca25159830
Moved most of the app definitions out of the guard function to use WSGI. Updated configuration files and referencing of .env values. The local version needs dotenv or exported environment variables. The Dockerised version works fine without load_dotenv. Ready to test now!
196 lines
9.7 KiB
Python
196 lines
9.7 KiB
Python
from flask import flash, make_response, Response, session
|
|
from flask.helpers import url_for
|
|
from flask.json import jsonify
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
from werkzeug.utils import redirect
|
|
from flask_mail import Message
|
|
import secrets
|
|
|
|
from common.security import encrypt, decrypt
|
|
from common.security.database import decrypt_find_one, encrypted_update
|
|
from datetime import datetime, timedelta
|
|
|
|
class User:
    """Administrator account with cookie-based session handling.

    Each public method performs one account operation (register, login,
    logout, password reset, update, delete) against ``db.users``, where
    ``db`` is imported lazily from ``main`` inside the method, and returns
    either a Flask response object or a ``(response, status)`` tuple.
    """

    # Lifetime of the login cookies when the user asked to be remembered.
    _REMEMBER_DURATION = timedelta(days=14)

    # Salted, iterated hash instead of a single salted SHA-256 round.
    # NOTE(review): the plain 'sha256' method is deprecated/removed in newer
    # Werkzeug releases -- confirm against the pinned Werkzeug version and
    # decide how hashes stored under the old method should be migrated.
    _HASH_METHOD = 'pbkdf2:sha256'

    def __init__(self, _id=None, username=None, password=None, email=None, remember=False):
        """Store the account fields supplied by the caller.

        Args:
            _id: Identifier of the user document; also used as the value of
                the ``_id`` session cookie.
            username: Plain-text username.
            password: Plain-text password (hashed before it is stored).
            email: Plain-text email address.
            remember: When true, login cookies persist for 14 days instead
                of lasting only for the browser session.
        """
        self._id = _id
        self.username = username
        self.email = email
        self.password = password
        self.remember = remember

    def _cookie_lifetime(self):
        """Return ``(max_age, expires)`` for the login cookies.

        Both values are ``None`` when the user did not ask to be remembered,
        so no ``Max-Age``/``Expires`` attribute is sent and the browser
        treats the cookie as a session cookie.
        """
        if not self.remember:
            return None, None
        return self._REMEMBER_DURATION, datetime.utcnow() + self._REMEMBER_DURATION

    @staticmethod
    def _join_fields(fields):
        """Join field names into prose: ``a``, ``a and b``, ``a, b, and c``."""
        fields = list(fields)
        if len(fields) <= 2:
            return ' and '.join(fields)
        return f"{', '.join(fields[:-1])}, and {fields[-1]}"

    def start_session(self, resp: "Response"):
        """Attach the login cookies for this user to ``resp``.

        Always sets the ``_id`` cookie; additionally sets a ``remember``
        cookie when ``self.remember`` is true.
        """
        max_age, expires = self._cookie_lifetime()
        resp.set_cookie(
            key='_id',
            value=self._id,
            max_age=max_age,
            path='/',
            expires=expires,
        )
        if self.remember:
            resp.set_cookie(
                key='remember',
                value='True',
                max_age=max_age,
                path='/',
                expires=expires,
            )

    def register(self):
        """Create the user document and report the outcome as JSON.

        Returns:
            A 400 error tuple when the username is taken or the insert
            fails; otherwise a 200 response carrying the stored document.
            A session is started on that response when the request carries
            no user id cookie.
        """
        from main import db
        from ..views import get_id_from_cookie

        # Check availability first so the password is only hashed when needed.
        if decrypt_find_one(db.users, {'username': self.username}):
            return jsonify({'error': f'Username {self.username} is not available.'}), 400

        user = {
            '_id': self._id,
            'email': encrypt(self.email),
            'password': generate_password_hash(self.password, method=self._HASH_METHOD),
            'username': encrypt(self.username),
        }
        if db.users.insert_one(user):
            flash(f'User {self.username} has been created successfully. You may now use it to log in and administer the tests.', 'success')
            # NOTE(review): this echoes the stored document, including the
            # password hash, back to the client -- consider trimming it.
            resp = make_response(jsonify(user), 200)
            # The helper must be *called*; the bare function object is
            # always truthy, which meant the session was never started.
            if not get_id_from_cookie():
                self.start_session(resp)
            return resp
        return jsonify({'error': 'Registration failed. An error occurred.'}), 400

    def login(self):
        """Verify the credentials and start a session.

        Returns:
            A 401 error tuple when the username is unknown or the password
            does not match; otherwise a 200 JSON response with the login
            cookies set. If the Flask session holds ``prev_page`` it is
            removed from the session and returned as ``redirect_to``.
        """
        from main import db

        user = decrypt_find_one(db.users, {'username': self.username})
        if not user:
            return jsonify({'error': f'Username {self.username} does not exist.'}), 401
        if not check_password_hash(user['password'], self.password):
            return jsonify({'error': 'The password you entered is incorrect.'}), 401

        response = {
            'success': f'Successfully logged in user {self.username}.'
        }
        if 'prev_page' in session:
            response['redirect_to'] = session.pop('prev_page')

        resp = make_response(jsonify(response), 200)
        # The cookie must carry the stored id, not whatever the caller passed.
        self._id = user['_id']
        self.start_session(resp)
        return resp

    def logout(self):
        """Expire the login cookies and redirect to the login page.

        The ``_id`` and ``remember`` cookies are overwritten with an expiry
        in the past; ``cookie_consent`` is re-issued as a session cookie.
        """
        resp = make_response(redirect(url_for('admin_auth.login')))
        expired_age = timedelta(days=-1)
        expired_at = datetime.utcnow() + expired_age
        resp.set_cookie(
            key='_id',
            value='',
            max_age=expired_age,
            path='/',
            expires=expired_at,
        )
        # None (not the string 'Session') is what yields a session cookie.
        resp.set_cookie(
            key='cookie_consent',
            value='True',
            max_age=None,
            path='/',
            expires=None,
        )
        resp.set_cookie(
            key='remember',
            value='True',
            max_age=expired_age,
            path='/',
            expires=expired_at,
        )
        flash('You have been logged out. All cookies pertaining to your account have been deleted. Have a nice day.', 'alert')
        return resp

    def reset_password(self):
        """Reset the password and email recovery instructions to the user.

        A random temporary password and two random URL tokens are generated
        and stored on the user document; the temporary password and a reset
        link built from the tokens are then mailed to ``self.email``.

        Returns:
            A 401 error tuple when the username is unknown or the email does
            not match the account, a 400 error tuple when the update is
            reported as unsuccessful, otherwise a 200 success tuple.
        """
        from main import db, mail

        user = decrypt_find_one(db.users, {'username': self.username})
        if not user:
            return jsonify({'error': f'Username {self.username} does not exist.'}), 401
        if user['email'] != self.email:
            return jsonify({'error': f'The email address {self.email} does not match the user account {self.username}.'}), 401

        new_password = secrets.token_hex(12)
        reset_token = secrets.token_urlsafe(16)
        verification_token = secrets.token_urlsafe(16)
        user['password'] = generate_password_hash(new_password, method=self._HASH_METHOD)

        changes = {
            '$set': {
                'password': user['password'],
                'reset_token': reset_token,
                'verification_token': verification_token,
            }
        }
        # NOTE(review): assumes encrypted_update returns a truthy value on
        # success, as the original conditional did -- confirm in the helper.
        if not encrypted_update(db.users, {'username': self.username}, changes):
            return jsonify({'error': 'Password reset failed. An error occurred.'}), 400

        flash(f'Your password has been reset. Instructions to recover your account have been sent to {self.email}. Please be sure to check your spam folder in case you have not received the email.', 'alert')

        # Build the link once; it is used in both the text and HTML bodies.
        reset_link = url_for('admin_auth.reset_gateway', token1=reset_token, token2=verification_token, _external=True)
        email = Message(
            subject='RefTest | Password Reset',
            recipients=[self.email],
            body=f"""
                Hello {user['username']}, \n\n
                This email was generated because we received a request to reset the password for your administrator account for the SKA RefTest app.\n\n
                If you did not make this request, please ignore this email.\n\n
                If you did make this request, then you have two options to recover your account.\n\n
                For the time being, your password has been reset to the following:\n\n
                {new_password}\n\n
                You may use this to log back in to your account, and subsequently change your password to something more suitable.\n\n
                Alternatively, you may visit the following private link using your unique token to override your password. Copy and paste the following link in a web browser. Please note that this token is only valid once:\n\n
                {reset_link}\n\n
                Have a nice day.
            """,
            html=f"""
                <p>Hello {user['username']},</p>
                <p>This email was generated because we received a request to reset the password for your administrator account for the SKA RefTest app. </p>
                <p>If you did not make this request, please ignore this email.</p>
                <p>If you did make this request, then you have two options to recover your account.</p>
                <p>For the time being, your password has been reset to the following:</p>
                <strong>{new_password}</strong>
                <p>You may use this to log back in to your account, and subsequently change your password to something more suitable.</p>
                <p>Alternatively, you may visit the following private link using your unique token to override your password. Click on the following link, or copy and paste it into a browser. <strong>Please note that this token is only valid once</strong>:</p>
                <p><a href='{reset_link}'>{reset_link}</a></p>
                <p>Have a nice day.</p>
            """
        )
        mail.send(email)
        return jsonify({'success': 'Password reset request has been processed.'}), 200

    def update(self):
        """Update the email and/or password of the user identified by ``_id``.

        Only fields that are neither ``None`` nor the empty string are
        changed. The confirmation message addresses the user as "Your" when
        the id matches the one returned by ``get_id_from_cookie()``, and by
        possessive username otherwise.

        Returns:
            A 401 error tuple when no user has this id; otherwise a 200
            success tuple describing what, if anything, was changed.
        """
        from main import db
        from ..views import get_id_from_cookie

        retrieved_user = decrypt_find_one(db.users, {'_id': self._id})
        if not retrieved_user:
            # No document was found, so there is no username to quote here.
            return jsonify({'error': 'User does not exist.'}), 401

        changes = {}
        if self.email not in ('', None):
            changes['email'] = self.email
        if self.password not in ('', None):
            changes['password'] = generate_password_hash(self.password, method=self._HASH_METHOD)

        updated = list(changes)
        if not updated:
            flash('There were no changes requested for your account.', 'alert')
            return jsonify({'success': 'There were no changes requested for your account.'}), 200

        encrypted_update(db.users, {'_id': self._id}, {'$set': changes})

        username = retrieved_user['username']
        if self._id == get_id_from_cookie():
            # Trailing space kept so the emitted message text is unchanged.
            owner = 'Your '
        elif username.endswith('s'):
            owner = f'{username}’'
        else:
            owner = f'{username}’s'

        verb = 'has' if len(updated) == 1 else 'have'
        message = f'{owner} {self._join_fields(updated)} {verb} been updated.'
        flash(message)
        return jsonify({'success': message}), 200

    def delete(self):
        """Delete the user document identified by ``_id``.

        Returns:
            A 401 error tuple when no user has this id; otherwise a 200
            success tuple naming the deleted user.
        """
        from main import db

        retrieved_user = decrypt_find_one(db.users, {'_id': self._id})
        if not retrieved_user:
            return jsonify({'error': 'User does not exist.'}), 401

        db.users.find_one_and_delete({'_id': self._id})
        message = f'User {retrieved_user["username"]} has been deleted.'
        flash(message)
        return jsonify({'success': message}), 200