diff --git a/moving_avg.a b/moving_avg.a
index 1659cb7..0c87841 100644
--- a/moving_avg.a
+++ b/moving_avg.a
@@ -10,5 +10,5 @@ FROM sale
 INTO OUTFILE "moving_avg_output.csv"
 FIELDS TERMINATED BY ";"
 
---- select Mont, mins(2,sales) from sale assuming desc Mont group by sales
--- into outfile "flatten.csv"
\ No newline at end of file
+select Mont, mins(2,sales) from sale assuming desc Mont group by Mont
+into outfile "flatten.csv"
\ No newline at end of file
diff --git a/out.cpp b/out.cpp
index 28eb44b..1320768 100644
--- a/out.cpp
+++ b/out.cpp
@@ -1,19 +1,53 @@
 #include "./server/libaquery.h"
+#include <unordered_map>
 #include "./server/monetdb_conn.h"
+#include "./server/hasher.h"
+#include "./server/aggregations.h"
 
+__AQEXPORT__(int) dll_3V1c8v(Context* cxt) {
+    using namespace std;
+    using namespace types;
+    auto server = static_cast<Server*>(cxt->alt_server);
+auto len_5Bixfq = server->cnt;
+auto mont_1Mz = ColRef(len_5Bixfq, server->getCol(0));
+auto sales_2DN = ColRef(len_5Bixfq, server->getCol(1));
+auto out_5Ffhnc = new TableInfo("out_5Ffhnc");
+out_5Ffhnc->get_col<0>().initfrom(mont_1Mz);
+out_5Ffhnc->get_col<1>() = avgw(3, sales_2DN);
+print(*out_5Ffhnc);
+FILE* fp_57Mh6f = fopen("moving_avg_output.csv", "w");
+out_5Ffhnc->printall(";", "\n", nullptr, fp_57Mh6f);
+fclose(fp_57Mh6f);
+puts("done.");
+return 0;
+}
+__AQEXPORT__(int) dll_frRsQ7(Context* cxt) {
+    using namespace std;
+    using namespace types;
+    auto server = static_cast<Server*>(cxt->alt_server);
+auto len_5JydG3 = server->cnt;
+auto mont_34m = ColRef(len_5JydG3, server->getCol(0));
+auto sales_4wo = ColRef(len_5JydG3, server->getCol(1));
+auto out_4Fsh6O = new TableInfo>("out_4Fsh6O");
+decltype(auto) col_4A04Hh = out_4Fsh6O->get_col<0>();
+decltype(auto) col_1S5CS2 = out_4Fsh6O->get_col<1>();
+auto t4HUjxwQ = mont_34m;
+typedef record> record_type5PGugsV;
+unordered_map, transTypes> g6HIDqpq;
+for (uint32_t ilq = 0; ilq < t4HUjxwQ.size; ++ilq){
+g6HIDqpq[forward_as_tuple(t4HUjxwQ[ilq])].emplace_back(ilq);
+}
+for (auto& i2M : g6HIDqpq) {
+auto &key_5JcLJMV = i2M.first;
+auto &val_yqDe0lt = i2M.second;
+col_4A04Hh.emplace_back(get<0>(key_5JcLJMV));
+
+col_1S5CS2.emplace_back(minw(2, sales_4wo[val_yqDe0lt]));
-    extern "C" int __DLLEXPORT__ dllmain(Context* cxt) {
-        using namespace std;
-        using namespace types;
-        auto server = static_cast<Server*>(cxt->alt_server);
-    auto len_5sGusn = server->cnt;
-auto sumc_5IN = ColRef<__int128_t>(len_5sGusn, server->getCol(0));
-auto b_79y = ColRef<int>(len_5sGusn, server->getCol(1));
-auto d_4yS = ColRef<int>(len_5sGusn, server->getCol(2));
-auto out_kio0QJ = new TableInfo<__int128_t,int,int>("out_kio0QJ");
-out_kio0QJ->get_col<0>().initfrom(sumc_5IN);
-out_kio0QJ->get_col<1>().initfrom(b_79y);
-out_kio0QJ->get_col<2>().initfrom(d_4yS);
-print(*out_kio0QJ);
+}
+print(*out_4Fsh6O);
+FILE* fp_6UC6Yg = fopen("flatten.csv", "w");
+out_4Fsh6O->printall(",", "\n", nullptr, fp_6UC6Yg);
+fclose(fp_6UC6Yg);
 puts("done.");
 return 0;
 }
\ No newline at end of file
diff --git a/prompt.py b/prompt.py
index e17c71f..8d7a03b 100644
--- a/prompt.py
+++ b/prompt.py
@@ -227,8 +227,8 @@ while test_parser:
             cxt = xengine.exec(stmts, cxt, keep)
             if server_mode == RunType.Threaded:
                 # assignment to avoid auto gc
-                sqls = [s.strip() for s in cxt.sql.split(';')]
-                qs = [ctypes.c_char_p(bytes(q, 'utf-8')) for q in sqls if len(q)]
+                # sqls = [s.strip() for s in cxt.sql.split(';')]
+                qs = [ctypes.c_char_p(bytes(q, 'utf-8')) for q in cxt.queries if len(q)]
                 sz = len(qs)
                 payload = (ctypes.c_char_p*sz)(*qs)
                 try:
@@ -282,17 +282,18 @@ while test_parser:
         tm = time.gmtime()
         fname = f'{tm.tm_year}{tm.tm_mon}_{tm.tm_mday}_{tm.tm_hour}:{tm.tm_min}:{tm.tm_sec}'
         if cxt:
-            def savefile(attr:str, desc:str):
+            from typing import Optional
+            def savefile(attr:str, desc:str, ext:Optional[str] = None):
                 if hasattr(cxt, attr):
                     attr : str = getattr(cxt, attr)
                     if attr:
-                        ext = '.' + desc
+                        ext = ext if ext else '.' + desc
                         name = fname if fname.endswith(ext) else fname + ext
                         with open('saves/' + name, 'wb') as cfile:
                             cfile.write(attr.encode('utf-8'))
                             print(f'saved {desc} code as {name}')
             savefile('ccode', 'cpp')
-            savefile('udf', 'udf')
+            savefile('udf', 'udf', '.hpp')
             savefile('sql', 'sql')
 
         continue
diff --git a/reconstruct/__init__.py b/reconstruct/__init__.py
index c3b1c2c..475e503 100644
--- a/reconstruct/__init__.py
+++ b/reconstruct/__init__.py
@@ -25,7 +25,7 @@ def exec(stmts, cxt = None, keep = False):
             generate(s, cxt)
     else:
         generate(stmts_stmts, cxt)
-    cxt.print(cxt.sql)
+    cxt.print(cxt.queries)
     return cxt
 
 __all__ = ["initialize", "generate", "exec", "saved_cxt"]
diff --git a/reconstruct/ast.py b/reconstruct/ast.py
index 5c04d14..5b248e6 100644
--- a/reconstruct/ast.py
+++ b/reconstruct/ast.py
@@ -35,7 +35,10 @@ class ast_node:
     name = 'null'
     
     def init(self, _):
+        if self.parent is None:
+            self.context.sql_begin()
         self.add(self.__class__.name.upper())
+        
     def produce(self, _):
         pass
     def spawn(self, _):
@@ -44,7 +47,7 @@ class ast_node:
     def consume(self, _):
         if self.parent is None:
             self.emit(self.sql+';\n')
-            
+            self.context.sql_end()
 
 
 from reconstruct.expr import expr, fastscan
@@ -54,11 +57,17 @@ class projection(ast_node):
     first_order = 'select'
    
     def init(self, _):
+        # skip default init
         pass
+    
     def produce(self, node):
         p = node['select']
         self.projections = p if type(p) is list else [p]
         self.add('SELECT')
+        if self.parent is None:
+            self.context.sql_begin()
+            self.postproc_fname = 'dll_' + base62uuid(6)
+            self.context.postproc_begin(self.postproc_fname)
     
     def spawn(self, node):
         self.datasource = None # datasource is Join instead of TableInfo
@@ -245,6 +254,11 @@ class projection(ast_node):
             self.outfile.finalize()
        
         self.context.emitc(f'puts("done.");')
+        if self.parent is None:
+            self.context.sql_end()
+            self.context.postproc_end(self.postproc_fname)
+        
+        
 class orderby(ast_node):
     name = 'order by'
     def produce(self, node):
@@ -314,15 +328,16 @@ class scan(ast_node):
 
 class groupby_c(ast_node):
     name = '_groupby'
-    def init(self, _):
+    def init(self, node : List[Tuple[expr, Set[ColRef]]]):
         self.proj : projection = self.parent
-        return super().init(_)
+        self.glist : List[Tuple[expr, Set[ColRef]]] = node
+        return super().init(node)
     def produce(self, node : List[Tuple[expr, Set[ColRef]]]):
         self.context.headers.add('"./server/hasher.h"')
         self.context.headers.add('unordered_map')
         self.group = 'g' + base62uuid(7)
         self.group_type = 'record_type' + base62uuid(7)
-        self.datasource = self.parent.datasource
+        self.datasource = self.proj.datasource
         self.scanner = None
         self.datasource.rec = set()
        
@@ -330,7 +345,7 @@ class groupby_c(ast_node):
         g_contents_list = []
         first_col = ''
        
-        for g in node:
+        for g in self.glist:
             e = expr(self, g[0].node, c_code=True)
             g_str = e.eval(c_code = True, y = lambda c: self.proj.pyname2cname[c])
             # if v is compound expr, create tmp cols
@@ -345,7 +360,7 @@ class groupby_c(ast_node):
         self.context.emitc(f'typedef record<{",".join(g_contents_decltype)}> {self.group_type};')
         self.context.emitc(f'unordered_map<{self.group_type}, vector_type, '
                            f'transTypes<{self.group_type}, hasher>> {self.group};')
-        self.n_grps = len(node)
+        self.n_grps = len(self.glist)
         self.scanner = scan(self, first_col + '.size')
         self.scanner.add(f'{self.group}[forward_as_tuple({g_contents}[{self.scanner.it_ver}])].emplace_back({self.scanner.it_ver});')
        
@@ -372,9 +387,15 @@ class groupby_c(ast_node):
             if len_var is None:
                 len_var = 'len_'+base62uuid(7)
                 gscanner.add(f'auto &{len_var} = {val_var}.size;', position = 'front')
-            
+        
+        def get_key_idx (varname : str):
+            for i, g in enumerate(self.glist):
+                if varname == g[0].eval():
+                    return i
+            return var_table[varname]
+        
         def get_var_names (varname : str):
-            var = var_table[varname]
+            var = get_key_idx(varname)
             if type(var) is str:
                 return f'{var}[{val_var}]'
             else:
@@ -545,6 +566,8 @@ class create_table(ast_node):
     name = 'create_table'
     first_order = name
     def init(self, node):
+        if self.parent is None:
+            self.context.sql_begin()
         self.sql = 'CREATE TABLE '
    
     def produce(self, node):
@@ -558,10 +581,11 @@ class create_table(ast_node):
         self.sql += ')'
         if self.context.use_columnstore:
             self.sql += ' engine=ColumnStore'
-        
+
 class insert(ast_node):
     name = 'insert'
     first_order = name
+    
     def produce(self, node):
         values = node['query']['select']
         tbl = node['insert']
@@ -586,6 +610,8 @@ class load(ast_node):
             self.produce = self.produce_monetdb
         else:
             self.produce = self.produce_aq
+        if self.parent is None:
+            self.context.sql_begin()
    
     def produce_aq(self, node):
         node = node['load']
@@ -916,4 +942,4 @@ def include(objs):
 
 
 import sys
-include(sys.modules[__name__])
\ No newline at end of file
+include(sys.modules[__name__])
diff --git a/reconstruct/storage.py b/reconstruct/storage.py
index dde8773..2c93f9e 100644
--- a/reconstruct/storage.py
+++ b/reconstruct/storage.py
@@ -87,7 +87,9 @@ class Context:
         self.finalized = False
         self.udf = None
         self.scans = []
-        
+        self.procs = []
+        self.queries = []
+        
     def __init__(self):
         self.tables_byname = dict()
         self.col_byname = dict()
@@ -101,11 +103,9 @@ class Context:
         self.has_dll = False
         self.dialect = 'MonetDB'
         self.have_hge = False
+        self.Error = lambda *args: print(*args)
         self.Info = lambda *_: None
-        self.Info = lambda *_: None
-        
-        self.new()
        
     def emit(self, sql:str):
         self.sql += sql + ' '
     def emitc(self, c:str):
@@ -118,17 +118,31 @@ class Context:
             self.emitc(str_scan)
             self.scans.remove(scan)
    
-    function_head = '''
-    extern "C" int __DLLEXPORT__ dllmain(Context* cxt) {
-        using namespace std;
-        using namespace types;
-        auto server = static_cast<Server*>(cxt->alt_server);
-    '''
+    function_deco = '__AQEXPORT__(int) '
+    function_head = ('(Context* cxt) {\n' +
+                     '\tusing namespace std;\n' +
+                     '\tusing namespace types;\n' +
+                     '\tauto server = static_cast<Server*>(cxt->alt_server);\n')
+    
     udf_head = ('#pragma once\n'
                 '#include \"./server/libaquery.h\"\n'
                 '#include \"./server/aggregations.h\"\n\n'
                 )
+    def sql_begin(self):
+        self.sql = ''
+
+    def sql_end(self):
+        self.queries.append('Q' + self.sql)
+        self.sql = ''
+    def postproc_begin(self, proc_name: str):
+        self.ccode = self.function_deco + proc_name + self.function_head
+
+    def postproc_end(self, proc_name: str):
+        self.procs.append(self.ccode + 'return 0;\n}')
+        self.ccode = ''
+        self.queries.append('P' + proc_name)
+
     def finalize(self):
         if not self.finalized:
             headers = ''
@@ -137,6 +151,6 @@ class Context:
                     headers += '#include <' + h + '>\n'
                 else:
                     headers += '#include ' + h + '\n'
-            self.ccode = headers + self.function_head + self.ccode + 'return 0;\n}'
+            self.ccode = headers + '\n'.join(self.procs)
             self.headers = set()
         return self.ccode
\ No newline at end of file
diff --git a/server.exe b/server.exe
deleted file mode 100644
index 8658fd9..0000000
Binary files a/server.exe and /dev/null differ
diff --git a/server/server.cpp b/server/server.cpp
index 8071c86..2f00a7d 100644
--- a/server/server.cpp
+++ b/server/server.cpp
@@ -85,16 +85,25 @@ int dll_main(int argc, char** argv, Context* cxt){
 
     while(cfg->running){
         if (cfg->new_query) {
-            
+            void *handle = 0;
             if (cfg->backend_type == BACKEND_MonetDB){
                 if (cxt->alt_server == 0)
                     cxt->alt_server = new Server(cxt);
                 Server* server = reinterpret_cast<Server*>(cxt->alt_server);
                 if(n_recv > 0){
+                    if (cfg->backend_type == BACKEND_AQuery || cfg->has_dll) {
+                        handle = dlopen("./dll.so", RTLD_LAZY);
+                    }
                     for(int i = 0; i < n_recv; ++i)
                     {
-                        server->exec(n_recvd[i]);
-                        printf("Exec Q%d: %s\n", i, n_recvd[i]);
+                        if (n_recvd[i][0] == 'Q'){
+                            server->exec(n_recvd[i] + 1);
+                            printf("Exec Q%d: %s\n", i, n_recvd[i]);
+                        }
+                        else if (n_recvd[i][0] == 'P' && handle) {
+                            code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, n_recvd[i]+1));
+                            c(cxt);
+                        }
                     }
                     n_recv = 0;
                 }
@@ -108,12 +117,12 @@ int dll_main(int argc, char** argv, Context* cxt){
             }
 
             // puts(cfg->has_dll ? "true" : "false");
-            if (cfg->backend_type == BACKEND_AQuery || cfg->has_dll) {
-                void* handle = dlopen("./dll.so", RTLD_LAZY);
+            if (cfg->backend_type == BACKEND_AQuery) {
+                handle = dlopen("./dll.so", RTLD_LAZY);
                 code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, "dllmain"));
                 c(cxt);
-                dlclose(handle);
             }
+            if (handle) dlclose(handle);
             cfg->new_query = 0;
         }
         std::this_thread::sleep_for(std::chrono::milliseconds(100));
diff --git a/server/types.h b/server/types.h
index 5c55030..0f5bb97 100644
--- a/server/types.h
+++ b/server/types.h
@@ -220,12 +220,13 @@ constexpr size_t sum_type(size_t a[], size_t sz) {
         ret += a[i];
     return ret;
 }
-template constexpr size_t sum_type() {
+template 
+constexpr size_t sum_type() {
     size_t t[] = {std::is_same_v ...};
     return sum_type(t, sizeof...(T1));
 }
-template constexpr
-size_t count_type(std::tuple* ts) {
+template 
+constexpr size_t count_type(std::tuple* ts) {
     size_t t[] = {sum_type() ...};
     return sum_type(t, sizeof...(Types));
 }
diff --git a/server2.exp b/server2.exp
deleted file mode 100644
index d6bdad4..0000000
Binary files a/server2.exp and /dev/null differ
diff --git a/server2.lib b/server2.lib
deleted file mode 100644
index cd028f3..0000000
Binary files a/server2.lib and /dev/null differ
diff --git a/udf.hpp b/udf.hpp
index a6d25af..271299d 100644
--- a/udf.hpp
+++ b/udf.hpp
@@ -2,17 +2,55 @@
 #include "./server/libaquery.h"
 #include "./server/aggregations.h"
 
-auto covariance = [](auto x, auto y) {
-    auto xmean = avg(x);
-    auto ymean = avg(y);
-    return avg(((x - xmean) * (y - ymean)));
+auto covariances2 = [](auto x, auto y, auto w, uint32_t _builtin_len, auto& _builtin_ret) {
+    auto xmeans = 0.0;
+    auto ymeans = 0.0;
+    auto l = _builtin_len;
+    if((l > 0)) {
+        xmeans = x[0];
+        ymeans = y[0];
+        _builtin_ret[0] = 0.0;
+    }
+    if((w > l))
+        w = l;
+    for(auto i = 1, j = 0; (i < w); i = (i + 1)) {
+        xmeans += x[i];
+        ymeans += y[i];
+        _builtin_ret[i] = avg(((x.subvec(0, i) - (xmeans / i)) * (y.subvec(0, i) - (ymeans / i))));
+    }
+    xmeans /= w;
+    ymeans /= w;
+    for(auto i = w; (i < l); i += 1) {
+        xmeans += ((x[i] - x[(i - w)]) / w);
+        ymeans += ((y[i] - y[(i - w)]) / w);
+        _builtin_ret[i] = avg(((x.subvec((i - w), i) - xmeans) * (y.subvec((i - w), i) - ymeans)));
+    }
+    return ;
 };
-auto sd = [](auto x) {
-    return sqrt(covariance(x, x));
-};
-
-auto paircorr = [](auto x, auto y) {
-    return (covariance(x, y) / (sd(x) * sd(y)));
+auto covariances2_gettype = [](auto x, auto y, auto w) {
+    uint32_t _builtin_len = 0;
+    auto xmeans = 0.0;
+    auto ymeans = 0.0;
+    auto l = _builtin_len;
+    if((l > 0)) {
+        xmeans = x[0];
+        ymeans = y[0];
+        return 0.0;
+    }
+    if((w > l))
+        w = l;
+    for(auto i = 1, j = 0; (i < w); i = (i + 1)) {
+        xmeans += x[i];
+        ymeans += y[i];
+        return avg(((x.subvec(0, i) - (xmeans / i)) * (y.subvec(0, i) - (ymeans / i))));
+    }
+    xmeans /= w;
+    ymeans /= w;
+    for(auto i = w; (i < l); i += 1) {
+        xmeans += ((x[i] - x[(i - w)]) / w);
+        ymeans += ((y[i] - y[(i - w)]) / w);
+        return avg(((x.subvec((i - w), i) - xmeans) * (y.subvec((i - w), i) - ymeans)));
+    }
 };