Multi-query support

dev
Bill 2 years ago
parent 70f29e028d
commit 5699638520

@ -10,5 +10,5 @@ FROM sale
INTO OUTFILE "moving_avg_output.csv" INTO OUTFILE "moving_avg_output.csv"
FIELDS TERMINATED BY ";" FIELDS TERMINATED BY ";"
-- select Mont, mins(2,sales) from sale assuming desc Mont group by sales select Mont, mins(2,sales) from sale assuming desc Mont group by Mont
-- into outfile "flatten.csv" into outfile "flatten.csv"

@ -1,19 +1,53 @@
#include "./server/libaquery.h" #include "./server/libaquery.h"
#include <unordered_map>
#include "./server/monetdb_conn.h" #include "./server/monetdb_conn.h"
#include "./server/hasher.h"
extern "C" int __DLLEXPORT__ dllmain(Context* cxt) { #include "./server/aggregations.h"
__AQEXPORT__(int) dll_3V1c8v(Context* cxt) {
using namespace std; using namespace std;
using namespace types; using namespace types;
auto server = static_cast<Server*>(cxt->alt_server); auto server = static_cast<Server*>(cxt->alt_server);
auto len_5sGusn = server->cnt; auto len_5Bixfq = server->cnt;
auto sumc_5IN = ColRef<__int128_t>(len_5sGusn, server->getCol(0)); auto mont_1Mz = ColRef<int>(len_5Bixfq, server->getCol(0));
auto b_79y = ColRef<int>(len_5sGusn, server->getCol(1)); auto sales_2DN = ColRef<int>(len_5Bixfq, server->getCol(1));
auto d_4yS = ColRef<int>(len_5sGusn, server->getCol(2)); auto out_5Ffhnc = new TableInfo<int,double>("out_5Ffhnc");
auto out_kio0QJ = new TableInfo<__int128_t,int,int>("out_kio0QJ"); out_5Ffhnc->get_col<0>().initfrom(mont_1Mz);
out_kio0QJ->get_col<0>().initfrom(sumc_5IN); out_5Ffhnc->get_col<1>() = avgw(3, sales_2DN);
out_kio0QJ->get_col<1>().initfrom(b_79y); print(*out_5Ffhnc);
out_kio0QJ->get_col<2>().initfrom(d_4yS); FILE* fp_57Mh6f = fopen("moving_avg_output.csv", "w");
print(*out_kio0QJ); out_5Ffhnc->printall(";", "\n", nullptr, fp_57Mh6f);
fclose(fp_57Mh6f);
puts("done.");
return 0;
}
__AQEXPORT__(int) dll_frRsQ7(Context* cxt) {
using namespace std;
using namespace types;
auto server = static_cast<Server*>(cxt->alt_server);
auto len_5JydG3 = server->cnt;
auto mont_34m = ColRef<int>(len_5JydG3, server->getCol(0));
auto sales_4wo = ColRef<int>(len_5JydG3, server->getCol(1));
auto out_4Fsh6O = new TableInfo<int,ColRef<int>>("out_4Fsh6O");
decltype(auto) col_4A04Hh = out_4Fsh6O->get_col<0>();
decltype(auto) col_1S5CS2 = out_4Fsh6O->get_col<1>();
auto t4HUjxwQ = mont_34m;
typedef record<decays<decltype(t4HUjxwQ)::value_t>> record_type5PGugsV;
unordered_map<record_type5PGugsV, vector_type<uint32_t>, transTypes<record_type5PGugsV, hasher>> g6HIDqpq;
for (uint32_t ilq = 0; ilq < t4HUjxwQ.size; ++ilq){
g6HIDqpq[forward_as_tuple(t4HUjxwQ[ilq])].emplace_back(ilq);
}
for (auto& i2M : g6HIDqpq) {
auto &key_5JcLJMV = i2M.first;
auto &val_yqDe0lt = i2M.second;
col_4A04Hh.emplace_back(get<0>(key_5JcLJMV));
col_1S5CS2.emplace_back(minw(2, sales_4wo[val_yqDe0lt]));
}
print(*out_4Fsh6O);
FILE* fp_6UC6Yg = fopen("flatten.csv", "w");
out_4Fsh6O->printall(",", "\n", nullptr, fp_6UC6Yg);
fclose(fp_6UC6Yg);
puts("done."); puts("done.");
return 0; return 0;
} }

