From 2614d010daa59c866175e346d94aecd119ed3c4f Mon Sep 17 00:00:00 2001 From: Bill Date: Mon, 19 Sep 2022 17:22:46 +0800 Subject: [PATCH] imporved build driver, basic support for count() --- build.py | 57 ++++++++++++++++++++++++------------------ engine/types.py | 12 +++++++-- prompt.py | 4 +++ reconstruct/ast.py | 12 ++++++--- reconstruct/expr.py | 43 +++++++++++++++++++++++++++---- server/aggregations.h | 4 +-- server/server.cpp | 12 ++++----- test.aquery | Bin 459 -> 511 bytes tests/network.a | 2 +- 9 files changed, 102 insertions(+), 44 deletions(-) diff --git a/build.py b/build.py index b6d4006..3312a0b 100644 --- a/build.py +++ b/build.py @@ -21,7 +21,10 @@ class checksums: server = 'server.so' ): from platform import machine - self.env = aquery_config.os_platform + machine() + aquery_config.build_driver + self.env = (aquery_config.os_platform + + machine() + + aquery_config.build_driver + ) for key in self.__dict__.keys(): try: with open(eval(key), 'rb') as file: @@ -37,12 +40,13 @@ class checksums: except FileNotFoundError: print('missing component: ' + key) self.sources[key] = None + def __ne__(self, __o: 'checksums') -> 'checksums': ret = checksums() for key in self.__dict__.keys(): try: ret.__dict__[key] = ( - self.__dict__[key] and __o.__dict__[key] and + not (self.__dict__[key] and __o.__dict__[key]) or self.__dict__[key] != __o.__dict__[key] ) except KeyError: @@ -54,7 +58,7 @@ class checksums: for key in self.__dict__.keys(): try: ret.__dict__[key] = ( - not (self.__dict__[key] and __o.__dict__[key]) or + self.__dict__[key] and __o.__dict__[key] and self.__dict__[key] == __o.__dict__[key] ) except KeyError: @@ -82,16 +86,19 @@ class build_manager: self.mgr = mgr self.build_cmd = [] def libaquery_a(self) : - pass + return False def pch(self): - pass + return False def build(self, stdout = sys.stdout, stderr = sys.stderr): + ret = True for c in self.build_cmd: if c: try: - subprocess.call(c, stdout = stdout, stderr = stderr) + ret = subprocess.call(c, stdout = stdout, stderr = stderr) and ret except (FileNotFoundError): + ret = False pass + return ret class MakefileDriver(DriverBase): def __init__(self, mgr : 'build_manager') -> None: @@ -132,7 +139,7 @@ class build_manager: loc = os.path.abspath('./msc-plugin/libaquery.vcxproj') self.get_flags() self.build_cmd = [['del', 'libaquery.lib'], [aquery_config.msbuildroot, loc, self.opt, self.platform]] - self.build() + return self.build() def pch(self): pass @@ -141,24 +148,17 @@ class build_manager: loc = os.path.abspath('./msc-plugin/server.vcxproj') self.get_flags() self.build_cmd = [['del', 'server.so'], [aquery_config.msbuildroot, loc, self.opt, self.platform]] - self.build() + return self.build() def snippet(self): loc = os.path.abspath('./msc-plugin/msc-plugin.vcxproj') self.get_flags() self.build_cmd = [[aquery_config.msbuildroot, loc, self.opt, self.platform]] - self.build() + return self.build() #class PythonDriver(DriverBase): # def __init__(self, mgr : 'build_manager') -> None: - # super().__init__(mgr) - - #@property - #def MSBuild(self): - # return MSBuildDriver(self) - #@property - #def Makefile(self): - # return MakefileDriver(self) + # super().__init__(mgr) def __init__(self) -> None: self.method = 'make' @@ -181,7 +181,9 @@ class build_manager: def build_caches(self, force = False): cached = checksums() current = checksums() - libaquery_a = 'libaquery.lib' if aquery_config.os_platform else 'libaquery.a' + libaquery_a = 'libaquery.a' + if aquery_config.os_platform == 'win': + libaquery_a = 'libaquery.lib' current.calc(libaquery_a) try: with open('.cached', 'rb') as cache_sig: @@ -190,18 +192,25 @@ class build_manager: pass self.cache_status = current != cached + success = True if force or self.cache_status.sources: self.driver.pch() self.driver.libaquery_a() self.driver.server() else: if self.cache_status.libaquery_a: - self.driver.libaquery_a() + success = self.driver.libaquery_a() and success if self.cache_status.pch_hpp_gch: - self.driver.pch() + success = self.driver.pch() and success if self.cache_status.server: - self.driver.server() - current.calc(libaquery_a) - with open('.cached', 'wb') as cache_sig: - cache_sig.write(pickle.dumps(current)) + success = self.driver.server() and success + if success: + current.calc(libaquery_a) + with open('.cached', 'wb') as cache_sig: + cache_sig.write(pickle.dumps(current)) + else: + try: + os.remove('./.cached') + except: + pass diff --git a/engine/types.py b/engine/types.py index 477934d..d99fc56 100644 --- a/engine/types.py +++ b/engine/types.py @@ -1,5 +1,5 @@ from copy import deepcopy -from engine.utils import defval +from engine.utils import base62uuid, defval from aquery_config import have_hge from typing import Dict, List @@ -244,6 +244,14 @@ def fn_behavior(op:OperatorBase, c_code, *x): name = op.cname if c_code else op.sqlname return f'{name}({", ".join([f"{xx}" for xx in x])})' +def count_behavior(op:OperatorBase, c_code, x, distinct = False): + if not c_code: + return f'{op.sqlname}({"distinct " if distinct else ""}{x})' + elif distinct: + return 'count_'+base62uuid() + else: + return '{count()}' + def windowed_fn_behavor(op: OperatorBase, c_code, *x): if not c_code: return f'{op.sqlname}({", ".join([f"{xx}" for xx in x])})' @@ -282,7 +290,7 @@ fnmaxs = OperatorBase('maxs', [1, 2], ty_clamp(as_is, -1), cname = 'maxs', sqlna fnmins = OperatorBase('mins', [1, 2], ty_clamp(as_is, -1), cname = 'mins', sqlname = 'MINS', call = windowed_fn_behavor) fnsums = OperatorBase('sums', [1, 2], ext(ty_clamp(auto_extension, -1)), cname = 'sums', sqlname = 'SUMS', call = windowed_fn_behavor) fnavgs = OperatorBase('avgs', [1, 2], fp(ext(ty_clamp(auto_extension, -1))), cname = 'avgs', sqlname = 'AVGS', call = windowed_fn_behavor) -fncnt = OperatorBase('count', 1, int_return, cname = 'count', sqlname = 'COUNT', call = fn_behavior) +fncnt = OperatorBase('count', 1, int_return, cname = 'count', sqlname = 'COUNT', call = count_behavior) # special def is_null_call_behavior(op:OperatorBase, c_code : bool, x : str): if c_code : diff --git a/prompt.py b/prompt.py index aadbeb7..0f59d33 100644 --- a/prompt.py +++ b/prompt.py @@ -479,6 +479,10 @@ def prompt(running = lambda:True, next = lambda:input('> '), state = None): print(e) continue except (ValueError, FileNotFoundError) as e: + try: + os.remove('./cached') + except: + pass print(e) except (KeyboardInterrupt): break diff --git a/reconstruct/ast.py b/reconstruct/ast.py index 7d5b0c8..a549b03 100644 --- a/reconstruct/ast.py +++ b/reconstruct/ast.py @@ -223,12 +223,15 @@ class projection(ast_node): if type(val[1]) is str: x = True y = lambda t: self.pyname2cname[t] - val[1] = val[2].eval(x, y, gettype=True) + count = lambda : '0' + if vid2cname: + count = lambda : f'{vid2cname[0]}.size' + val[1] = val[2].eval(x, y, count=count) if callable(val[1]): - val[1] = val[1](True) - decltypestring = val[1] + val[1] = val[1](False) if val[0] == LazyT: + decltypestring = val[2].eval(x,y,gettype=True)(True) decltypestring = f'value_type>' out_typenames[key] = decltypestring else: @@ -461,7 +464,8 @@ class groupby_c(ast_node): return get_var_names(sql_code) else: return varex.eval(c_code=True, y = get_var_names, - materialize_builtin = materialize_builtin) + materialize_builtin = materialize_builtin, + count=lambda:f'{val_var}.size') for ce in cexprs: ex = ce[1] diff --git a/reconstruct/expr.py b/reconstruct/expr.py index b636667..ee9fcac 100644 --- a/reconstruct/expr.py +++ b/reconstruct/expr.py @@ -26,7 +26,13 @@ class expr(ast_node): def __init__(self, parent, node, *, c_code = None, supress_undefined = False): from reconstruct.ast import projection, udf - + # gen2 expr have multi-passes + # first pass parse json into expr tree + # generate target code in later passes upon need + self.children = [] + self.opname = '' + self.curr_code = '' + self.counts = {} self.type = None self.raw_col = None self.udf : Optional[udf] = None @@ -93,9 +99,15 @@ class expr(ast_node): self.is_agg_func = True op = self.operators[key] - + count_distinct = False + if key == 'count' and type(val) is dict and 'distinct' in val: + count_distinct = True + val = val['distinct'] val = enlist(val) exp_vals = [expr(self, v, c_code = self.c_code) for v in val] + self.children = exp_vals + self.opname = key + str_vals = [e.sql for e in exp_vals] type_vals = [e.type for e in exp_vals] is_compound = any([e.is_compound for e in exp_vals]) @@ -112,7 +124,11 @@ class expr(ast_node): pass self.type = AnyT - self.sql = op(self.c_code, *str_vals) + if count_distinct: # inject distinct col later + self.sql = f'{{{op(self.c_code, *str_vals, True)}}}' + else: + self.sql = op(self.c_code, *str_vals) + special_func = [*self.context.udf_map.keys(), *self.context.module_map.keys(), "maxs", "mins", "avgs", "sums", "deltas"] if self.context.special_gb: @@ -218,19 +234,23 @@ class expr(ast_node): self.sql = self.raw_col.name self.type = self.raw_col.type self.is_compound = True + self.opname = self.raw_col else: self.sql = node self.type = StrT + self.opname = node if self.c_code and self.datasource is not None: self.sql = f'{{y(\"{self.sql}\")}}' elif type(node) is bool: self.type = BoolT + self.opname = node if self.c_code: self.sql = '1' if node else '0' else: self.sql = 'TRUE' if node else 'FALSE' else: self.sql = f'{node}' + self.opname = node if type(node) is int: if (node >= 2**63 - 1 or node <= -2**63): self.type = LongT @@ -252,6 +272,12 @@ class expr(ast_node): self.codebuf += c.finalize(override=override) return self.codebuf + def codegen(self, delegate): + self.curr_code = '' + for c in self.children: + self.curr_code += c.codegen(delegate) + return self.curr_code + def __str__(self): return self.sql def __repr__(self): @@ -259,10 +285,17 @@ class expr(ast_node): # builtins is readonly, so it's okay to set default value as an object # eval is only called at root expr. - def eval(self, c_code = None, y = lambda t: t, materialize_builtin = False, _decltypestr = False, *, gettype = False): + def eval(self, c_code = None, y = lambda t: t, + materialize_builtin = False, _decltypestr = False, + count = lambda : 'count', var_inject = None, + *, + gettype = False): assert(self.is_root) def call(decltypestr = False) -> str: - nonlocal c_code, y, materialize_builtin + nonlocal c_code, y, materialize_builtin, count, var_inject + if var_inject: + for k, v in var_inject.items(): + locals()[k] = v if self.udf_called is not None: loc = locals() builtin_vars = self.udf_called.builtin_used diff --git a/server/aggregations.h b/server/aggregations.h index 0e41fc9..2add603 100644 --- a/server/aggregations.h +++ b/server/aggregations.h @@ -16,8 +16,8 @@ constexpr static inline size_t count(const T&) { return 1; } // TODO: Specializations for dt/str/none template class VT> -// types::GetLongType -LL_Type sum(const VT& v) { +types::GetLongType +sum(const VT& v) { types::GetLongType ret = 0; for (const auto& _v : v) ret += _v; diff --git a/server/server.cpp b/server/server.cpp index d128abc..5fec1d8 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -100,10 +100,10 @@ void Context::end_session(){ void* Context::get_module_function(const char* fname){ auto fmap = static_cast*> (this->module_function_maps); - printf("%p\n", fmap->find("mydiv")->second); - for (const auto& [key, value] : *fmap){ - printf("%s %p\n", key.c_str(), value); - } + // printf("%p\n", fmap->find("mydiv")->second); + // for (const auto& [key, value] : *fmap){ + // printf("%s %p\n", key.c_str(), value); + // } auto ret = fmap->find(fname); return ret == fmap->end() ? nullptr : ret->second; } @@ -188,9 +188,9 @@ int dll_main(int argc, char** argv, Context* cxt){ case 'F': // Register Function in Module { auto fname = n_recvd[i] + 1; - printf("F:: %s: %p, %p\n", fname, user_module_handle, dlsym(user_module_handle, fname)); + //printf("F:: %s: %p, %p\n", fname, user_module_handle, dlsym(user_module_handle, fname)); module_fn_map->insert_or_assign(fname, dlsym(user_module_handle, fname)); - printf("F::: %p\n", module_fn_map->find("mydiv") != module_fn_map->end() ? module_fn_map->find("mydiv")->second : nullptr); + //printf("F::: %p\n", module_fn_map->find("mydiv") != module_fn_map->end() ? module_fn_map->find("mydiv")->second : nullptr); } break; case 'U': // Unload Module diff --git a/test.aquery b/test.aquery index 84e4beaa465305d9eb1b2b9d6695c487a2ca289d..f9cd33c47647f494ffedf5a6104d1185297de7df 100644 GIT binary patch delta 55 zcmX@j{GWNlF~*7Wcy$tU3vyBw0!veiGE<9rxzZH!QcKG7i?a0+dATZ5D^imcc)57F KQY$h`c)0*Q9}-3Y delta 13 Ucmey*e42T~F-F#k%o1KM04Mwexc~qF diff --git a/tests/network.a b/tests/network.a index 78ecc48..922fec4 100644 --- a/tests/network.a +++ b/tests/network.a @@ -5,7 +5,7 @@ LOAD DATA INFILE "data/network.csv" INTO TABLE network FIELDS TERMINATED BY "," -SELECT src, dst, avg(len) +SELECT src, dst, count(*), avg(len) FROM network ASSUMING ASC src, ASC dst, ASC _time GROUP BY src, dst, sums (deltas(_time) > 120)