Finished data upload
Refactored to move security package inside common Moved data folder to process root.
This commit is contained in:
@ -1,18 +0,0 @@
|
||||
from datetime import datetime, timedelta
|
||||
from flask import Blueprint, redirect, request
|
||||
|
||||
cookie_consent = Blueprint(
|
||||
'cookie_consent',
|
||||
__name__
|
||||
)
|
||||
@cookie_consent.route('/')
|
||||
def _cookies():
|
||||
resp = redirect('/')
|
||||
resp.set_cookie(
|
||||
key = 'cookie_consent',
|
||||
value = 'True',
|
||||
max_age = timedelta(days=14) if request.cookies.get('remember') == 'True' else 'Session',
|
||||
path = '/',
|
||||
expires = datetime.utcnow() + timedelta(days=14) if request.cookies.get('remember') else 'Session'
|
||||
)
|
||||
return resp
|
59
ref-test/common/data_tools.py
Normal file
59
ref-test/common/data_tools.py
Normal file
@ -0,0 +1,59 @@
|
||||
import os
|
||||
from shutil import rmtree
|
||||
import pathlib
|
||||
from json import dump, loads
|
||||
from datetime import datetime
|
||||
from main import app
|
||||
|
||||
from werkzeug.utils import secure_filename
|
||||
|
||||
def check_data_folder_exists():
|
||||
if not os.path.exists(app.config['DATA_FILE_DIRECTORY']):
|
||||
pathlib.Path(app.config['DATA_FILE_DIRECTORY']).mkdir(parents='True', exist_ok='True')
|
||||
|
||||
def check_current_indicator():
|
||||
if not os.path.isfile(os.path.join(app.config['DATA_FILE_DIRECTORY'], '.current.txt')):
|
||||
open(os.path.join(app.config['DATA_FILE_DIRECTORY'], '.current.txt'),'w').close()
|
||||
|
||||
def make_temp_dir(file):
|
||||
if not os.path.isdir('tmp'):
|
||||
os.mkdir('tmp')
|
||||
if os.path.isfile(f'tmp/{file.filename}'):
|
||||
os.remove(f'tmp/{file.filename}')
|
||||
file.save(f'tmp/{file.filename}')
|
||||
|
||||
def check_json_format(file):
|
||||
if not '.' in file.filename:
|
||||
return False
|
||||
if not file.filename.rsplit('.', 1)[-1] == 'json':
|
||||
return False
|
||||
return True
|
||||
|
||||
def validate_json_contents(file):
|
||||
file.stream.seek(0)
|
||||
data = loads(file.read())
|
||||
if not type(data) is dict:
|
||||
return False
|
||||
elif not all( key in data for key in ['meta', 'questions']):
|
||||
return False
|
||||
elif not type(data['meta']) is dict:
|
||||
return False
|
||||
elif not type(data['questions']) is list:
|
||||
return False
|
||||
return True
|
||||
|
||||
def store_data_file(file):
|
||||
from admin.views import get_id_from_cookie
|
||||
check_current_indicator()
|
||||
timestamp = datetime.utcnow()
|
||||
filename = '.'.join([timestamp.strftime('%Y%m%d%H%M%S'),'json'])
|
||||
filename = secure_filename(filename)
|
||||
file_path = os.path.join(app.config['DATA_FILE_DIRECTORY'], filename)
|
||||
file.stream.seek(0)
|
||||
data = loads(file.read())
|
||||
data['meta']['timestamp'] = timestamp.strftime('%Y-%m-%d %H%M%S')
|
||||
data['meta']['author'] = get_id_from_cookie()
|
||||
with open(file_path, 'w') as _file:
|
||||
dump(data, _file, indent=4)
|
||||
with open(os.path.join(app.config['DATA_FILE_DIRECTORY'], '.current.txt'), 'w') as _file:
|
||||
_file.write(filename)
|
49
ref-test/common/security/__init__.py
Normal file
49
ref-test/common/security/__init__.py
Normal file
@ -0,0 +1,49 @@
|
||||
from os import environ, path
|
||||
from cryptography.fernet import Fernet
|
||||
|
||||
def generate_keyfile():
|
||||
with open('./common/security/.encryption.key', 'wb') as keyfile:
|
||||
key = Fernet.generate_key()
|
||||
keyfile.write(key)
|
||||
|
||||
def load_key():
|
||||
with open('./common/security/.encryption.key', 'rb') as keyfile:
|
||||
key = keyfile.read()
|
||||
return key
|
||||
|
||||
def check_keyfile_exists():
|
||||
return path.isfile('./common/security/.encryption.key')
|
||||
|
||||
def encrypt(input):
|
||||
if not check_keyfile_exists():
|
||||
generate_keyfile()
|
||||
_encryption_key = load_key()
|
||||
fernet = Fernet(_encryption_key)
|
||||
if type(input) == str:
|
||||
input = input.encode()
|
||||
output = fernet.encrypt(input)
|
||||
return output.decode()
|
||||
if type(input) == dict:
|
||||
output = {}
|
||||
for key,value in input.items():
|
||||
if type(value) == dict:
|
||||
output[key] = encrypt(value)
|
||||
else:
|
||||
value = value.encode()
|
||||
output[key] = fernet.encrypt(value)
|
||||
output[key] = output[key].decode()
|
||||
return output
|
||||
|
||||
def decrypt(input):
|
||||
if not check_keyfile_exists():
|
||||
raise EncryptionKeyMissing
|
||||
input = input.encode()
|
||||
_encryption_key = load_key()
|
||||
fernet = Fernet(_encryption_key)
|
||||
output = fernet.decrypt(input)
|
||||
return output.decode()
|
||||
|
||||
class EncryptionKeyMissing(Exception):
|
||||
def __init__(self, message='There is no encryption keyfile.'):
|
||||
self.message = message
|
||||
super().__init__(self.message)
|
46
ref-test/common/security/database.py
Normal file
46
ref-test/common/security/database.py
Normal file
@ -0,0 +1,46 @@
|
||||
from pymongo import collection
|
||||
from . import encrypt, decrypt
|
||||
encrypted_parameters = ['username', 'email', 'name', 'club', 'creator']
|
||||
|
||||
def decrypt_find(collection:collection, query:dict):
|
||||
cursor = collection.find({})
|
||||
output_list = []
|
||||
for document in cursor:
|
||||
decrypted_document = {}
|
||||
for key in document:
|
||||
if key not in encrypted_parameters:
|
||||
decrypted_document[key] = document[key]
|
||||
else:
|
||||
decrypted_document[key] = decrypt(document[key])
|
||||
if not query:
|
||||
output_list.append(decrypted_document)
|
||||
else:
|
||||
if set(query.items()).issubset(set(decrypted_document.items())):
|
||||
output_list.append(decrypted_document)
|
||||
return output_list
|
||||
|
||||
def decrypt_find_one(collection:collection, query:dict={}):
|
||||
cursor = decrypt_find(collection=collection, query=query)
|
||||
if cursor: return cursor[0]
|
||||
return None
|
||||
|
||||
def encrypted_update(collection:collection, query:dict={}, update:dict={}):
|
||||
document = decrypt_find_one(collection=collection, query=query)
|
||||
for update_action in update:
|
||||
key_pairs = update[update_action]
|
||||
if type(key_pairs) is not dict:
|
||||
raise ValueError
|
||||
if update_action == '$set':
|
||||
for key in key_pairs:
|
||||
if key == '_id':
|
||||
raise ValueError
|
||||
document[key] = key_pairs[key]
|
||||
if update_action == '$unset':
|
||||
for key in key_pairs:
|
||||
if key == '_id':
|
||||
raise ValueError
|
||||
if key in document:
|
||||
del document[key]
|
||||
for key in document:
|
||||
document[key] = encrypt(document[key]) if key in encrypted_parameters else document[key]
|
||||
return collection.find_one_and_replace( { '_id': document['_id'] }, document)
|
Reference in New Issue
Block a user