@ -227,8 +227,8 @@ while test_parser:
cxt = xengine.exec(stmts, cxt, keep) cxt = xengine.exec(stmts, cxt, keep)
if server_mode == RunType.Threaded: if server_mode == RunType.Threaded:
# assignment to avoid auto gc # assignment to avoid auto gc
sqls = [s.strip() for s in cxt.sql.split(';')] # sqls = [s.strip() for s in cxt.sql.split(';')]
qs = [ctypes.c_char_p(bytes(q, 'utf-8')) for q in sqls if len(q)] qs = [ctypes.c_char_p(bytes(q, 'utf-8')) for q in cxt.queries if len(q)]
sz = len(qs) sz = len(qs)
payload = (ctypes.c_char_p*sz)(*qs) payload = (ctypes.c_char_p*sz)(*qs)
try: try:
@ -282,17 +282,18 @@ while test_parser:
tm = time.gmtime() tm = time.gmtime()
fname = f'{tm.tm_year}{tm.tm_mon}_{tm.tm_mday}_{tm.tm_hour}:{tm.tm_min}:{tm.tm_sec}' fname = f'{tm.tm_year}{tm.tm_mon}_{tm.tm_mday}_{tm.tm_hour}:{tm.tm_min}:{tm.tm_sec}'
if cxt: if cxt:
def savefile(attr:str, desc:str): from typing import Optional
def savefile(attr:str, desc:str, ext:Optional[str] = None):
if hasattr(cxt, attr): if hasattr(cxt, attr):
attr : str = getattr(cxt, attr) attr : str = getattr(cxt, attr)
if attr: if attr:
ext = '.' + desc ext = ext if ext else '.' + desc
name = fname if fname.endswith(ext) else fname + ext name = fname if fname.endswith(ext) else fname + ext
with open('saves/' + name, 'wb') as cfile: with open('saves/' + name, 'wb') as cfile:
cfile.write(attr.encode('utf-8')) cfile.write(attr.encode('utf-8'))
print(f'saved {desc} code as {name}') print(f'saved {desc} code as {name}')
savefile('ccode', 'cpp') savefile('ccode', 'cpp')
savefile('udf', 'udf') savefile('udf', 'udf', '.hpp')
savefile('sql', 'sql') savefile('sql', 'sql')
continue continue

@ -25,7 +25,7 @@ def exec(stmts, cxt = None, keep = False):
generate(s, cxt) generate(s, cxt)
else: else:
generate(stmts_stmts, cxt) generate(stmts_stmts, cxt)
cxt.print(cxt.sql) cxt.print(cxt.queries)
return cxt return cxt
__all__ = ["initialize", "generate", "exec", "saved_cxt"] __all__ = ["initialize", "generate", "exec", "saved_cxt"]

