Added models and views

This commit is contained in:
Vivek Santayana 2022-06-12 21:03:51 +01:00
parent 66e7b2b9f8
commit 8439d99949
12 changed files with 276 additions and 125 deletions

View File

@ -0,0 +1 @@
<h1>Register</h1>

View File

@ -0,0 +1,62 @@
from ..tools.forms import value
from flask_wtf import FlaskForm
from flask_wtf.file import FileAllowed, FileField, FileRequired
from wtforms import BooleanField, DateField, IntegerField, PasswordField, SelectField, StringField
from wtforms.validators import InputRequired, Email, EqualTo, Length, Optional
from datetime import date, timedelta
class Login(FlaskForm):
username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
remember = BooleanField('Remember Log In', render_kw={'checked': True})
class Register(FlaskForm):
username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
email = StringField('Email Address', validators=[InputRequired(), Email(message='You must enter a valid email address.'), Length(max=50)])
password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password_reenter = PasswordField('Re-Enter Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.'), EqualTo('password', message='Passwords do not match.')])
class ResetPassword(FlaskForm):
username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
email = StringField('Email Address', validators=[InputRequired(), Email(message='You must enter a valid email address.'), Length(max=50)])
class UpdatePassword(FlaskForm):
password = PasswordField('Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password_reenter = PasswordField('Re-Enter Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.'), EqualTo('password', message='Passwords do not match.')])
class CreateUser(FlaskForm):
username = StringField('Username', validators=[InputRequired(), Length(min=4, max=15)])
email = StringField('Email Address', validators=[InputRequired(), Email(message='You must enter a valid email address.'), Length(max=50)])
password = PasswordField('Password (Optional)', validators=[Optional(),Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
class DeleteUser(FlaskForm):
password = PasswordField('Confirm Your Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
notify = BooleanField('Notify deletion by email', render_kw={'checked': True})
class UpdateUser(FlaskForm):
confirm_password = PasswordField('Confirm Your Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
email = StringField('Email Address', validators=[Optional(), Email(message='You must enter a valid email address.'), Length(max=50)])
password = PasswordField('Change Password', validators=[Optional(),Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password_reenter = PasswordField('Re-Enter New Password', validators=[EqualTo('password', message='Passwords do not match.')])
notify = BooleanField('Notify changes by email', render_kw={'checked': True})
class UpdateAccount(FlaskForm):
confirm_password = PasswordField('Current Password', validators=[InputRequired(), Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
email = StringField('Email Address', validators=[Optional(), Email(message='You must enter a valid email address.'), Length(max=50)])
password = PasswordField('Change Password', validators=[Optional(),Length(min=6, max=30, message='The password must be between 6 and 20 characters long.')])
password_reenter = PasswordField('Re-Enter New Password', validators=[EqualTo('password', message='Passwords do not match.')])
class CreateTest(FlaskForm):
start_date = DateField('Start Date', format="%Y-%m-%d", validators=[InputRequired()], default = date.today() )
expiry_date = DateField('Expiry Date', format="%Y-%m-%d", validators=[InputRequired()], default = date.today() + timedelta(days=1) )
time_limit = SelectField('Time Limit')
dataset = SelectField('Question Dataset')
class UploadData(FlaskForm):
data_file = FileField('Data File', validators=[FileRequired(), FileAllowed(['json'])])
default = BooleanField('Make Default', render_kw={'checked': True})
class AddTimeAdjustment(FlaskForm):
time = IntegerField('Extra Time (Minutes)', validators=[InputRequired(), value(max=60)])

View File

View File

@ -0,0 +1,3 @@
from .entry import Entry
from .test import Test
from .user import User

View File

@ -1,4 +1,13 @@
from main import db from ..modules import db
from ..tools.forms import JsonEncodedDict
from ..tools.encryption import decrypt, encrypt
from ..tools.logs import write
from .test import Test
from flask import jsonify
from flask_login import current_user
from datetime import datetime, timedelta
class Entry(db.Model): class Entry(db.Model):
@ -8,13 +17,14 @@ class Entry(db.Model):
email = db.Column(db.String(128), nullable=False) email = db.Column(db.String(128), nullable=False)
club = db.Column(db.String(128), nullable=True) club = db.Column(db.String(128), nullable=True)
test_id = db.Column(db.String(36), db.ForeignKey('test.id')) test_id = db.Column(db.String(36), db.ForeignKey('test.id'))
test_code = db.Column(db.String(36), db.ForeignKey('test.test_code')) test_code = db.Column(db.String(36), db.ForeignKey('test.code'))
user_code = db.Column(db.String(6), nullable=True) user_code = db.Column(db.String(6), nullable=True)
start_time = db.Column(db.DateTime, nullable=False) start_time = db.Column(db.DateTime, nullable=False)
end_time = db.Column(db.DateTime, nullable=True) end_time = db.Column(db.DateTime, nullable=True)
status = db.Column(db.String(16), nullable=True) status = db.Column(db.String(16), nullable=True)
late_ignore = db.Column(db.Boolean, default=False, nullable=True) valid = db.Column(db.Boolean, default=True, nullable=True)
answers = db.Column(JsonEncodedDict, nullable=True) answers = db.Column(JsonEncodedDict, nullable=True)
result = db.Column(JsonEncodedDict, nullable=True)
@property @property
def set_first_name(self): raise AttributeError('set_first_name is not a readable attribute.') def set_first_name(self): raise AttributeError('set_first_name is not a readable attribute.')
@ -35,7 +45,7 @@ class Entry(db.Model):
@property @property
def set_email(self): raise AttributeError('set_email is not a readable attribute.') def set_email(self): raise AttributeError('set_email is not a readable attribute.')
set_name.setter set_email.setter
def set_email(self, email:str): self.email = encrypt(email) def set_email(self, email:str): self.email = encrypt(email)
def get_email(self): return decrypt(self.email) def get_email(self): return decrypt(self.email)
@ -43,7 +53,35 @@ class Entry(db.Model):
@property @property
def set_club(self): raise AttributeError('set_club is not a readable attribute.') def set_club(self): raise AttributeError('set_club is not a readable attribute.')
set_name.setter set_club.setter
def set_club(self, club:str): self.club = encrypt(club) def set_club(self, club:str): self.club = encrypt(club)
def get_club(self): return decrypt(self.club) def get_club(self): return decrypt(self.club)
def start(self):
self.start_time = datetime.now()
self.status = 'started'
write('tests.log', f'New test started by {self.get_first_name()} {self.get_surname()}.')
db.session.commit()
def complete(self):
self.end_time = datetime.now()
write('tests.log', f'Test completed by {self.get_first_name()} {self.get_surname()}.')
test = Test.query.filter_by(code=self.test_code).first()
delta = timedelta(minutes=test.time_limit)
if not test.time_limit or self.end_time <= self.start_time + delta:
self.status = 'finished'
self.valid = True
else:
self.status = 'late'
self.valid = False
db.session.commit()
def validate(self):
if self.valid: return False, jsonify({'error':f'The entry is already valid.'})
if self.status == 'started': return False, jsonify({'error':f'The entry is still pending.'})
self.valid = True
self.status = 'completed'
db.session.commit()
message = f'The entry {self.id} has been validated by {current_user.get_username()}.'
return True, jsonify({'success': message})

View File

@ -1,112 +1,91 @@
class Test: from ..modules import db
from ..tools.encryption import decrypt, encrypt
def __init__(self, _id=None, start_date=None, expiry_date=None, time_limit=None, creator=None, dataset=None): from ..tools.forms import JsonEncodedDict
self._id = _id from ..tools.logs import write
self.start_date = start_date
self.expiry_date = expiry_date from flask import jsonify
self.time_limit = None if time_limit == 'none' or time_limit == '' or time_limit == None else int(time_limit) from flask.helpers import flash
self.creator = creator from flask_login import current_user
self.dataset = dataset
from datetime import datetime
from json import dump, loads
import os
import secrets
class Test(db.Model):
id = db.Column(db.String(36), primary_key=True)
code = db.Column(db.String(36), nullable=False)
start_date = db.Column(db.DateTime, nullable=True)
end_date = db.Column(db.DateTime, nullable=True)
time_limit = db.Column(db.Integer, nullable=True)
creator = db.Column(db.String(36), db.ForeignKey('user.id'))
data = db.Column(db.String(36), nullable=False)
adjustments = db.Column(JsonEncodedDict, nullable=True)
def __repr__(self):
return f'<test with code {self.code} was created by {current_user.get_username()}.>'
def get_code(self):
code = self.code.upper()
return ''.join([code[:4], code[4:8], code[8:]])
def create(self): def create(self):
from main import app, db db.session.add(self)
test = { db.session.commit()
'_id': self._id, write('system.log', f'Test with code {self.code} created by {current_user.get_username()}.')
'date_created': datetime.today(),
'start_date': self.start_date,
'expiry_date': self.expiry_date,
'time_limit': self.time_limit,
'creator': encrypt(self.creator),
'test_code': secrets.token_hex(6).upper(),
'dataset': self.dataset
}
if db.tests.insert_one(test):
dataset_file_path = os.path.join(app.config["DATA_FILE_DIRECTORY"],self.dataset)
with open(dataset_file_path, 'r') as dataset_file:
data = loads(dataset_file.read())
data['meta']['tests'].append(self._id)
with open(dataset_file_path, 'w') as dataset_file:
dump(data, dataset_file, indent=2)
flash(f'Created a new exam with Exam Code <strong>{self.render_test_code(test["test_code"])}</strong>.', 'success')
return jsonify({'success': test}), 200
return jsonify({'error': f'Could not create exam. An error occurred.'}), 400
def add_time_adjustment(self, time_adjustment):
from main import db
user_code = secrets.token_hex(3).upper()
adjustment = {
user_code: time_adjustment
}
if db.tests.find_one_and_update({'_id': self._id}, {'$set': {'time_adjustments': adjustment}},upsert=False):
flash(f'Time adjustment for {time_adjustment} minutes has been added. This can be enabled using the user code {user_code}.')
return jsonify({'success': adjustment})
return jsonify({'error': 'Failed to add the time adjustment. An error occurred.'}), 400
def remove_time_adjustment(self, user_code):
from main import db
if db.tests.find_one_and_update({'_id': self._id}, {'$unset': {f'time_adjustments.{user_code}': {}}}):
message = 'Time adjustment has been deleted.'
flash(message, 'success')
return jsonify({'success': message})
return jsonify({'error': 'Failed to delete the time adjustment. An error occurred.'}), 400
def render_test_code(self, test_code):
return ''.join([test_code[:4], test_code[4:8], test_code[8:]])
def parse_test_code(self, test_code):
return test_code.replace('', '')
def delete(self): def delete(self):
from main import app, db code = self.code
test = db.tests.find_one({'_id': self._id}) db.session.delete(self)
if 'entries' in test: db.session.commit()
if test['entries']: write('system.log', f'Test with code {code} deleted by {current_user.get_username()}.')
return jsonify({'error': 'Cannot delete an exam that has entries submitted to it.'}), 400
if self.dataset is None: def start(self):
self.dataset = db.tests.find_one({'_id': self._id})['dataset'] now = datetime.now()
if db.tests.delete_one({'_id': self._id}): if self.start_date > now:
dataset_file_path = os.path.join(app.config["DATA_FILE_DIRECTORY"],self.dataset) self.start_date = now
with open(dataset_file_path, 'r') as dataset_file: db.session.commit()
data = loads(dataset_file.read()) message = f'Test with code {self.code} started by {current_user.get_username()}.'
data['meta']['tests'].remove(self._id) write('system.log', message)
with open(dataset_file_path, 'w') as dataset_file: return True, jsonify({'success': message})
dump(data, dataset_file, indent=2) return False, jsonify({'error': f'Test with code {self.code} has already started.'})
message = 'Deleted exam.'
flash(message, 'alert')
return jsonify({'success': message}), 200
return jsonify({'error': f'Could not create exam. An error occurred.'}), 400
def update(self): def end(self):
from main import db now = datetime.now()
test = {} if self.end_date > now:
updated = [] self.end_date = now
if not self.start_date == '' and self.start_date is not None: db.session.commit()
test['start_date'] = self.start_date message = f'Test with code {self.code} ended by {current_user.get_username()}.'
updated.append('start date') write('system.log', message)
if not self.expiry_date == '' and self.expiry_date is not None: return True, jsonify({'success': message})
test['expiry_date'] = self.expiry_date return False, jsonify({'error': f'Test with code {self.code} has already started.'})
updated.append('expiry date')
if not self.time_limit == '' and self.time_limit is not None: def add_adjustment(self, time:int):
test['time_limit'] = int(self.time_limit) adjustments = self.adjustments if self.adjustments is not None else {}
updated.append('time limit') code = secrets.token_hex(3).lower()
output = '' adjustments[code] = time
if len(updated) == 0: self.adjustments = adjustments
flash(f'There were no changes requested for your account.', 'alert'), 200 db.session.commit()
return jsonify({'success': 'There were no changes requested for your account.'}), 200 write('system.log', f'Time adjustment for {time} minutes with code {code} added to test {self.get_code()} by {current_user.get_username()}.')
elif len(updated) == 1: return True, jsonify({'success': f'Time adjustment for {time} minutes added to test {self.get_code()}. This can be accessed using the user code {code.upper()}.'})
output = updated[0]
elif len(updated) == 2: def remove_adjustment(self, code:str):
output = ' and '.join(updated) if not self.adjustments: return False, jsonify({'error': f'There are no adjustments configured for test {self.get_code()}.'})
elif len(updated) > 2: self.adjustments.pop(code)
output = updated[0] if not self.adjustments: self.adjustments = None
for index in range(1,len(updated)): db.session.commit()
if index < len(updated) - 2: message = f'Time adjustment for with code {code} removed from test {self.get_code()} by {current_user.get_username()}.'
output = ', '.join([output, updated[index]]) write('system.log', message)
elif index == len(updated) - 2: return True, jsonify({'success': message})
output = ', and '.join([output, updated[index]])
else: def update(self, start_date:datetime=None, end_date:datetime=None, time_limit:int=None):
output = ''.join([output, updated[index]]) if not start_date and not end_date and time_limit is None: return False, jsonify({'error': 'There were no changes requested.'})
db.tests.find_one_and_update({'_id': self._id}, {'$set': test}) if start_date: self.start_date = start_date
_output = f'The {output} of the test {"has" if len(updated) == 1 else "have"} been updated.' if end_date: self.end_date = end_date
flash(_output) if time_limit is not None: self.time_limit = time_limit
return jsonify({'success': _output}), 200 db.session.commit()
message = f'Test with code {self.get_code()} has been updated by user {current_user.get_username()}'
write('system.log', message)
return True, jsonify({'success': message})

View File

@ -2,13 +2,13 @@ from ..modules import db
from ..tools.encryption import decrypt, encrypt from ..tools.encryption import decrypt, encrypt
from ..tools.logs import write from ..tools.logs import write
import secrets
from flask import flash, jsonify, session from flask import flash, jsonify, session
from flask.helpers import url_for from flask.helpers import url_for
from flask_login import UserMixin, login_user, logout_user from flask_login import current_user, login_user, logout_user, UserMixin
from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.security import check_password_hash, generate_password_hash
import secrets
from uuid import uuid4
class User(UserMixin, db.Model): class User(UserMixin, db.Model):
id = db.Column(db.String(36), primary_key=True) id = db.Column(db.String(36), primary_key=True)
username = db.Column(db.String(128), nullable=False) username = db.Column(db.String(128), nullable=False)
@ -20,6 +20,12 @@ class User(UserMixin, db.Model):
def __repr__(self): def __repr__(self):
return f'<user {self.username}> was added with <id {self.id}>.' return f'<user {self.username}> was added with <id {self.id}>.'
@property
def generate_id(self): raise AttributeError('generate_id is not a readable attribute.')
generate_id.setter
def generate_id(self): self.id = uuid4.hex()
@property @property
def set_username(self): raise AttributeError('set_username is not a readable attribute.') def set_username(self): raise AttributeError('set_username is not a readable attribute.')
@ -57,17 +63,11 @@ class User(UserMixin, db.Model):
return True, f'User {self.get_username()} was created successfully.' return True, f'User {self.get_username()} was created successfully.'
def login(self, remember:bool=False): def login(self, remember:bool=False):
self.authenticated = True
db.session.add(self)
db.session.commit()
login_user(self, remember = remember) login_user(self, remember = remember)
write('users.log', f'User \'{self.get_username()}\' has logged in.') write('users.log', f'User \'{self.get_username()}\' has logged in.')
flash(message=f'Welcome {self.get_username()}', category='success') flash(message=f'Welcome {self.get_username()}', category='success')
def logout(self): def logout(self):
self.authenticated = False
db.session.add(self)
db.session.commit()
session['remembered_username'] = self.get_username() session['remembered_username'] = self.get_username()
logout_user() logout_user()
write('users.log', f'User \'{self.get_username()}\' has logged out.') write('users.log', f'User \'{self.get_username()}\' has logged out.')
@ -93,4 +93,13 @@ class User(UserMixin, db.Model):
username = self.get_username() username = self.get_username()
db.session.delete(self) db.session.delete(self)
db.session.commit() db.session.commit()
write('users.log', f'User \'{username}\' was deleted.') # TODO add current user write('users.log', f'User \'{username}\' was deleted by \'{current_user.get_username()}\'.')
def update(self, password:str=None, email:str=None):
if not password and not email: return False, jsonify({'error': 'There were no changes requested.'})
if password: self.set_password(password)
if email: self.set_email(email)
db.session.commit()
message = f'Information for user {self.get_username()} has been updated by {current_user.get_username()}.'
write('system.log', message)
return True, jsonify({'success': message})

View File

@ -0,0 +1,22 @@
from .data import load
from ..models import User
from flask import abort, redirect
from flask.helpers import url_for
from flask_login import current_user
from functools import wraps
def require_account_creation(function):
@wraps(function)
def wrapper(*args, **kwargs):
if User.query.count() == 0: return redirect(url_for('views._register'))
return function(*args, **kwargs)
return wrapper
def disable_if_logged_in(function):
@wraps(function)
def wrapper(*args, **kwargs):
if current_user.is_authenticated: return abort(404)
return function(*args, **kwargs)
return wrapper

View File

@ -1,4 +1,3 @@
from main import Config
from ..data import data as data_dir from ..data import data as data_dir
import json import json

View File

@ -0,0 +1,36 @@
from ..modules import db
from wtforms.validators import ValidationError
import json
from sqlalchemy.ext import mutable
class JsonEncodedDict(db.TypeDecorator):
"""Enables JSON storage by encoding and decoding on the fly."""
impl = db.Text
def process_bind_param(self, value, dialect):
if value is None:
return '{}'
else:
return json.dumps(value)
def process_result_value(self, value, dialect):
if value is None:
return {}
else:
return json.loads(value)
mutable.MutableDict.associate_with(JsonEncodedDict)
def value(min:int=0, max:int=None):
if not max:
message = f'Value must be greater than {min}.'
else:
message = f'Value must be between {min} and {max}.'
def length(form, field):
value = field.data or 0
if value < min or max != None and value > max:
raise ValidationError(message)
return length

View File

@ -1,4 +1,3 @@
from main import Config
from ..data import data from ..data import data
from datetime import datetime from datetime import datetime

View File

@ -2,7 +2,9 @@ blinker==1.4
cffi==1.15.0 cffi==1.15.0
click==8.1.3 click==8.1.3
cryptography==37.0.2 cryptography==37.0.2
dnspython==2.2.1
dominate==2.6.0 dominate==2.6.0
email-validator==1.2.1
Flask==2.1.2 Flask==2.1.2
Flask-Bootstrap==3.3.7.1 Flask-Bootstrap==3.3.7.1
Flask-Login==0.6.1 Flask-Login==0.6.1
@ -10,6 +12,7 @@ Flask-Mail==0.9.1
Flask-SQLAlchemy==2.5.1 Flask-SQLAlchemy==2.5.1
Flask-WTF==1.0.1 Flask-WTF==1.0.1
greenlet==1.1.2 greenlet==1.1.2
idna==3.3
itsdangerous==2.1.2 itsdangerous==2.1.2
Jinja2==3.1.2 Jinja2==3.1.2
MarkupSafe==2.1.1 MarkupSafe==2.1.1