Compare commits

...

3 Commits

5 changed files with 143 additions and 34 deletions

View File

@ -30,7 +30,7 @@ def create_app():
def _check_cookie_consent():
if request.cookies.get('cookie_consent'):
return
if any([ request.path.startswith(x) for x in [ '/admin/static/', '/root/', '/quiz/static', '/cookies/', '/admin/editor/static' ] ]):
if any([ request.path.startswith(x) for x in [ '/admin/static/', '/root/', '/quiz/static', '/cookies/', '/admin/editor/static', '/admin/view/static' ] ]):
return
flash(f'<strong>Cookie Consent</strong>: This web site only stores minimal, functional cookies. It does not store any tracking information. By using this site, you consent to this use of cookies. For more information, see our <a href="{url_for("views._privacy")}">privacy policy</a>.', 'cookie_alert')

View File

@ -5,6 +5,7 @@ from ..tools.logs import write
from flask import flash
from flask import current_app as app
from flask_login import current_user
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.utils import secure_filename
from datetime import datetime
@ -45,7 +46,12 @@ class Dataset(db.Model):
for dataset in Dataset.query.all():
dataset.default = False
self.default = True
db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when setting default dataset {self.id}: {exception}')
return False, f'Database error {exception}.'
write('system.log', f'Dataset {self.id} set as default by {current_user.get_username()}.')
flash(message='Dataset set as default.', category='success')
return True, f'Dataset set as default.'
@ -63,9 +69,14 @@ class Dataset(db.Model):
filename = secure_filename('.'.join([self.id,'json']))
data = Path(app.config.get('DATA'))
file_path = path.join(data, 'questions', filename)
try:
db.session.delete(self)
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when trying to delete dataset {self.id}: {exception}')
return False, f'Database error: {exception}'
remove(file_path)
db.session.delete(self)
db.session.commit()
return True, 'Dataset deleted.'
def create(self, data:list, default:bool=False):
@ -78,8 +89,13 @@ class Dataset(db.Model):
self.creator = current_user
if default: self.make_default()
write('system.log', f'New dataset {self.get_name()} added by {current_user.get_username()}.')
db.session.add(self)
db.session.commit()
try:
db.session.add(self)
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when trying to crreate dataset {self.id}: {exception}')
return False, f'Database error: {exception}'
return True, 'Dataset created.'
def check_file(self):
@ -103,6 +119,11 @@ class Dataset(db.Model):
dump(data, file, indent=2)
write('system.log', f'Dataset {self.id} edited by {current_user.get_username()}.')
flash(f'Dataset {self.get_name()} successfully edited.', 'success')
db.session.add(self)
db.session.commit()
try:
db.session.add(self)
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when trying to update dataset {self.id}: {exception}')
return False, f'Database error: {exception}'
return True, 'Dataset successfully edited.'

View File

@ -7,6 +7,7 @@ from .test import Test
from flask_login import current_user
from flask_mail import Message
from smtplib import SMTPException
from sqlalchemy.exc import SQLAlchemyError
from datetime import datetime, timedelta
from uuid import uuid4
@ -70,23 +71,32 @@ class Entry(db.Model):
def ready(self):
self.generate_id()
db.session.add(self)
db.session.commit()
write('tests.log', f'New test ready for {self.get_first_name()} {self.get_surname()}.')
try:
db.session.add(self)
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when preparing new entry for {self.get_surname()}, {self.get_first_name()}: {exception}')
return False, f'Database error: {exception}'
write('tests.log', f'New test ready for {self.get_surname()}, {self.get_first_name()} with id {self.id}.')
return True, f'Test ready.'
def start(self):
self.start_time = datetime.now()
self.status = 'started'
write('tests.log', f'Test started by {self.get_first_name()} {self.get_surname()}.')
db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when starting test for {self.get_surname()}, {self.get_first_name()}: {exception}')
return False, f'Database error: {exception}'
write('tests.log', f'Test started by {self.get_surname()}, {self.get_first_name()} with id {self.id}.')
return True, f'New test started with id {self.id}.'
def complete(self, answers:dict=None, result:dict=None):
self.end_time = datetime.now()
self.answers = answers
self.result = result
write('tests.log', f'Test completed by {self.get_first_name()} {self.get_surname()}.')
delta = timedelta(minutes=int(0 if self.test.time_limit is None else self.test.time_limit)+1)
if not self.test.time_limit or self.end_time <= self.start_time + delta:
self.status = 'completed'
@ -94,7 +104,13 @@ class Entry(db.Model):
else:
self.status = 'late'
self.valid = False
db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when submitting entry for {self.get_surname()}, {self.get_first_name()}: {exception}')
return False, f'Database error: {exception}'
write('tests.log', f'Test completed by {self.get_surname()}, {self.get_first_name()} with id {self.id}.')
return True, f'Test entry completed for id {self.id}.'
def validate(self):
@ -102,15 +118,25 @@ class Entry(db.Model):
if self.status == 'started': return False, 'The entry is still pending.'
self.valid = True
self.status = 'completed'
db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when validating entry {self.id}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'The entry {self.id} has been validated by {current_user.get_username()}.')
return True, f'The entry {self.id} has been validated.'
def delete(self):
id = self.id
name = f'{self.get_first_name()} {self.get_surname()}'
db.session.delete(self)
db.session.commit()
try:
db.session.delete(self)
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when deleting entry {id}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'The entry {id} by {name} has been deleted by {current_user.get_username()}.')
return True, 'Entry deleted.'

View File