@ -35,7 +35,10 @@ class ast_node:
name = 'null' name = 'null'
def init(self, _): def init(self, _):
if self.parent is None:
self.context.sql_begin()
self.add(self.__class__.name.upper()) self.add(self.__class__.name.upper())
def produce(self, _): def produce(self, _):
pass pass
def spawn(self, _): def spawn(self, _):
@ -44,7 +47,7 @@ class ast_node:
def consume(self, _): def consume(self, _):
if self.parent is None: if self.parent is None:
self.emit(self.sql+';\n') self.emit(self.sql+';\n')
self.context.sql_end()
from reconstruct.expr import expr, fastscan from reconstruct.expr import expr, fastscan
@ -54,11 +57,17 @@ class projection(ast_node):
first_order = 'select' first_order = 'select'
def init(self, _): def init(self, _):
# skip default init
pass pass
def produce(self, node): def produce(self, node):
p = node['select'] p = node['select']
self.projections = p if type(p) is list else [p] self.projections = p if type(p) is list else [p]
self.add('SELECT') self.add('SELECT')
if self.parent is None:
self.context.sql_begin()
self.postproc_fname = 'dll_' + base62uuid(6)
self.context.postproc_begin(self.postproc_fname)
def spawn(self, node): def spawn(self, node):
self.datasource = None # datasource is Join instead of TableInfo self.datasource = None # datasource is Join instead of TableInfo
@ -245,6 +254,11 @@ class projection(ast_node):
self.outfile.finalize() self.outfile.finalize()
self.context.emitc(f'puts("done.");') self.context.emitc(f'puts("done.");')
if self.parent is None:
self.context.sql_end()
self.context.postproc_end(self.postproc_fname)
class orderby(ast_node): class orderby(ast_node):
name = 'order by' name = 'order by'
def produce(self, node): def produce(self, node):
@ -314,15 +328,16 @@ class scan(ast_node):
class groupby_c(ast_node): class groupby_c(ast_node):
name = '_groupby' name = '_groupby'
def init(self, _): def init(self, node : List[Tuple[expr, Set[ColRef]]]):
self.proj : projection = self.parent self.proj : projection = self.parent
return super().init(_) self.glist : List[Tuple[expr, Set[ColRef]]] = node
return super().init(node)
def produce(self, node : List[Tuple[expr, Set[ColRef]]]): def produce(self, node : List[Tuple[expr, Set[ColRef]]]):
self.context.headers.add('"./server/hasher.h"') self.context.headers.add('"./server/hasher.h"')
self.context.headers.add('unordered_map') self.context.headers.add('unordered_map')
self.group = 'g' + base62uuid(7) self.group = 'g' + base62uuid(7)
self.group_type = 'record_type' + base62uuid(7) self.group_type = 'record_type' + base62uuid(7)
self.datasource = self.parent.datasource self.datasource = self.proj.datasource
self.scanner = None self.scanner = None
self.datasource.rec = set() self.datasource.rec = set()
@ -330,7 +345,7 @@ class groupby_c(ast_node):
g_contents_list = [] g_contents_list = []
first_col = '' first_col = ''
for g in node: for g in self.glist:
e = expr(self, g[0].node, c_code=True) e = expr(self, g[0].node, c_code=True)
g_str = e.eval(c_code = True, y = lambda c: self.proj.pyname2cname[c]) g_str = e.eval(c_code = True, y = lambda c: self.proj.pyname2cname[c])
# if v is compound expr, create tmp cols # if v is compound expr, create tmp cols
@ -345,7 +360,7 @@ class groupby_c(ast_node):
self.context.emitc(f'typedef record<{",".join(g_contents_decltype)}> {self.group_type};') self.context.emitc(f'typedef record<{",".join(g_contents_decltype)}> {self.group_type};')
self.context.emitc(f'unordered_map<{self.group_type}, vector_type<uint32_t>, ' self.context.emitc(f'unordered_map<{self.group_type}, vector_type<uint32_t>, '
f'transTypes<{self.group_type}, hasher>> {self.group};') f'transTypes<{self.group_type}, hasher>> {self.group};')
self.n_grps = len(node) self.n_grps = len(self.glist)
self.scanner = scan(self, first_col + '.size') self.scanner = scan(self, first_col + '.size')
self.scanner.add(f'{self.group}[forward_as_tuple({g_contents}[{self.scanner.it_ver}])].emplace_back({self.scanner.it_ver});') self.scanner.add(f'{self.group}[forward_as_tuple({g_contents}[{self.scanner.it_ver}])].emplace_back({self.scanner.it_ver});')
@ -373,8 +388,14 @@ class groupby_c(ast_node):
len_var = 'len_'+base62uuid(7) len_var = 'len_'+base62uuid(7)
gscanner.add(f'auto &{len_var} = {val_var}.size;', position = 'front') gscanner.add(f'auto &{len_var} = {val_var}.size;', position = 'front')
def get_key_idx (varname : str):
for i, g in enumerate(self.glist):
if varname == g[0].eval():
return i
return var_table[varname]
def get_var_names (varname : str): def get_var_names (varname : str):
var = var_table[varname] var = get_key_idx(varname)
if type(var) is str: if type(var) is str:
return f'{var}[{val_var}]' return f'{var}[{val_var}]'
else: else:
@ -545,6 +566,8 @@ class create_table(ast_node):
name = 'create_table' name = 'create_table'
first_order = name first_order = name
def init(self, node): def init(self, node):
if self.parent is None:
self.context.sql_begin()
self.sql = 'CREATE TABLE ' self.sql = 'CREATE TABLE '
def produce(self, node): def produce(self, node):
@ -562,6 +585,7 @@ class create_table(ast_node):
class insert(ast_node): class insert(ast_node):
name = 'insert' name = 'insert'
first_order = name first_order = name
def produce(self, node): def produce(self, node):
values = node['query']['select'] values = node['query']['select']
tbl = node['insert'] tbl = node['insert']
@ -586,6 +610,8 @@ class load(ast_node):
self.produce = self.produce_monetdb self.produce = self.produce_monetdb
else: else:
self.produce = self.produce_aq self.produce = self.produce_aq
if self.parent is None:
self.context.sql_begin()
def produce_aq(self, node): def produce_aq(self, node):
node = node['load'] node = node['load']

