From c13f7803aca867fec79a562be6df91afed9da418 Mon Sep 17 00:00:00 2001 From: viveksantayana Date: Wed, 8 Dec 2021 11:27:54 +0000 Subject: [PATCH] Removed personal information --- ref-test/admin/models/users.py | 207 +++++++++++++++++++++++++++++++++ 1 file changed, 207 insertions(+) create mode 100644 ref-test/admin/models/users.py diff --git a/ref-test/admin/models/users.py b/ref-test/admin/models/users.py new file mode 100644 index 0000000..71c9da1 --- /dev/null +++ b/ref-test/admin/models/users.py @@ -0,0 +1,207 @@ +from flask import flash, make_response, Response, session +from flask.helpers import url_for +from flask.json import jsonify +from werkzeug.security import generate_password_hash, check_password_hash +from werkzeug.utils import redirect +from flask_mail import Message +import secrets + +from common.security import encrypt, decrypt +from common.security.database import decrypt_find_one, encrypted_update +from datetime import datetime, timedelta + +class User: + + def __init__(self, _id=None, username=None, password=None, email=None, remember=False): + self._id = _id + self.username = username + self.email = email + self.password = password + self.remember = remember + + def start_session(self, resp:Response): + from main import app + resp.set_cookie( + key = '_id', + value = self._id, + max_age = timedelta(days=14) if self.remember else None, + path = '/', + expires = datetime.utcnow() + timedelta(days=14) if self.remember else None, + domain = f'.{app.config["SERVER_NAME"]}', + secure = True + ) + if self.remember: + resp.set_cookie ( + key = 'remember', + value = 'True', + max_age = timedelta(days=14), + path = '/', + expires = datetime.utcnow() + timedelta(days=14), + domain = f'.{app.config["SERVER_NAME"]}', + secure = True + ) + + def register(self): + from main import db + from ..views import get_id_from_cookie + user = { + '_id': self._id, + 'email': encrypt(self.email), + 'password': generate_password_hash(self.password, method='sha256'), + 'username': encrypt(self.username) + } + if decrypt_find_one(db.users, { 'username': self.username }): + return jsonify({ 'error': f'Username {self.username} is not available.' }), 400 + if db.users.insert_one(user): + flash(f'User {self.username} has been created successfully. You may now use it to log in and administer the tests.', 'success') + resp = make_response(jsonify(user), 200) + if not get_id_from_cookie: + self.start_session(resp) + return resp + return jsonify({ 'error': f'Registration failed. An error occurred.' }), 400 + + def login(self): + from main import db + user = decrypt_find_one( db.users, { 'username': self.username }) + if not user: + return jsonify({ 'error': f'Username {self.username} does not exist.' }), 401 + if not check_password_hash( user['password'], self.password ): + return jsonify({ 'error': f'The password you entered is incorrect.' }), 401 + response = { + 'success': f'Successfully logged in user {self.username}.' + } + if 'prev_page' in session: + response['redirect_to'] = session['prev_page'] + session.pop('prev_page') + resp = make_response(jsonify(response), 200) + self._id = user['_id'] + self.start_session(resp) + return resp + + def logout(self): + resp = make_response(redirect(url_for('admin_auth.login'))) + from main import app + resp.set_cookie( + key = '_id', + value = '', + max_age = timedelta(days=-1), + path = '/', + expires= datetime.utcnow() + timedelta(days=-1), + domain = f'.{app.config["SERVER_NAME"]}', + secure = True + ) + resp.set_cookie ( + key = 'cookie_consent', + value = 'True', + max_age = None, + path = '/', + expires = None, + domain = f'.{app.config["SERVER_NAME"]}', + secure = True + ) + resp.set_cookie ( + key = 'remember', + value = 'True', + max_age = timedelta(days=-1), + path = '/', + expires = datetime.utcnow() + timedelta(days=-1), + domain = f'.{app.config["SERVER_NAME"]}', + secure = True + ) + flash('You have been logged out. All cookies pertaining to your account have been deleted. Have a nice day.', 'alert') + return resp + + def reset_password(self): + from main import db, mail + user = decrypt_find_one(db.users, { 'username': self.username }) + if not user: + return jsonify({ 'error': f'Username {self.username} does not exist.' }), 401 + if not user['email'] == self.email: + return jsonify({ 'error': f'The email address {self.email} does not match the user account {self.username}.' }), 401 + new_password = secrets.token_hex(12) + reset_token = secrets.token_urlsafe(16) + verification_token = secrets.token_urlsafe(16) + user['password'] = generate_password_hash(new_password, method='sha256') + if encrypted_update(db.users, { 'username': self.username }, { '$set': {'password': user['password'], 'reset_token': reset_token, 'verification_token': verification_token } } ): + flash(f'Your password has been reset. Instructions to recover your account have been sent to {self.email}. Please be sure to check your spam folder in case you have not received the email.', 'alert') + email = Message( + subject = 'RefTest | Password Reset', + recipients = [self.email], + body = f""" + Hello {user['username']}, \n\n + This email was generated because we received a request to reset the password for your administrator account for the SKA RefTest app.\n\n + If you did not make this request, please ignore this email.\n\n + If you did make this request, then you have two options to recover your account.\n\n + For the time being, your password has been reset to the following:\n\n + {new_password}\n\n + You may use this to log back in to your account, and subsequently change your password to something more suitable.\n\n + Alternatively, you may visit the following private link using your unique token to override your password. Copy and paste the following link in a web browser. Please note that this token is only valid once:\n\n + {url_for('admin_auth.reset_gateway', token1 = reset_token, token2 = verification_token, _external = True)}\n\n + Have a nice day. + """, + html = f""" +

Hello {user['username']},

+

This email was generated because we received a request to reset the password for your administrator account for the SKA RefTest app.

+

If you did not make this request, please ignore this email.

+

If you did make this request, then you have two options to recover your account.

+

For the time being, your password has been reset to the following:

+ {new_password} +

You may use this to log back in to your account, and subsequently change your password to something more suitable.

+

Alternatively, you may visit the following private link using your unique token to override your password. Click on the following link, or copy and paste it into a browser. Please note that this token is only valid once:

+

{url_for('admin_auth.reset_gateway', token1 = reset_token, token2 = verification_token, _external = True)}

+

Have a nice day.

+ """ + ) + mail.send(email) + return jsonify({ 'success': 'Password reset request has been processed.'}), 200 + + def update(self): + from main import db + from ..views import get_id_from_cookie + retrieved_user = decrypt_find_one(db.users, { '_id': self._id }) + if not retrieved_user: + return jsonify({ 'error': f'User {retrieved_user["username"]} does not exist.' }), 401 + user = {} + updated = [] + if not self.email == '' and self.email is not None: + user['email'] = self.email + updated.append('email') + if not self.password == '' and self.password is not None: + user['password'] = generate_password_hash(self.password, method='sha256') + updated.append('password') + output = '' + if len(updated) == 0: + flash(f'There were no changes requested for your account.', 'alert'), 200 + return jsonify({'success': 'There were no changes requested for your account.'}), 200 + elif len(updated) == 1: + output = updated[0] + elif len(updated) == 2: + output = ' and '.join(updated) + elif len(updated) > 2: + output = updated[0] + for index in range(1,len(updated)): + if index < len(updated) - 2: + output = ', '.join([output, updated[index]]) + elif index == len(updated) - 2: + output = ', and '.join([output, updated[index]]) + else: + output = ''.join([output, updated[index]]) + encrypted_update(db.users, {'_id': self._id}, { '$set': user }) + if self._id == get_id_from_cookie(): + _output = 'Your ' + elif retrieved_user['username'][-1] == 's': + _output = '’'.join([retrieved_user['username'], '']) + else: + _output = '’'.join([retrieved_user['username'], 's']) + _output = f'{_output} {output} {"has" if len(updated) == 1 else "have"} been updated.' + flash(_output) + return jsonify({'success': _output}), 200 + + def delete(self): + from main import db + retrieved_user = decrypt_find_one(db.users, { '_id': self._id }) + if not retrieved_user: + return jsonify({ 'error': f'User does not exist.' }), 401 + db.users.find_one_and_delete({'_id': self._id}) + flash(f'User {retrieved_user["username"]} has been deleted.') + return jsonify({'success': f'User {retrieved_user["username"]} has been deleted.'}), 200