2021-11-23 13:00:03 +00:00
from flask import flash , make_response , Response
from flask . helpers import url_for
from flask . json import jsonify
from werkzeug . security import generate_password_hash , check_password_hash
from werkzeug . utils import redirect
from flask_mail import Message
import secrets
2021-11-28 02:30:46 +00:00
from common . security import encrypt , decrypt
from common . security . database import decrypt_find_one , encrypted_update
2021-11-23 13:00:03 +00:00
from datetime import datetime , timedelta
from main import db , mail
class User :
def __init__ ( self , _id = None , username = None , password = None , email = None , remember = False ) :
self . _id = _id
self . username = username
self . email = email
self . password = password
self . remember = remember
def start_session ( self , resp : Response ) :
resp . set_cookie (
key = ' _id ' ,
value = self . _id ,
max_age = timedelta ( days = 14 ) if self . remember else ' Session ' ,
path = ' / ' ,
expires = datetime . utcnow ( ) + timedelta ( days = 14 ) if self . remember else ' Session '
)
if self . remember :
resp . set_cookie (
key = ' remember ' ,
value = ' True ' ,
max_age = timedelta ( days = 14 ) ,
path = ' / ' ,
expires = datetime . utcnow ( ) + timedelta ( days = 14 )
)
def register ( self ) :
from . . views import get_id_from_cookie
user = {
' _id ' : self . _id ,
' email ' : encrypt ( self . email ) ,
' password ' : generate_password_hash ( self . password , method = ' sha256 ' ) ,
' username ' : encrypt ( self . username )
}
if decrypt_find_one ( db . users , { ' username ' : self . username } ) :
return jsonify ( { ' error ' : f ' Username { self . username } is not available. ' } ) , 400
if db . users . insert_one ( user ) :
flash ( f ' User { self . username } has been created successfully. You may now use it to log in and administer the tests. ' , ' success ' )
resp = make_response ( jsonify ( user ) , 200 )
if not get_id_from_cookie :
self . start_session ( resp )
return resp
return jsonify ( { ' error ' : f ' Registration failed. An error occurred. ' } ) , 400
def login ( self ) :
user = decrypt_find_one ( db . users , { ' username ' : self . username } )
if not user :
return jsonify ( { ' error ' : f ' Username { self . username } does not exist. ' } ) , 401
if not check_password_hash ( user [ ' password ' ] , self . password ) :
return jsonify ( { ' error ' : f ' The password you entered is incorrect. ' } ) , 401
resp = make_response ( jsonify ( { ' success ' : f ' Successfully logged in user { self . username } . ' } ) , 200 )
self . _id = user [ ' _id ' ]
self . start_session ( resp )
return resp
def logout ( self ) :
resp = make_response ( redirect ( url_for ( ' admin_auth.login ' ) ) )
resp . set_cookie (
key = ' _id ' ,
value = ' ' ,
max_age = timedelta ( days = - 1 ) ,
path = ' / ' ,
expires = datetime . utcnow ( ) + timedelta ( days = - 1 )
)
resp . set_cookie (
key = ' cookie_consent ' ,
value = ' True ' ,
max_age = ' Session ' ,
path = ' / ' ,
expires = ' Session '
)
resp . set_cookie (
key = ' remember ' ,
value = ' True ' ,
max_age = timedelta ( days = - 1 ) ,
path = ' / ' ,
expires = datetime . utcnow ( ) + timedelta ( days = - 1 )
)
flash ( ' You have been logged out. All cookies pertaining to your account have been deleted. Have a nice day. ' , ' alert ' )
return resp
def reset_password ( self ) :
user = decrypt_find_one ( db . users , { ' username ' : self . username } )
if not user :
return jsonify ( { ' error ' : f ' Username { self . username } does not exist. ' } ) , 401
if not decrypt ( user [ ' email ' ] ) == self . email :
return jsonify ( { ' error ' : f ' The email address { self . email } does not match the user account { self . username } . ' } ) , 401
new_password = secrets . token_hex ( 12 )
reset_token = secrets . token_urlsafe ( 16 )
verification_token = secrets . token_urlsafe ( 16 )
user [ ' password ' ] = generate_password_hash ( new_password , method = ' sha256 ' )
if encrypted_update ( { ' username ' : self . username } , { ' $set ' : { ' password ' : user [ ' password ' ] , ' reset_token ' : reset_token , ' verification_token ' : verification_token } } ) :
flash ( f ' Your password has been reset. Instructions to recover your account have been sent to { self . email } . Please be sure to check your spam folder in case you have not received the email. ' , ' alert ' )
email = Message (
subject = ' RefTest | Password Reset ' ,
recipients = [ self . email ] ,
body = f """
Hello { user [ ' username ' ] } , \n \n
This email was generated because we received a request to reset the password for your administrator account for the SKA RefTest app . \n \n
If you did not make this request , please ignore this email . \n \n
If you did make this request , then you have two options to recover your account . \n \n
For the time being , your password has been reset to the following : \n \n
{ new_password } \n \n
You may use this to log back in to your account , and subsequently change your password to something more suitable . \n \n
Alternatively , you may visit the following private link using your unique token to override your password . Copy and paste the following link in a web browser . Please note that this token is only valid once : \n \n
{ url_for ( ' admin_auth.reset_gateway ' , token1 = reset_token , token2 = verification_token , _external = True ) } \n \n
Have a nice day .
""" ,
html = f """
< p > Hello { user [ ' username ' ] } , < / p >
< p > This email was generated because we received a request to reset the password for your administrator account for the SKA RefTest app . < / p >
< p > If you did not make this request , please ignore this email . < / p >
< p > If you did make this request , then you have two options to recover your account . < / p >
< p > For the time being , your password has been reset to the following : < / p >
< strong > { new_password } < / strong >
< p > You may use this to log back in to your account , and subsequently change your password to something more suitable . < / p >
< p > Alternatively , you may visit the following private link using your unique token to override your password . Click on the following link , or copy and paste it into a browser . < strong > Please note that this token is only valid once < / strong > : < / p >
< p > < a href = ' { url_for( ' admin_auth . reset_gateway ' , token1 = reset_token, token2 = verification_token, _external = True)} ' > { url_for ( ' admin_auth.reset_gateway ' , token1 = reset_token , token2 = verification_token , _external = True ) } < / a > < / p >
< p > Have a nice day . < / p >
"""
)
mail . send ( email )
return jsonify ( { ' success ' : ' Password reset request has been processed. ' } ) , 200
def update ( self ) :
from . . views import get_id_from_cookie
retrieved_user = decrypt_find_one ( db . users , { ' _id ' : self . _id } )
if not retrieved_user :
return jsonify ( { ' error ' : f ' User { retrieved_user [ " username " ] } does not exist. ' } ) , 401
user = { }
updated = [ ]
if not self . email == ' ' and self . email is not None :
user [ ' email ' ] = self . email
updated . append ( ' email ' )
if not self . password == ' ' and self . password is not None :
user [ ' password ' ] = generate_password_hash ( self . password , method = ' sha256 ' )
updated . append ( ' password ' )
output = ' '
if len ( updated ) == 0 :
flash ( f ' There were no changes requested for your account. ' , ' alert ' ) , 200
return jsonify ( { ' success ' : ' There were no changes requested for your account. ' } ) , 200
elif len ( updated ) == 1 :
output = updated [ 0 ]
elif len ( updated ) == 2 :
output = ' and ' . join ( updated )
elif len ( updated ) > 2 :
output = updated [ 0 ]
for index in range ( 1 , len ( updated ) ) :
if index < len ( updated ) - 2 :
output = ' , ' . join ( [ output , updated [ index ] ] )
elif index == len ( updated ) - 2 :
output = ' , and ' . join ( [ output , updated [ index ] ] )
else :
output = ' ' . join ( [ output , updated [ index ] ] )
encrypted_update ( db . users , { ' _id ' : self . _id } , { ' $set ' : user } )
if self . _id == get_id_from_cookie ( ) :
_output = ' Your '
elif retrieved_user [ ' username ' ] [ - 1 ] == ' s ' :
_output = ' ’ ' . join ( [ retrieved_user [ ' username ' ] , ' ' ] )
else :
_output = ' ’ ' . join ( [ retrieved_user [ ' username ' ] , ' s ' ] )
_output = f ' { _output } { output } { " has " if len ( updated ) == 1 else " have " } been updated. '
flash ( _output )
return jsonify ( { ' success ' : _output } ) , 200
def delete ( self ) :
retrieved_user = decrypt_find_one ( db . users , { ' _id ' : self . _id } )
if not retrieved_user :
return jsonify ( { ' error ' : f ' User does not exist. ' } ) , 401
db . users . find_one_and_delete ( { ' _id ' : self . _id } )
flash ( f ' User { retrieved_user [ " username " ] } has been deleted. ' )
return jsonify ( { ' success ' : f ' User { retrieved_user [ " username " ] } has been deleted. ' } ) , 200