@ -87,6 +87,8 @@ class Context:
self.finalized = False self.finalized = False
self.udf = None self.udf = None
self.scans = [] self.scans = []
self.procs = []
self.queries = []
def __init__(self): def __init__(self):
self.tables_byname = dict() self.tables_byname = dict()
@ -101,10 +103,8 @@ class Context:
self.has_dll = False self.has_dll = False
self.dialect = 'MonetDB' self.dialect = 'MonetDB'
self.have_hge = False self.have_hge = False
self.Error = lambda *args: print(*args)
self.Info = lambda *_: None self.Info = lambda *_: None
self.Info = lambda *_: None
self.new()
def emit(self, sql:str): def emit(self, sql:str):
self.sql += sql + ' ' self.sql += sql + ' '
@ -118,17 +118,31 @@ class Context:
self.emitc(str_scan) self.emitc(str_scan)
self.scans.remove(scan) self.scans.remove(scan)
function_head = ''' function_deco = '__AQEXPORT__(int) '
extern "C" int __DLLEXPORT__ dllmain(Context* cxt) { function_head = ('(Context* cxt) {\n' +
using namespace std; '\tusing namespace std;\n' +
using namespace types; '\tusing namespace types;\n' +
auto server = static_cast<Server*>(cxt->alt_server); '\tauto server = static_cast<Server*>(cxt->alt_server);\n')
'''
udf_head = ('#pragma once\n' udf_head = ('#pragma once\n'
'#include \"./server/libaquery.h\"\n' '#include \"./server/libaquery.h\"\n'
'#include \"./server/aggregations.h\"\n\n' '#include \"./server/aggregations.h\"\n\n'
) )
def sql_begin(self):
self.sql = ''
def sql_end(self):
self.queries.append('Q' + self.sql)
self.sql = ''
def postproc_begin(self, proc_name: str):
self.ccode = self.function_deco + proc_name + self.function_head
def postproc_end(self, proc_name: str):
self.procs.append(self.ccode + 'return 0;\n}')
self.ccode = ''
self.queries.append('P' + proc_name)
def finalize(self): def finalize(self):
if not self.finalized: if not self.finalized:
headers = '' headers = ''
@ -137,6 +151,6 @@ class Context:
headers += '#include <' + h + '>\n' headers += '#include <' + h + '>\n'
else: else:
headers += '#include ' + h + '\n' headers += '#include ' + h + '\n'
self.ccode = headers + self.function_head + self.ccode + 'return 0;\n}' self.ccode = headers + '\n'.join(self.procs)
self.headers = set() self.headers = set()
return self.ccode return self.ccode

Binary file not shown.

@ -85,17 +85,26 @@ int dll_main(int argc, char** argv, Context* cxt){
while(cfg->running){ while(cfg->running){
if (cfg->new_query) { if (cfg->new_query) {
void *handle = 0;
if (cfg->backend_type == BACKEND_MonetDB){ if (cfg->backend_type == BACKEND_MonetDB){
if (cxt->alt_server == 0) if (cxt->alt_server == 0)
cxt->alt_server = new Server(cxt); cxt->alt_server = new Server(cxt);
Server* server = reinterpret_cast<Server*>(cxt->alt_server); Server* server = reinterpret_cast<Server*>(cxt->alt_server);
if(n_recv > 0){ if(n_recv > 0){
if (cfg->backend_type == BACKEND_AQuery || cfg->has_dll) {
handle = dlopen("./dll.so", RTLD_LAZY);
}
for(int i = 0; i < n_recv; ++i) for(int i = 0; i < n_recv; ++i)
{ {
server->exec(n_recvd[i]); if (n_recvd[i][0] == 'Q'){
server->exec(n_recvd[i] + 1);
printf("Exec Q%d: %s\n", i, n_recvd[i]); printf("Exec Q%d: %s\n", i, n_recvd[i]);
} }
else if (n_recvd[i][0] == 'P' && handle) {
code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, n_recvd[i]+1));
c(cxt);
}
}
n_recv = 0; n_recv = 0;
} }
if(server->last_error == nullptr){ if(server->last_error == nullptr){
@ -108,12 +117,12 @@ int dll_main(int argc, char** argv, Context* cxt){
} }
// puts(cfg->has_dll ? "true" : "false"); // puts(cfg->has_dll ? "true" : "false");
if (cfg->backend_type == BACKEND_AQuery || cfg->has_dll) { if (cfg->backend_type == BACKEND_AQuery) {
void* handle = dlopen("./dll.so", RTLD_LAZY); handle = dlopen("./dll.so", RTLD_LAZY);
code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, "dllmain")); code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, "dllmain"));
c(cxt); c(cxt);
dlclose(handle);
} }
if (handle) dlclose(handle);
cfg->new_query = 0; cfg->new_query = 0;
} }
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));

