Finished data upload

Refactored to move security package inside common
Moved data folder to process root.
This commit is contained in:
2021-11-28 02:30:46 +00:00
parent 0fff71b1fb
commit 53cc25b4ce
17 changed files with 214 additions and 42 deletions

View File

@ -1,18 +0,0 @@
from datetime import datetime, timedelta
from flask import Blueprint, redirect, request
cookie_consent = Blueprint(
'cookie_consent',
__name__
)
@cookie_consent.route('/')
def _cookies():
resp = redirect('/')
resp.set_cookie(
key = 'cookie_consent',
value = 'True',
max_age = timedelta(days=14) if request.cookies.get('remember') == 'True' else 'Session',
path = '/',
expires = datetime.utcnow() + timedelta(days=14) if request.cookies.get('remember') else 'Session'
)
return resp

View File

@ -0,0 +1,18 @@
from datetime import datetime, timedelta
from flask import Blueprint, redirect, request
cookie_consent = Blueprint(
'cookie_consent',
__name__
)
@cookie_consent.route('/')
def _cookies():
resp = redirect('/')
resp.set_cookie(
key = 'cookie_consent',
value = 'True',
max_age = timedelta(days=14) if request.cookies.get('remember') == 'True' else 'Session',
path = '/',
expires = datetime.utcnow() + timedelta(days=14) if request.cookies.get('remember') else 'Session'
)
return resp

View File

@ -0,0 +1,59 @@
import os
from shutil import rmtree
import pathlib
from json import dump, loads
from datetime import datetime
from main import app
from werkzeug.utils import secure_filename
def check_data_folder_exists():
if not os.path.exists(app.config['DATA_FILE_DIRECTORY']):
pathlib.Path(app.config['DATA_FILE_DIRECTORY']).mkdir(parents='True', exist_ok='True')
def check_current_indicator():
if not os.path.isfile(os.path.join(app.config['DATA_FILE_DIRECTORY'], '.current.txt')):
open(os.path.join(app.config['DATA_FILE_DIRECTORY'], '.current.txt'),'w').close()
def make_temp_dir(file):
if not os.path.isdir('tmp'):
os.mkdir('tmp')
if os.path.isfile(f'tmp/{file.filename}'):
os.remove(f'tmp/{file.filename}')
file.save(f'tmp/{file.filename}')
def check_json_format(file):
if not '.' in file.filename:
return False
if not file.filename.rsplit('.', 1)[-1] == 'json':
return False
return True
def validate_json_contents(file):
file.stream.seek(0)
data = loads(file.read())
if not type(data) is dict:
return False
elif not all( key in data for key in ['meta', 'questions']):
return False
elif not type(data['meta']) is dict:
return False
elif not type(data['questions']) is list:
return False
return True
def store_data_file(file):
from admin.views import get_id_from_cookie
check_current_indicator()
timestamp = datetime.utcnow()
filename = '.'.join([timestamp.strftime('%Y%m%d%H%M%S'),'json'])
filename = secure_filename(filename)
file_path = os.path.join(app.config['DATA_FILE_DIRECTORY'], filename)
file.stream.seek(0)
data = loads(file.read())
data['meta']['timestamp'] = timestamp.strftime('%Y-%m-%d %H%M%S')
data['meta']['author'] = get_id_from_cookie()
with open(file_path, 'w') as _file:
dump(data, _file, indent=4)
with open(os.path.join(app.config['DATA_FILE_DIRECTORY'], '.current.txt'), 'w') as _file:
_file.write(filename)

View File

@ -0,0 +1,49 @@
from os import environ, path
from cryptography.fernet import Fernet
def generate_keyfile():
with open('./common/security/.encryption.key', 'wb') as keyfile:
key = Fernet.generate_key()
keyfile.write(key)
def load_key():
with open('./common/security/.encryption.key', 'rb') as keyfile:
key = keyfile.read()
return key
def check_keyfile_exists():
return path.isfile('./common/security/.encryption.key')
def encrypt(input):
if not check_keyfile_exists():
generate_keyfile()
_encryption_key = load_key()
fernet = Fernet(_encryption_key)
if type(input) == str:
input = input.encode()
output = fernet.encrypt(input)
return output.decode()
if type(input) == dict:
output = {}
for key,value in input.items():
if type(value) == dict:
output[key] = encrypt(value)
else:
value = value.encode()
output[key] = fernet.encrypt(value)
output[key] = output[key].decode()
return output
def decrypt(input):
if not check_keyfile_exists():
raise EncryptionKeyMissing
input = input.encode()
_encryption_key = load_key()
fernet = Fernet(_encryption_key)
output = fernet.decrypt(input)
return output.decode()
class EncryptionKeyMissing(Exception):
def __init__(self, message='There is no encryption keyfile.'):
self.message = message
super().__init__(self.message)

View File

@ -0,0 +1,46 @@
from pymongo import collection
from . import encrypt, decrypt
encrypted_parameters = ['username', 'email', 'name', 'club', 'creator']
def decrypt_find(collection:collection, query:dict):
cursor = collection.find({})
output_list = []
for document in cursor:
decrypted_document = {}
for key in document:
if key not in encrypted_parameters:
decrypted_document[key] = document[key]
else:
decrypted_document[key] = decrypt(document[key])
if not query:
output_list.append(decrypted_document)
else:
if set(query.items()).issubset(set(decrypted_document.items())):
output_list.append(decrypted_document)
return output_list
def decrypt_find_one(collection:collection, query:dict={}):
cursor = decrypt_find(collection=collection, query=query)
if cursor: return cursor[0]
return None
def encrypted_update(collection:collection, query:dict={}, update:dict={}):
document = decrypt_find_one(collection=collection, query=query)
for update_action in update:
key_pairs = update[update_action]
if type(key_pairs) is not dict:
raise ValueError
if update_action == '$set':
for key in key_pairs:
if key == '_id':
raise ValueError
document[key] = key_pairs[key]
if update_action == '$unset':
for key in key_pairs:
if key == '_id':
raise ValueError
if key in document:
del document[key]
for key in document:
document[key] = encrypt(document[key]) if key in encrypted_parameters else document[key]
return collection.find_one_and_replace( { '_id': document['_id'] }, document)