autograders/mongodb/source/autograder.py

import pymongo, json, re
from bson import json_util
print('ver: 1.5')
# dbprep
fsroot = '/autograder/source/'
datasets = ['movies', 'comments']
db = pymongo.MongoClient('mongodb://127.0.0.1')['test']
points = (4, 4, 5, 5, 8, 8, 8, 8)
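# one point value per query; sum(points) == 50 becomes max_score below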
def postproc_str(data: str):  # relaxed str matching
    # lowercase, then ignore whitespace, underscores, and dots
    return re.sub(r'[\s_.]', '', data.lower())
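# e.g. (illustrative) postproc_str('The Dark_Knight ') == 'thedarkknight',
# so cosmetic differences in spacing and casing do not fail a comparison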
def postproc_keys(data: str):
    if type(data) is not str:
        return postproc_iter(data)
    _dict = {}  # key-synonym substitutions would be registered here; currently empty
    data = postproc_str(data)
    for k, v in _dict.items():
        data = re.sub(k, v, data)
    return data
def comparator(a, b):
    # note the inverted sign convention vs. the usual cmp(); only a
    # deterministic total order matters here, since this is used solely to
    # canonicalize element order before comparing results
    cmp = lambda x, y: 1 if x < y else -1 if x > y else 0
    try:
        return cmp(a, b)
    except Exception:  # unorderable or mixed types
        from collections.abc import Iterable

        def itcmp(a: Iterable, b: Iterable):
            # shorter iterables sort first; equal lengths compare element-wise
            if len(a) < len(b):
                return -1
            elif len(a) == len(b):
                for aa, bb in zip(a, b):
                    c = comparator(aa, bb)
                    if c != 0:
                        return c
            else:
                return 1
            return 0

        from bson import ObjectId
        match (a, b):
            case (dict(), dict()):
                return itcmp([*a.keys(), *a.values()], [*b.keys(), *b.values()])
            case (ObjectId(), ObjectId()):
                # ObjectId lacks __match_args__, so capture patterns such as
                # ObjectId(aa) would raise; compare the ids directly instead
                return cmp(a, b)
            case (Iterable(), Iterable()):
                return itcmp(a, b)
            case _ if type(a) == type(b):
                return cmp(f'{a}', f'{b}')
            case _:  # different, unorderable types: order by type hash
                return cmp(hash(type(a)), hash(type(b)))
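# illustrative behavior (hypothetical values): comparator(3, 5) == 1 under the
# inverted convention above; comparing a dict with a list raises in the try
# branch and falls through to the structural match cases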
def postproc_iter(data, naive=False):
    from collections.abc import Iterable
    from functools import cmp_to_key
    try:
        match data:
            case str():
                return postproc_str(data)
            case dict():
                # naive mode keeps insertion-order indexes instead of key names
                return {
                    (i if naive else postproc_keys(k)): postproc_iter(v, naive)
                    for i, (k, v) in enumerate(data.items())
                }
            case Iterable():  # flatten: drop order and empty iterables
                res = type(data)(
                    sorted(
                        [postproc_iter(d, naive) for d in data
                         if not isinstance(d, Iterable) or d],
                        key=cmp_to_key(comparator))
                )
                return res[0] if len(res) == 1 else res
            case _:  # primitives
                return data
    except Exception as e:  # fail-safe: never crash the grader on odd data
        print(e)
        return data
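# e.g. (illustrative) postproc_iter([{'Title': 'A B'}, []]) drops the empty
# list, canonicalizes the dict to {'title': 'ab'}, and unwraps the
# single-element result to just that dict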
def evaluate(query: str):
    if type(query) is not str:  # allow tuples/lists of alternative queries
        return tuple(evaluate(q) for q in query)
    query = re.sub(r'//[^\n]*', '', query)  # strip JS-style comments
    query = re.sub(r'(\$?\w+)[\s\r\n]*:', r'"\1" :', query)  # quote bare keys
    # drop line breaks and ignore .pretty() / .sort(...) calls
    query = re.sub(r'[\r\n]|\.\s*pretty\s*\(\s*\)|\.\s*sort\s*\([^\)]*\)', '', query).strip()
    if not query: return [None] * 2
    # wrap a bare aggregate(...) pipeline in a list if it isn't one already
    query = re.sub(r'\.\s*aggregate\s*\(\s*([^\[\s][^\)]*)\)', r'.aggregate([\1])', query)
    if query.endswith(';'): query = query[:-1]
    true = True  # let eval() resolve JS-style 'true' literals
    data = list(eval(query))  # run against the local test db
    return [postproc_iter(data, n) if query else None for n in (True, False)]
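# e.g. (hypothetical query) evaluate('db.movies.find({year: 2000}).pretty();')
# is rewritten to db.movies.find({"year" : 2000}) and eval()'d against the
# local pymongo handle; both postprocessing variants of the result are returned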
for d in datasets:
    with open(fsroot + d + '.json', encoding='utf-8') as f:
        # ds = re.sub(r'{\s*"\$oid"\s*:\s*("\w*")\s*}', r'ObjectId(\1)', f.read())
        # the dumps are newline-delimited JSON: wrap the lines into one array
        jsonds = '[' + f.read().strip().replace('\n', ',') + ']'
        db[d].insert_many(json.loads(jsonds, object_hook=json_util.object_hook))
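# e.g. two NDJSON lines  {"a": 1}  and  {"a": 2}  become [{"a": 1},{"a": 2}];
# json_util.object_hook restores extended-JSON values such as {"$oid": ...}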
from solution import sols
answers = [evaluate(s) for s in sols]
# grading
from os import listdir
from importlib.util import module_from_spec, spec_from_file_location
subroot = '/autograder/submission/'
feedback = ''
submissions = [subroot + f for f in listdir(subroot) if f.strip().lower().endswith('.py')]
grade = 0
n_queries = len(sols)
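# a submission is expected to define one query<N>() per question, returning the
# MongoDB query as a string (or an iterable of strings); hypothetical example:
#   def query1():
#       return 'db.movies.find({year: 1999})'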
def grade78(ans, i):
    # special-case matcher for queries 7 and 8 (i == 6 or 7), where documents
    # carry symmetric 'otherbill'/'otherperson' keys; not called in the
    # grading loop below
    sol = answers[i]
    others = ('otherbill', 'otherperson')[i - 6]
    if type(ans) != list or len(ans) != len(sol):
        return False
    for a in ans:
        if a not in sol:
            if type(a) is dict:
                try:
                    # swap each 'other*'-prefixed value with its base key's value
                    for ak in a.keys():
                        if ak.startswith(others):
                            key_others = ak[len(others):]
                            t = a[key_others]
                            a[key_others] = a[ak]
                            a[ak] = t
                    if a not in sol:
                        return False
                    else:
                        sol.remove(a)
                except Exception as e:
                    print(e)
                    return False
            else:
                return False
    return True
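# e.g. (hypothetical) with i == 6, a document {'_id': 1, 'otherbill_id': 2} has
# a['_id'] and a['otherbill_id'] swapped before membership is re-checked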
wa = rte = False
if submissions:
    submission = submissions[0]  # grade the first .py file found
    for i in range(n_queries):
        feedback += f'Query {i + 1}: '
        try:
            # re-load the submission module fresh for each query
            spec = spec_from_file_location('curr', submission)
            module = module_from_spec(spec)
            spec.loader.exec_module(module)
            q = getattr(module, f'query{i + 1}')()
            ans = evaluate(q)

            def eq(i):
                # accept if any postprocessing variant of ans matches the solution
                _cmp = lambda a, b: any(aa == bb for aa, bb in zip(a, b))
                if type(answers[i]) is tuple:  # multiple reference solutions
                    return any(_cmp(ans, aa) for aa in answers[i])
                else:
                    return _cmp(ans, answers[i])

            if eq(i):
                grade += points[i]
                feedback += 'Correct.\n'
            else:
                feedback += 'Wrong Answer.\n'
                wa = True
                print('ans: ', ans, '\nsol: ', answers[i])
        except Exception as e:
            rte = True
            feedback += 'Runtime Error.\n'
            print(e)
else:
    feedback += 'No python file in submission.\n'
max_score = sum(points)
if rte:
    feedback += ('\nPlease check for syntax errors first if you encountered runtime errors.\n' +
                 'If you believe it\'s a mistake, contact the TA at sun1226@purdue.edu for assistance.')
# output
results = {
    'output': feedback,
    'score': grade,
    'max_score': max_score,
}
with open('/autograder/results/results.json', 'w') as res:
    json.dump(results, res)
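# results.json ends up shaped like (illustrative values):
#   {"output": "Query 1: Correct.\n...", "score": 42, "max_score": 50}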