@ -220,12 +220,13 @@ constexpr size_t sum_type(size_t a[], size_t sz) {
ret += a[i]; ret += a[i];
return ret; return ret;
} }
template<class Types, class ...T1> constexpr size_t sum_type() { template<class Types, class ...T1>
constexpr size_t sum_type() {
size_t t[] = {std::is_same_v<Types, T1> ...}; size_t t[] = {std::is_same_v<Types, T1> ...};
return sum_type(t, sizeof...(T1)); return sum_type(t, sizeof...(T1));
} }
template<class ...T1, class ...Types> constexpr template<class ...T1, class ...Types>
size_t count_type(std::tuple<Types...>* ts) { constexpr size_t count_type(std::tuple<Types...>* ts) {
size_t t[] = {sum_type<Types, T1...>() ...}; size_t t[] = {sum_type<Types, T1...>() ...};
return sum_type(t, sizeof...(Types)); return sum_type(t, sizeof...(Types));
} }

Binary file not shown.

Binary file not shown.

@ -2,17 +2,55 @@
#include "./server/libaquery.h" #include "./server/libaquery.h"
#include "./server/aggregations.h" #include "./server/aggregations.h"
auto covariance = [](auto x, auto y) { auto covariances2 = [](auto x, auto y, auto w, uint32_t _builtin_len, auto& _builtin_ret) {
auto xmean = avg(x); auto xmeans = 0.0;
auto ymean = avg(y); auto ymeans = 0.0;
return avg(((x - xmean) * (y - ymean))); auto l = _builtin_len;
if((l > 0)) {
xmeans = x[0];
ymeans = y[0];
_builtin_ret[0] = 0.0;
}
if((w > l))
w = l;
for(auto i = 1, j = 0; (i < w); i = (i + 1)) {
xmeans += x[i];
ymeans += y[i];
_builtin_ret[i] = avg(((x.subvec(0, i) - (xmeans / i)) * (y.subvec(0, i) - (ymeans / i))));
}
xmeans /= w;
ymeans /= w;
for(auto i = w; (i < l); i += 1) {
xmeans += ((x[i] - x[(i - w)]) / w);
ymeans += ((y[i] - y[(i - w)]) / w);
_builtin_ret[i] = avg(((x.subvec((i - w), i) - xmeans) * (y.subvec((i - w), i) - ymeans)));
}
return ;
}; };
auto sd = [](auto x) { auto covariances2_gettype = [](auto x, auto y, auto w) {
return sqrt(covariance(x, x)); uint32_t _builtin_len = 0;
}; auto xmeans = 0.0;
auto ymeans = 0.0;
auto paircorr = [](auto x, auto y) { auto l = _builtin_len;
return (covariance(x, y) / (sd(x) * sd(y))); if((l > 0)) {
xmeans = x[0];
ymeans = y[0];
return 0.0;
}
if((w > l))
w = l;
for(auto i = 1, j = 0; (i < w); i = (i + 1)) {
xmeans += x[i];
ymeans += y[i];
return avg(((x.subvec(0, i) - (xmeans / i)) * (y.subvec(0, i) - (ymeans / i))));
}
xmeans /= w;
ymeans /= w;
for(auto i = w; (i < l); i += 1) {
xmeans += ((x[i] - x[(i - w)]) / w);
ymeans += ((y[i] - y[(i - w)]) / w);
return avg(((x.subvec((i - w), i) - xmeans) * (y.subvec((i - w), i) - ymeans)));
}
}; };

Loading…
Cancel
Save