diff --git a/ref-test/app/models/entry.py b/ref-test/app/models/entry.py new file mode 100644 index 0000000..5bcab96 --- /dev/null +++ b/ref-test/app/models/entry.py @@ -0,0 +1,49 @@ +from main import db + +class Entry(db.Model): + + id = db.Column(db.String(36), primary_key=True) + first_name = db.Column(db.String(128), nullable=False) + surname = db.Column(db.String(128), nullable=False) + email = db.Column(db.String(128), nullable=False) + club = db.Column(db.String(128), nullable=True) + test_id = db.Column(db.String(36), db.ForeignKey('test.id')) + test_code = db.Column(db.String(36), db.ForeignKey('test.test_code')) + user_code = db.Column(db.String(6), nullable=True) + start_time = db.Column(db.DateTime, nullable=False) + end_time = db.Column(db.DateTime, nullable=True) + status = db.Column(db.String(16), nullable=True) + late_ignore = db.Column(db.Boolean, default=False, nullable=True) + answers = db.Column(JsonEncodedDict, nullable=True) + + @property + def set_first_name(self): raise AttributeError('set_first_name is not a readable attribute.') + + set_first_name.setter + def set_first_name(self, name:str): self.first_name = encrypt(name) + + def get_first_name(self): return decrypt(self.first_name) + + @property + def set_surname(self): raise AttributeError('set_first_name is not a readable attribute.') + + set_surname.setter + def set_surname(self, name:str): self.surname = encrypt(name) + + def get_surname(self): return decrypt(self.surname) + + @property + def set_email(self): raise AttributeError('set_email is not a readable attribute.') + + set_name.setter + def set_email(self, email:str): self.email = encrypt(email) + + def get_email(self): return decrypt(self.email) + + @property + def set_club(self): raise AttributeError('set_club is not a readable attribute.') + + set_name.setter + def set_club(self, club:str): self.club = encrypt(club) + + def get_club(self): return decrypt(self.club) \ No newline at end of file diff --git a/ref-test/app/models/test.py b/ref-test/app/models/test.py new file mode 100644 index 0000000..8ab15b1 --- /dev/null +++ b/ref-test/app/models/test.py @@ -0,0 +1,112 @@ +class Test: + + def __init__(self, _id=None, start_date=None, expiry_date=None, time_limit=None, creator=None, dataset=None): + self._id = _id + self.start_date = start_date + self.expiry_date = expiry_date + self.time_limit = None if time_limit == 'none' or time_limit == '' or time_limit == None else int(time_limit) + self.creator = creator + self.dataset = dataset + + def create(self): + from main import app, db + test = { + '_id': self._id, + 'date_created': datetime.today(), + 'start_date': self.start_date, + 'expiry_date': self.expiry_date, + 'time_limit': self.time_limit, + 'creator': encrypt(self.creator), + 'test_code': secrets.token_hex(6).upper(), + 'dataset': self.dataset + } + if db.tests.insert_one(test): + dataset_file_path = os.path.join(app.config["DATA_FILE_DIRECTORY"],self.dataset) + with open(dataset_file_path, 'r') as dataset_file: + data = loads(dataset_file.read()) + data['meta']['tests'].append(self._id) + with open(dataset_file_path, 'w') as dataset_file: + dump(data, dataset_file, indent=2) + flash(f'Created a new exam with Exam Code {self.render_test_code(test["test_code"])}.', 'success') + return jsonify({'success': test}), 200 + return jsonify({'error': f'Could not create exam. An error occurred.'}), 400 + + def add_time_adjustment(self, time_adjustment): + from main import db + user_code = secrets.token_hex(3).upper() + adjustment = { + user_code: time_adjustment + } + if db.tests.find_one_and_update({'_id': self._id}, {'$set': {'time_adjustments': adjustment}},upsert=False): + flash(f'Time adjustment for {time_adjustment} minutes has been added. This can be enabled using the user code {user_code}.') + return jsonify({'success': adjustment}) + return jsonify({'error': 'Failed to add the time adjustment. An error occurred.'}), 400 + + def remove_time_adjustment(self, user_code): + from main import db + if db.tests.find_one_and_update({'_id': self._id}, {'$unset': {f'time_adjustments.{user_code}': {}}}): + message = 'Time adjustment has been deleted.' + flash(message, 'success') + return jsonify({'success': message}) + return jsonify({'error': 'Failed to delete the time adjustment. An error occurred.'}), 400 + + def render_test_code(self, test_code): + return '—'.join([test_code[:4], test_code[4:8], test_code[8:]]) + + def parse_test_code(self, test_code): + return test_code.replace('—', '') + + def delete(self): + from main import app, db + test = db.tests.find_one({'_id': self._id}) + if 'entries' in test: + if test['entries']: + return jsonify({'error': 'Cannot delete an exam that has entries submitted to it.'}), 400 + if self.dataset is None: + self.dataset = db.tests.find_one({'_id': self._id})['dataset'] + if db.tests.delete_one({'_id': self._id}): + dataset_file_path = os.path.join(app.config["DATA_FILE_DIRECTORY"],self.dataset) + with open(dataset_file_path, 'r') as dataset_file: + data = loads(dataset_file.read()) + data['meta']['tests'].remove(self._id) + with open(dataset_file_path, 'w') as dataset_file: + dump(data, dataset_file, indent=2) + message = 'Deleted exam.' + flash(message, 'alert') + return jsonify({'success': message}), 200 + return jsonify({'error': f'Could not create exam. An error occurred.'}), 400 + + def update(self): + from main import db + test = {} + updated = [] + if not self.start_date == '' and self.start_date is not None: + test['start_date'] = self.start_date + updated.append('start date') + if not self.expiry_date == '' and self.expiry_date is not None: + test['expiry_date'] = self.expiry_date + updated.append('expiry date') + if not self.time_limit == '' and self.time_limit is not None: + test['time_limit'] = int(self.time_limit) + updated.append('time limit') + output = '' + if len(updated) == 0: + flash(f'There were no changes requested for your account.', 'alert'), 200 + return jsonify({'success': 'There were no changes requested for your account.'}), 200 + elif len(updated) == 1: + output = updated[0] + elif len(updated) == 2: + output = ' and '.join(updated) + elif len(updated) > 2: + output = updated[0] + for index in range(1,len(updated)): + if index < len(updated) - 2: + output = ', '.join([output, updated[index]]) + elif index == len(updated) - 2: + output = ', and '.join([output, updated[index]]) + else: + output = ''.join([output, updated[index]]) + db.tests.find_one_and_update({'_id': self._id}, {'$set': test}) + _output = f'The {output} of the test {"has" if len(updated) == 1 else "have"} been updated.' + flash(_output) + return jsonify({'success': _output}), 200 \ No newline at end of file diff --git a/ref-test/app/models/user.py b/ref-test/app/models/user.py new file mode 100644 index 0000000..1be2392 --- /dev/null +++ b/ref-test/app/models/user.py @@ -0,0 +1,96 @@ +from ..modules import db +from ..tools.encryption import decrypt, encrypt +from ..tools.logs import write + +import secrets + +from flask import flash, jsonify, session +from flask.helpers import url_for +from flask_login import UserMixin, login_user, logout_user +from werkzeug.security import check_password_hash, generate_password_hash + +class User(UserMixin, db.Model): + id = db.Column(db.String(36), primary_key=True) + username = db.Column(db.String(128), nullable=False) + password = db.Column(db.String(128), nullable=False) + email = db.Column(db.String(128), nullable=False) + reset_token = db.Column(db.String(20), nullable=True) + verification_token = db.Column(db.String(20), nullable=True) + + def __repr__(self): + return f' was added with .' + + @property + def set_username(self): raise AttributeError('set_username is not a readable attribute.') + + set_username.setter + def set_username(self, username:str): self.username = encrypt(username) + + def get_username(self): return decrypt(self.username) + + @property + def set_password(self): raise AttributeError('set_password is not a readable attribute.') + + set_password.setter + def set_password(self, password:str): self.password = generate_password_hash(password, method="sha256") + + def verify_password(self, password:str): return check_password_hash(self.password, password) + + @property + def set_email(self): raise AttributeError('set_email is not a readable attribute.') + + set_email.setter + def set_email(self, email:str): self.email = encrypt(email) + + def get_email(self): return decrypt(self.email) + + def register(self): + users = User.query.all() + for user in users: + if user.get_username() == self.get_username(): + return False, f'Username {self.get_username()} already in use.' + elif user.get_email() == self.get_email(): + return False, f'Email address {self.get_email()} already in use.' + db.session.add(self) + db.session.commit() + write('users.log', f'User \'{self.get_username()}\' was created with id \'{self.id}\'.') + return True, f'User {self.get_username()} was created successfully.' + + def login(self, remember:bool=False): + self.authenticated = True + db.session.add(self) + db.session.commit() + login_user(self, remember = remember) + write('users.log', f'User \'{self.get_username()}\' has logged in.') + flash(message=f'Welcome {self.get_username()}', category='success') + + def logout(self): + self.authenticated = False + db.session.add(self) + db.session.commit() + session['remembered_username'] = self.get_username() + logout_user() + write('users.log', f'User \'{self.get_username()}\' has logged out.') + flash(message='You have successfully logged out.', category='success') + + def reset_password(self): + new_password = secrets.token_hex(12) + self.set_password(new_password) + self.reset_token = secrets.token_urlsafe(16) + self.verification_token = secrets.token_urlsafe(16) + db.session.commit() + print('Password', new_password) + print('Reset Token', self.reset_token) + print('Verification Token', self.verification_token) + print('Reset Link', f'{url_for("auth._reset", token=self.reset_token, verification=self.verification_token, _external=True)}') + return jsonify({'success': 'Your password reset link has been generated.'}), 200 + + def clear_reset_tokens(self): + self.reset_token = self.verification_token = None + db.session.commit() + + def delete(self): + username = self.get_username() + db.session.delete(self) + db.session.commit() + write('users.log', f'User \'{username}\' was deleted.') # TODO add current user