import pymongo, json
# Version banner printed as soon as the script starts.
print('ver: 1.5')
# dbprep: constants and the database handle used by the rest of the script.
# Directory prefix from which each dataset's '<name>.json' file is opened.
fsroot = '/autograder/source/'
# Dataset names; each one is opened below as fsroot + name + '.json'.
datasets = ['congress', 'bills']
# Handle to the 'test' database. Query strings passed to evaluate() are run
# through eval(), so they reach the database through this module-level name.
# NOTE(review): the connection URI carries no host — confirm which server the
# client is expected to connect to.
db = pymongo.MongoClient('mongodb://')['test']
def postproc_str(data : str): # relaxed str matching
    """Normalise a string so that near-identical answers compare equal.

    The text is lower-cased, every whitespace character as well as every
    '|', '_' and '.' is deleted, and the misspelling 'sponser' is rewritten
    as 'sponsor'.
    """
    import re
    squeezed = re.sub(r'[\s|_|.]', '', data.lower())
    # The pattern is a plain literal, so str.replace is an exact equivalent.
    return squeezed.replace('sponser', 'sponsor')
def comparator(a, b):
cmp = lambda x, y: 1 if x < y else -1 if x > y else 0
return cmp(a, b)
except Exception:
from collections.abc import Iterable
def itcmp(a: Iterable, b: Iterable):
if len(a) < len(b):
return -1
elif len(a) == len(b):
for aa, bb in zip(a, b):
c = comparator(aa, bb)
if c != 0:
return c
else: return 1
return 0
from bson import ObjectId
match (a, b):
case (dict(), dict()):
return itcmp([*a.keys(), *a.values()], [*b.keys(), *b.values()])
case (ObjectId(aa), ObjectId(bb)):
return cmp(aa, bb)
case (Iterable(), Iterable()):
return itcmp(a, b)
case _ if type(a) == type(b):
return cmp(f'{a}', f'{b}')
case _:
return cmp(hash(type(a)), hash(type(b)))
def postproc_iter(data):
from collections.abc import Iterable
from functools import cmp_to_key
match data:
case str():
return postproc_str(data)
case dict():
return { postproc_iter(k):postproc_iter(v) for k, v in data.items() }
case Iterable(): # flatten, remove order and empty iterables
res = type(data)(
[postproc_iter(d) for d in data
if not isinstance(d, Iterable) or d]
, key = cmp_to_key(comparator))
return res[0] if len(res) == 1 else res
case _: # primitives
return data
except Exception as e: # fail proof
return data
def evaluate(query : str):
    """Run a Mongo-shell style query string and return its normalised result.

    The text is rewritten into something Python can evaluate:

    1. ``//`` comments are removed up to the end of the line.
    2. Bare keys (optionally starting with ``$``) that are followed by a colon
       are wrapped in double quotes.
    3. Line breaks, ``pretty()`` calls and ``sort(...)`` calls are removed.
    4. An ``aggregate(...)`` call whose argument does not start with ``[`` gets
       its argument wrapped in a list.
    5. One trailing ``;`` is dropped.

    Returns ``None`` when nothing is left after the rewrite; otherwise the
    expression is evaluated, materialised with ``list`` and passed through
    ``postproc_iter``.
    """
    import re
    query = re.sub(r'//[^\n]*', '', query)
    query = re.sub(r'(\$?[\d\w_]+)[\s\r\n]*:', r'"\1" :', query)
    query = re.sub(r'[\r\n]|.\s*pretty\s*\(\s*\)|.\s*sort\s*\([^\)]*\)', '', query).strip()
    query = re.sub(r'.\s*aggregate\s*\(\s*([^\[^\s][^\)]*)\)', r'.aggregate([\1])', query)
    if query.endswith(';'):
        query = query[:-1]
    if not query:
        return None
    # JavaScript literals that may appear in a query. They look unused, but
    # eval() resolves names against this function's locals. Binding only
    # `true` made every query using `false` or `null` fail with NameError.
    true, false, null = True, False, None
    # SECURITY: eval() executes arbitrary Python. The text comes from the
    # solution module and from student submissions, so this must only ever run
    # inside the disposable autograder sandbox.
    return postproc_iter(list(eval(query)))