@ -3,6 +3,7 @@ from ..tools.forms import JsonEncodedDict
from ..tools.logs import write
from flask_login import current_user
from sqlalchemy.exc import SQLAlchemyError
from datetime import date, datetime
import secrets
@ -52,16 +53,25 @@ class Test(db.Model):
errors.append('The expiry date cannot be before the start date.')
if errors:
return False, errors
db.session.add(self)
db.session.commit()
try:
db.session.add(self)
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when creating test {self.get_code()}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'Test with code {self.get_code()} created by {current_user.get_username()}.')
return True, f'Test with code {self.get_code()} has been created.'
def delete(self):
code = self.code
if self.entries: return False, f'Cannot delete a test with submitted entries.'
db.session.delete(self)
db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when deleting test {self.get_code()}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'Test with code {self.get_code()} has been deleted by {current_user.get_username()}.')
return True, f'Test with code {self.get_code()} has been deleted.'
@ -69,7 +79,12 @@ class Test(db.Model):
now = datetime.now()
if self.start_date.date() > now.date():
self.start_date = now
db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when launching test {self.get_code()}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'Test with code {self.get_code()} has been started by {current_user.get_username()}.')
return True, f'Test with code {self.get_code()} has been started.'
return False, f'Test with code {self.get_code()} has already started.'
@ -78,7 +93,12 @@ class Test(db.Model):
now = datetime.now()
if self.end_date >= now:
self.end_date = now
db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when closing test {self.get_code()}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'Test with code {self.get_code()} ended by {current_user.get_username()}.')
return True, f'Test with code {self.get_code()} has been ended.'
return False, f'Test with code {self.get_code()} has already ended.'
@ -88,7 +108,12 @@ class Test(db.Model):
code = secrets.token_hex(3).lower()
adjustments[code] = time
self.adjustments = adjustments
db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when adding adjustment to test {self.get_code()}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'Time adjustment for {time} minutes with code {code} added to test {self.get_code()} by {current_user.get_username()}.')
return True, f'Time adjustment for {time} minutes added to test {self.get_code()}. This can be accessed using the user code {code.upper()}.'
@ -96,7 +121,12 @@ class Test(db.Model):
if not self.adjustments: return False, f'There are no adjustments configured for test {self.get_code()}.'
self.adjustments.pop(code)
if not self.adjustments: self.adjustments = None
db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when deleting adjustment from test {self.get_code()}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'Time adjustment for with code {code} has been removed from test {self.get_code()} by {current_user.get_username()}.')
return True, f'Time adjustment for with code {code} has been removed from test {self.get_code()}.'
@ -105,6 +135,11 @@ class Test(db.Model):
if start_date: self.start_date = start_date
if end_date: self.end_date = end_date
if time_limit is not None: self.time_limit = time_limit
db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when updating test {self.get_code()}: {exception}')
return False, f'Database error: {exception}'
write('system.log', f'Test with code {self.get_code()} has been updated by user {current_user.get_username()}.')
return True, f'Test with code {self.get_code()} has been updated by.'

View File

@ -7,6 +7,7 @@ from flask.helpers import url_for
from flask_login import current_user, login_user, logout_user, UserMixin
from flask_mail import Message
from smtplib import SMTPException
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.security import check_password_hash, generate_password_hash
import secrets
@ -61,8 +62,13 @@ class User(UserMixin, db.Model):
if user.get_username() == self.get_username(): return False, f'Username {self.get_username()} already in use.'
if user.get_email() == self.get_email(): return False, f'Email address {self.get_email()} already in use.'
self.set_password(password=password)
db.session.add(self)
db.session.commit()
try:
db.session.add(self)
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when registering user {self.get_username()}: {exception}')
return False, f'Database error: {exception}'
write('users.log', f'User \'{self.get_username()}\' was created with id \'{self.id}\'.')
if notify:
email = Message(
@ -149,19 +155,35 @@ class User(UserMixin, db.Model):
mail.send(email)
except SMTPException as exception:
write('system.log', f'SMTP Error while trying to reset password for {self.get_username()} with error: {exception}')
db.session.rollback()
return jsonify({'error': f'SMTP Error: {exception}'}), 500
db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when resetting password for user {self.get_username()}: {exception}')
return False, f'Database error: {exception}'
return jsonify({'success': 'Your password reset link has been generated.'}), 200
def clear_reset_tokens(self):
self.reset_token = self.verification_token = None
db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when resetting clearing reset tokens for user {self.get_username()}: {exception}')
return False, f'Database error: {exception}'
def delete(self, notify:bool=False):
username = self.get_username()
email_address = self.get_email()
db.session.delete(self)
db.session.commit()
try:
db.session.delete(self)
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when deleting user {self.get_username()}: {exception}')
return False, f'Database error: {exception}'
message = f'User \'{username}\' was deleted by \'{current_user.get_username()}\'.'
write('users.log', message)
if notify:
@ -200,7 +222,12 @@ class User(UserMixin, db.Model):
for entry in User.query.all():
if entry.get_email() == email and not entry == self: return False, f'The email address {email} is already in use.'
self.set_email(email)
db.session.commit()
try:
db.session.commit()
except SQLAlchemyError as exception:
db.session.rollback()
write('system.log', f'Database error when updating user {self.get_username()}: {exception}')
return False, f'Database error: {exception}'
_current_user = current_user.get_username() if current_user.is_authenticated else 'anonymous'
write('system.log', f'Information for user {self.get_username()} has been updated by {_current_user}.')
if notify: