restructure

dev
Bill 2 years ago
parent 42c334af84
commit 780a60fa5d

@ -0,0 +1,24 @@
FROM ubuntu:latest
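# Editorial note: the next step replaces /bin/sh with bash so the shell steps below run under bash.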
RUN cp /bin/bash /bin/sh
RUN apt update && apt install -y wget
RUN export OS_VER=`cat /etc/os-release | grep VERSION_CODENAME` &&\
export OS_VER=${OS_VER#*=} &&\
printf "deb https://dev.monetdb.org/downloads/deb/ "${OS_VER}" monetdb \ndeb-src https://dev.monetdb.org/downloads/deb/ "${OS_VER}" monetdb\n">/etc/apt/sources.list.d/monetdb.list
RUN wget --output-document=/etc/apt/trusted.gpg.d/monetdb.gpg https://dev.monetdb.org/downloads/MonetDB-GPG-KEY.gpg
RUN apt update && apt install -y python3 python3-pip clang-14 libmonetdbe-dev git
RUN git clone https://github.com/sunyinqi0508/AQuery2
RUN python3 -m pip install -r AQuery2/requirements.txt
ENV IS_DOCKER_IMAGE=1 CXX=clang-14
CMD cd AQuery2 && python3 prompt.py

@ -1,18 +1,29 @@
OS_SUPPORT =
MonetDB_LIB =
Threading =
CXXFLAGS = --std=c++1z
OPTFLAGS = -O3 -flto -march=native
SHAREDFLAGS = -shared -fPIC
ifeq ($(PCH), 1)
PCHFLAGS = -include server/aggregations.h
else
PCHFLAGS =
endif
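# Editorial note: PCH=1 adds -include server/aggregations.h to the snippet build; run 'make pch' first so a matching server/aggregations.h.pch exists for Clang to pick up.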
ifeq ($(OS),Windows_NT)
NULL_DEVICE = NUL
OS_SUPPORT += server/winhelper.cpp
MonetDB_LIB += -Imonetdb/msvc msc-plugin/monetdbe.dll
else
NULL_DEVICE = /dev/null
MonetDB_LIB += -I/usr/local/include/monetdb -I/usr/include/monetdb -lmonetdbe
endif
ifeq ($(THREADING),1)
Threading += server/threading.cpp -DTHREADING
endif
pch:
$(CXX) -x c++-header server/aggregations.h -o server/aggregations.h.pch
info:
$(info $(OS_SUPPORT))
$(info $(OS))
@ -20,11 +31,16 @@ info:
$(info "test")
$(info $(CXX))
server.bin:
$(CXX) server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) $(Threading) -flto --std=c++1z -O3 -march=native -o server.bin
$(CXX) server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) $(Threading) $(OPTFLAGS) $(CXXFLAGS) -o server.bin
server.so:
# $(CXX) server/server.cpp server/monetdb_conn.cpp -fPIC -shared $(OS_SUPPORT) monetdb/msvc/monetdbe.dll --std=c++1z -O3 -march=native -o server.so -I./monetdb/msvc
$(CXX) -shared -fPIC -flto server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) server/monetdb_conn.cpp $(Threading) $(MonetDB_LIB) --std=c++1z -o server.so -O3
$(CXX) $(SHAREDFLAGS) server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) server/monetdb_conn.cpp $(Threading) $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o server.so
snippet:
$(CXX) -shared -fPIC -flto --std=c++1z -include server/aggregations.h out.cpp server/monetdb_conn.cpp server/table.cpp server/io.cpp $(MonetDB_LIB) -O3 -march=native -o dll.so
$(CXX) $(SHAREDFLAGS) $(PCHFLAGS) out.cpp server/monetdb_conn.cpp server/table.cpp server/io.cpp $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o dll.so
docker:
docker build -t aquery .
clean:
rm *.shm -rf
rm -rf *.shm *.o dll.so server.so server.bin 2> $(NULL_DEVICE) || true

@ -1,4 +1,4 @@
# AQuery++ DB
# AQuery++ Database
## Introduction
AQuery++ Database is a cross-platform, In-Memory Column-Store Database that incorporates compiled query execution.
@ -12,40 +12,51 @@ AQuery++ Database is a cross-platform, In-Memory Column-Store Database that inco
- AQuery++ supports different execution engines thanks to the decoupled compiler structure.
- AQuery Execution Engine: executes queries by compiling the query plan to C++ code. Does not support joins or UDFs.
- Hybrid Execution Engine: splits each query into two parts. The SQL-compliant part is executed by an embedded version of MonetDB, and everything else is handled by a post-processing module that the AQuery++ compiler generates in C++, then compiles and executes (see the protocol sketch after this list).
- K9 Execution Engine (discontinued).
- K9 Execution Engine: (discontinued).
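The hand-off between the two halves is a tagged message list: the compiler's `Context` queues `Q`-prefixed SQL for the embedded MonetDB and `P`-prefixed procedure names for the compiled post-processing snippet (see the `Context` and `dll_main` changes later in this commit). A minimal, self-contained sketch of that protocol, using a toy stand-in for the real `Context` and an illustrative query:

```python
# Toy stand-in for Context: mirrors only how 'Q'/'P' messages are queued.
class ToyContext:
    def __init__(self):
        self.queries = []
        self.sql = ''

    def sql_end(self):                      # SQL-compliant part -> embedded MonetDB
        self.queries.append('Q' + self.sql)
        self.sql = ''

    def postproc_end(self, proc_name):      # post-processing part -> compiled C++ snippet
        self.queries.append('P' + proc_name)

cxt = ToyContext()
cxt.sql = 'SELECT mont, AVG(sales) FROM sale GROUP BY mont'   # illustrative query text
cxt.sql_end()
cxt.postproc_end('proc0')
print(cxt.queries)   # ['QSELECT ...', 'Pproc0'], dispatched by the server's switch
```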
## Roadmap
- [x] SQL Parser -> AQuery Parser (Front End)
- [ ] AQuery-C++ Compiler (Back End)
- [x] AQuery-C++ Compiler (Back End)
- [x] Schema and Data Model
- [x] Data acquisition/output from/to csv file
- [x] Single table queries
- [x] Projections and Single Table Aggregations
- [x] Group by Aggregations
- [x] Filters
- [x] Order by
- [x] Assumption
- [x] Flatten
- [x] Multi-table
- [x] Join
- [ ] Execution Engine
- [x] Projections and single-group Aggregations
- [x] Group by Aggregations
- [x] Filters
- [x] Order by
- [x] Assumption
- [x] Flatten
- [x] UDFs (Hybrid Engine only)
- [ ] User Module
- [ ] Triggers
- [x] Join (Hybrid Engine only)
- [ ] Subqueries
- [ ] -> Optimizing Compiler
- [ ] Query Optimization
- [x] Selection/Order by push-down
- [ ] Join Optimization
## TODO:
- [ ] User Module load syntax parsing (fn definition/registration)
- [ ] User Module test
- [ ] Interval based triggers
- [ ] Bug fixes: type deduction misaligned in Hybrid Engine
- [ ] Limitation: passing ColRefs back to MonetDB.
- [ ] C++ Meta-Programming: Eliminate template recursions as much as possible.
- [ ] IPC: Better ways to communicate between Interpreter (Python) and Executor (C++).
- [ ] Sockets? stdin/stdout capture?
# Installation
## Requirements
Recent version of Linux, Windows or MacOS, with recent C++ compiler that has C++17 (1z) support.
- GCC: 9.0 or above (g++ 7.x, 8.x fail to handle variadic template expansion due to compiler bug)
- Clang: 6.0 or above (Recommended)
- MSVC: 2019 or later
1. A recent version of Linux, Windows, or macOS, with a recent C++ compiler that has C++17 (1z) support.
- GCC: 9.0 or above (g++ 7.x, 8.x fail to handle variadic template expansion due to a compiler bug)
- Clang: 5.0 or above (Recommended)
- MSVC: 2017 or later
2. MonetDB for the Hybrid Engine
- On Windows, the required libraries and headers are already included in the repo.
- On Linux, see [MonetDB Easy Setup](https://www.monetdb.org/easy-setup/) for instructions.
- On macOS, MonetDB can be installed with Homebrew: `brew install monetdb`.
3. Python 3.6 or above; install the required packages from requirements.txt with `python3 -m pip install -r requirements.txt`.
## Usage
`python3 prompt.py` launches the interactive command prompt. The server binary is automatically rebuilt and started.
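For scripted runs, the same entry point can be driven programmatically. A minimal sketch, assuming prompt.py exposes `prompt` and `init_prompt` as in this commit; the command list is illustrative:

```python
# Feed a fixed command list instead of stdin (assumes prompt.py is importable).
from prompt import prompt, init_prompt

cmds = iter(['f funcs.a', 'xexec', 'exit'])
state = init_prompt()
prompt(running=lambda: True, next=lambda: next(cmds), state=state)
```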
#### Commands:

@ -1,7 +1,7 @@
# put environment specific configuration here
## GLOBAL CONFIGURATION FLAGS
version_string = '0.3.3a'
version_string = '0.4.0a'
add_path_to_ldpath = True
rebuild_backend = True
run_backend = True

@ -650,7 +650,7 @@ def parser(literal_string, ident, sqlserver=False):
) / to_json_call
load_data = (
keyword("data").suppress()
keyword("data") ("file_type")
+ keyword("infile")("loc")
+ literal_string ("file")
+ INTO
@ -680,7 +680,7 @@ def parser(literal_string, ident, sqlserver=False):
)
load_module = (
keyword("module").suppress()
keyword("module") ("file_type")
+ FROM
+ literal_string ("file")
+ Optional(
@ -696,12 +696,11 @@ def parser(literal_string, ident, sqlserver=False):
)
load = (
keyword("load")("op")
keyword("load")
+ (load_data | load_module)
) ("load")
sql_stmts = delimited_list( (
query
| (insert | update | delete | load)

@ -62,6 +62,13 @@ class Types:
return self.sqlname
def __str__(self) -> str:
return self.sqlname
@staticmethod
def decode(aquery_type : str, vector_type:str = 'ColRef') -> "Types":
if (aquery_type.startswith('vec')):
return VectorT(Types.decode(aquery_type[3:]), vector_type)
return type_table[aquery_type]
class TypeCollection:
def __init__(self, sz, deftype, fptype = None, utype = None, *, collection = None) -> None:
self.size = sz
@ -95,6 +102,28 @@ UIntT = Types(7, name = 'uint32', sqlname = 'UINT32', long_type=ULongT, fp_type=
UShortT = Types(6, name = 'uint16', sqlname = 'UINT16', long_type=ULongT, fp_type=FloatT)
UByteT = Types(5, name = 'uint8', sqlname = 'UINT8', long_type=ULongT, fp_type=FloatT)
StrT = Types(200, name = 'str', cname = 'const char*', sqlname='VARCHAR', ctype_name = 'types::STRING')
VoidT = Types(200, name = 'void', cname = 'void', sqlname='Null', ctype_name = 'types::None')
class VectorT(Types):
def __init__(self, inner_type : Types, vector_type:str = 'ColRef'):
self.inner_type = inner_type
self.vector_type = vector_type
@property
def name(self) -> str:
return f'{self.vector_type}<{self.inner_type.name}>'
@property
def sqlname(self) -> str:
return 'BINARY'
@property
def cname(self) -> str:
return self.name
@property
def fp_type(self) -> Types:
return VectorT(self.inner_type.fp_type, self.vector_type)
@property
def long_type(self):
return VectorT(self.inner_type.long_type, self.vector_type)
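# Editorial sketch of the decoding above (assuming 'uint32' is registered in
# type_table under its name): each leading 'vec' wraps the inner type in the
# configured vector container,
#   Types.decode('vecuint32')    -> VectorT(UIntT),  cname 'ColRef<uint32>'
#   Types.decode('vecvecuint32') -> nested VectorT,  cname 'ColRef<ColRef<uint32>>'
# and every vector type reports sqlname 'BINARY'.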
def _ty_make_dict(fn : str, *ty : Types):
@ -265,6 +294,8 @@ builtin_unary_arith = _op_make_dict(opneg)
builtin_unary_special = _op_make_dict(spnull)
builtin_cstdlib = _op_make_dict(fnsqrt, fnlog, fnsin, fncos, fntan, fnpow)
builtin_func = _op_make_dict(fnmax, fnmin, fnsum, fnavg, fnmaxs, fnmins, fnsums, fnavgs, fncnt)
user_module_func = {}
builtin_operators : Dict[str, OperatorBase] = {**builtin_binary_arith, **builtin_binary_logical,
**builtin_unary_arith, **builtin_unary_logical, **builtin_unary_special, **builtin_func, **builtin_cstdlib}
**builtin_unary_arith, **builtin_unary_logical, **builtin_unary_special, **builtin_func, **builtin_cstdlib,
**user_module_func}

@ -85,7 +85,6 @@ if __name__ == '__main__':
import os
from dataclasses import dataclass
import enum
from tabnanny import check
import time
# import dbconn
import re
@ -317,7 +316,7 @@ def save(q:str, cxt: xengine.Context):
savefile('udf', 'udf', '.hpp')
savefile('sql', 'sql')
def main(running = lambda:True, next = input, state = None):
def prompt(running = lambda:True, next = input, state = None):
if state is None:
state = init_prompt()
q = ''
@ -442,9 +441,12 @@ def main(running = lambda:True, next = input, state = None):
if trimed[0].startswith('f'):
fn = 'stock.a' if len(trimed) <= 1 or len(trimed[1]) == 0 \
else trimed[1]
with open(fn, 'r') as file:
contents = file.read()#.lower()
try:
with open(fn, 'r') as file:
contents = file.read()#.lower()
except FileNotFoundError:
with open('tests/' + fn, 'r') as file:
contents = file.read()
state.stmts = parser.parse(contents)
continue
state.stmts = parser.parse(q)
@ -488,7 +490,7 @@ if __name__ == '__main__':
while(not ws.sub('', nextcmd) or nextcmd.strip().startswith('#')):
nextcmd = file.readline()
cnt = _Counter(1)
main(lambda : cnt.inc(-1) > 0, lambda:nextcmd.strip(), state)
prompt(lambda : cnt.inc(-1) > 0, lambda:nextcmd.strip(), state)
nextcmd = file.readline()
if check_param(['-p', '--parse']):
@ -505,5 +507,5 @@ if __name__ == '__main__':
elif any([s in nextcmd for s in thread_string]):
server_mode = RunType.Threaded
main(state=state)
prompt(state=state)

@ -601,18 +601,62 @@ class insert(ast_node):
pass
self.sql += ', '.join(list_values) + ')'
class load(ast_node):
name="load"
first_order = name
def init(self, _):
if self.context.dialect == 'MonetDB':
def init(self, node):
self.module = False
if node['load']['file_type'] == 'module':
self.produce = self.produce_module
self.module = True
elif self.context.dialect == 'MonetDB':
self.produce = self.produce_monetdb
else:
else:
self.produce = self.produce_aq
if self.parent is None:
self.context.sql_begin()
def produce_module(self, node):
# create command for exec engine -> done
# create c++ stub
# create dummy udf obj for parsing
# def decode_type(ty : str) -> str:
# ret = ''
# back = ''
# while(ty.startswith('vec')):
# ret += 'ColRef<'
# back += '>'
# ty = ty[3:]
# ret += ty
# return ret + back
node = node['load']
file = node['file']['literal']
self.context.queries.append(f'M{file}')
self.module_name = file
self.functions = {}
if 'funcs' in node:
for f in enlist(node['funcs']):
fname = f['fname']
self.context.queries.append(f'F{fname}')
ret_type = VoidT
if 'ret_type' in f:
ret_type = Types.decode(f['ret_type'])
nargs = 0
arglist = ''
if 'var' in f:
arglist = []
for v in enlist(f['var']):
arglist.append(f'{Types.decode(v["type"]).cname} {v["arg"]}')
nargs = len(arglist)
arglist = ', '.join(arglist)
# create c++ stub
cpp_stub = f'{ret_type.cname} (*{fname})({arglist});'
self.context.module_stubs += cpp_stub + '\n'
self.context.module_map[fname] = cpp_stub
#registration for parser
self.functions[fname] = user_module_function(fname, nargs, ret_type)
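# Editorial illustration with hypothetical names: for a module function 'mydiv'
# returning 'double' with arguments (a int, b int), the loop above queues
# 'Fmydiv' (after the module's 'M<file>') and emits the C++ stub
#   double (*mydiv)(int a, int b);
# which init() later binds through Context::get_module_function (see
# get_init_func and the 'M'/'F' cases in dll_main further down).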
def produce_aq(self, node):
node = node['load']
s1 = 'LOAD DATA INFILE '
@ -710,7 +754,11 @@ class udf(ast_node):
self.var_table = {}
self.args = []
if self.context.udf is None:
self.context.udf = Context.udf_head
self.context.udf = (
Context.udf_head
+ self.context.module_stubs
+ self.context.get_init_func()
)
self.context.headers.add('\"./udf.hpp\"')
self.vecs = set()
self.code_list = []
@ -933,6 +981,11 @@ class udf(ast_node):
else:
return udf.ReturnPattern.bulk_return
class user_module_function(OperatorBase):
def __init__(self, name, nargs, ret_type):
super().__init__(name, nargs, lambda: ret_type, call=fn_behavior)
user_module_func[name] = self
builtin_operators[name] = self
def include(objs):
import inspect

@ -102,6 +102,8 @@ class Context:
self.tables = []
self.cols = []
self.datasource = None
self.module_stubs = ''
self.module_map = {}
self.udf_map = dict()
self.udf_agg_map = dict()
self.use_columnstore = False
@ -134,21 +136,28 @@ class Context:
'#include \"./server/libaquery.h\"\n'
'#include \"./server/aggregations.h\"\n\n'
)
def get_init_func(self):
if not self.module_map:
return ''
ret = 'void init(Context* cxt){\n'
for fname in self.module_map.keys():
ret += f'{fname} = (decltype({fname}))(cxt->get_module_function("{fname}"));\n'
return ret + '}\n'
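# Editorial illustration: for a loaded module function 'mydiv' (hypothetical
# name), get_init_func() above emits roughly
#   void init(Context* cxt){
#   mydiv = (decltype(mydiv))(cxt->get_module_function("mydiv"));
#   }
# wiring the dlopen'd module symbols into the generated UDF code.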
def sql_begin(self):
self.sql = ''
def sql_end(self):
self.queries.append('Q' + self.sql)
self.sql = ''
self.sql = ''
def postproc_begin(self, proc_name: str):
self.ccode = self.function_deco + proc_name + self.function_head
def postproc_end(self, proc_name: str):
self.procs.append(self.ccode + 'return 0;\n}')
self.ccode = ''
self.queries.append('P' + proc_name)
self.queries.append('P' + proc_name)
def finalize(self):
if not self.finalized:
headers = ''
@ -159,4 +168,4 @@ class Context:
headers += '#include ' + h + '\n'
self.ccode = headers + '\n'.join(self.procs)
self.headers = set()
return self.ccode
return self.ccode

@ -1,5 +1,5 @@
mo-future
mo-dots==8.20.21357
mo-dots
mo-parsing
mo-imports
dataclasses; python_version < '3.7'

@ -34,7 +34,7 @@ struct Context{
typedef int (*printf_type) (const char *format, ...);
std::unordered_map<const char*, void*> tables;
std::unordered_map<const char*, uColRef *> cols;
void* module_function_maps = 0;
Config* cfg;
int n_buffers, *sz_bufs;
@ -62,6 +62,7 @@ struct Context{
}
void init_session();
void end_session();
void* get_module_function(const char*);
};
#ifdef _WIN32

@ -77,10 +77,21 @@ void Context::init_session(){
Session::Statistic stats;
}
}
void* Context::get_module_function(const char* fname){
auto fmap = static_cast<std::unordered_map<std::string, void*>*>
(this->module_function_maps);
auto ret = fmap->find(fname);
return ret == fmap->end() ? nullptr : ret->second;
}
int dll_main(int argc, char** argv, Context* cxt){
Config *cfg = reinterpret_cast<Config *>(argv[0]);
std::unordered_map<std::string, void*> user_module_map;
if (cxt->module_function_maps == 0)
cxt->module_function_maps = new std::unordered_map<std::string, void*>();
auto module_fn_map =
static_cast<std::unordered_map<std::string, void*>*>(cxt->module_function_maps);
auto buf_szs = cfg->buffer_sizes;
void** buffers = (void**)malloc(sizeof(void*) * cfg->n_buffers);
for (int i = 0; i < cfg->n_buffers; i++)
@ -95,6 +106,7 @@ int dll_main(int argc, char** argv, Context* cxt){
while(cfg->running){
if (cfg->new_query) {
void *handle = 0;
void *user_module_handle = 0;
if (cfg->backend_type == BACKEND_MonetDB){
if (cxt->alt_server == 0)
cxt->alt_server = new Server(cxt);
@ -106,13 +118,43 @@ int dll_main(int argc, char** argv, Context* cxt){
for(int i = 0; i < n_recv; ++i)
{
//printf("%s, %d\n", n_recvd[i], n_recvd[i][0] == 'Q');
if (n_recvd[i][0] == 'Q'){
server->exec(n_recvd[i] + 1);
printf("Exec Q%d: %s", i, n_recvd[i]);
}
else if (n_recvd[i][0] == 'P' && handle && !server->haserror()) {
code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, n_recvd[i]+1));
c(cxt);
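// Message protocol from the frontend: each received entry is tagged by its
// first character -- 'Q' = SQL text for the embedded MonetDB server,
// 'P' = post-processing procedure to resolve from the compiled snippet,
// 'M' = user module to dlopen, 'F' = module function symbol to resolve into
// module_fn_map, 'U' = user module to unload.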
switch(n_recvd[i][0]){
case 'Q':
{
server->exec(n_recvd[i] + 1);
printf("Exec Q%d: %s", i, n_recvd[i]);
}
break;
case 'P':
if(handle && !server->haserror()) {
code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, n_recvd[i]+1));
c(cxt);
}
break;
case 'M':
{
auto mname = n_recvd[i] + 1;
user_module_handle = dlopen(mname, RTLD_LAZY);
user_module_map[mname] = user_module_handle;
}
break;
case 'F':
{
auto fname = n_recvd[i] + 1;
module_fn_map->insert_or_assign(fname, dlsym(user_module_handle, fname));
}
break;
case 'U':
{
auto mname = n_recvd[i] + 1;
auto it = user_module_map.find(mname);
if (it != user_module_map.end()) {
if (user_module_handle == it->second)
user_module_handle = 0;
dlclose(it->second);
user_module_map.erase(it);
}
}
break;
}
}
if(handle) {

@ -151,3 +151,18 @@ bool ThreadPool::busy(){
}
return true;
}
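// reset() re-arms the timer to a full interval; tick(t) consumes t milliseconds
// of elapsed time, returning false while time remains and true once the interval
// has elapsed, in which case the timer is re-armed with interval - (t % interval).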
void IntervalBasedTrigger::timer::reset(){
time_remaining = interval;
}
bool IntervalBasedTrigger::timer::tick(uint32_t t){
if (time_remaining > t) {
time_remaining -= t;
return false;
}
else{
time_remaining = interval - t%interval;
return true;
}
}

@ -53,6 +53,8 @@ public:
struct timer{
uint32_t interval; // in milliseconds
uint32_t time_remaining;
void reset();
bool tick(uint32_t t);
};
void add_trigger();
private:

@ -2,13 +2,23 @@
f stock.a
xexec
f moving_avg.a
xexec
f q1.sql
xexec
f joins.a
xexec
f udf3.a
xexec
f strings.a
xexec
f funcs.a
xexec
exit

@ -9,23 +9,13 @@ FUNCTION sd ( x) {
sqrt ( covariance (x , x) )
}
AGGREGATION FUNCTION covariances(x, y, w){
static xmeans := 0, ymeans := 0, cnt := 0;
if (cnt < w) { xmeans += x; }
else {
xmeans += (x - x.vec[cnt - w]) / w;
ymeans += (y - y.vec[cnt - w]) / w;
}
avg (( x.vec(x.len-w, x.len) - xmean ) * (y.vec(y.len - w, y.len) - ymean ))
}
FUNCTION pairCorr (x , y ) {
covariance (x , y ) / ( sd (x) * sd (y ))
}
CREATE TABLE test1(a INT, b INT, c INT, d INT)
LOAD DATA INFILE "test.csv"
LOAD DATA INFILE "data/test.csv"
INTO TABLE test1
FIELDS TERMINATED BY ","

@ -20,13 +20,13 @@ covariance (x , y ) / ( sd (x) * sd (y ))
CREATE TABLE tt(a INT, b INT, c INT, d INT)
LOAD DATA INFILE "test.csv"
LOAD DATA INFILE "data/test.csv"
INTO TABLE tt
FIELDS TERMINATED BY ","
CREATE TABLE sale(Mont INT, sales INT)
LOAD DATA INFILE "moving_avg.csv"
LOAD DATA INFILE "data/moving_avg.csv"
INTO TABLE sale
FIELDS TERMINATED BY ","

@ -1,6 +1,6 @@
CREATE TABLE sale(Mont INT, sales INT)
LOAD DATA INFILE "moving_avg.csv"
LOAD DATA INFILE "data/moving_avg.csv"
INTO TABLE sale
FIELDS TERMINATED BY ","

@ -1,6 +1,6 @@
CREATE TABLE testq1(a INT, b INT, c INT, d INT)
LOAD DATA INFILE "test.csv"
LOAD DATA INFILE "data/test.csv"
INTO TABLE testq1
FIELDS TERMINATED BY ","

@ -1,6 +1,6 @@
CREATE TABLE types_test(names varchar(10), val real, id int)
LOAD DATA INFILE "datatypes.csv"
LOAD DATA INFILE "data/datatypes.csv"
INTO TABLE types_test
FIELDS TERMINATED BY ","

@ -31,7 +31,7 @@ AGGREGATION FUNCTION covariances2(x, y, w){
CREATE TABLE test(a INT, b INT, c INT, d INT)
LOAD DATA INFILE "test2.csv"
LOAD DATA INFILE "data/test2.csv"
INTO TABLE test
FIELDS TERMINATED BY ","