# Open every dataset's JSON file under fsroot.
# NOTE(review): no statement for the body of the `with` is visible here, so
# nothing is read from `f`. The step that loads the file contents (presumably
# into db[d]) appears to be missing — confirm against the deployed grader.
for d in datasets:
with open(fsroot + d + '.json', encoding = 'utf-8') as f:
# Reference query strings are provided by the `solution` module.
from solution import sols
# Evaluate each reference query once; answers[i] is what query i + 1 is
# compared against during grading.
answers = [evaluate(s) for s in sols]
# grading
from os import listdir
from importlib.util import module_from_spec, spec_from_file_location
# Directory that is scanned for the submitted file.
subroot = '/autograder/submission/'
# Per-query messages accumulated here end up as 'output' in the results file.
feedback = ''
# Full paths of the entries directly inside subroot whose name ends in '.py'
# (case-insensitive, ignoring surrounding whitespace in the name).
submissions = [subroot + f for f in listdir(subroot) if f.strip().lower().endswith('.py')]
# Number of queries judged correct so far.
grade = 0
# Total number of queries; the final score is a percentage of this.
n_queries = len(sols)
def grade78(ans, i):
    """Lenient row-by-row check used for the queries at index 6 and 7.

    ``ans`` must be a list with as many rows as the reference answer
    ``answers[i]``. Each row has to occur in the reference answer either
    as-is or, for dict rows, after swapping the value under every key that
    starts with the prefix ('otherbill' for i == 6, 'otherperson' for i == 7)
    with the value under the same key minus that prefix.

    Returns True when every row is accepted, False otherwise. Any error while
    swapping (for instance a missing counterpart key) counts as a mismatch.
    """
    sol = answers[i]
    others = ('otherbill', 'otherperson')[i - 6]
    if not isinstance(ans, list) or len(ans) != len(sol):
        return False
    for a in ans:
        if a in sol:
            continue
        # Only dict rows can be retried with their fields swapped.
        if not isinstance(a, dict):
            return False
        try:
            # Work on a copy so the caller's answer (printed in the grading
            # log on failure) is not modified by the check itself.
            swapped = dict(a)
            for ak in a:
                if ak.startswith(others):
                    key_others = ak[len(others):]
                    swapped[key_others], swapped[ak] = swapped[ak], swapped[key_others]
            if swapped not in sol:
                return False
        except Exception:
            return False
    return True
if submissions:
    # Only the first discovered .py file is graded.
    submission = submissions[0]
    for i in range(n_queries):
        feedback += f'Query {i + 1}: '
        # Everything that touches the submission sits inside the try block so
        # that a failure in one query is reported without aborting the rest.
        try:
            spec = spec_from_file_location('curr', submission)
            module = module_from_spec(spec)
            # module_from_spec() only creates an empty module object; the
            # loader has to execute the file before query<N> exists on it.
            # SECURITY: this runs the student's code — sandbox only.
            spec.loader.exec_module(module)
            q = getattr(module, f'query{i + 1}')()
            ans = evaluate(q)

            def eq(i):
                # Indices 6 and 7 are checked with the lenient row matcher;
                # every other query must equal the reference exactly.
                if i in (6, 7):
                    return grade78(ans, i)
                return ans == answers[i]

            if eq(i):
                grade += 1
                feedback += 'Correct.\n'
            else:
                feedback += 'Wrong Answer.\n'
                # Diagnostics go to stdout, not into the student feedback.
                print('ans: ', ans, '\nsol: ', answers[i])
        except Exception as e:
            feedback += 'Runtime Error.\n'
            print (e)
else:
    feedback += 'No python file in submission.\n'
# output: write the feedback text and the percentage score to the results file.
results = {
    'output': feedback,
    # Share of correct queries as a percentage; an empty query list scores 0
    # instead of raising ZeroDivisionError.
    'score': grade * 100 / n_queries if n_queries else 0,
    'max_score': 100,
}
with open('/autograder/results/results.json', 'w') as res:
    json.dump(results, res)