improved build driver, basic support for count()

dev
Bill 2 years ago
parent 613941ce06
commit 2614d010da

@ -21,7 +21,10 @@ class checksums:
server = 'server.so' server = 'server.so'
): ):
from platform import machine from platform import machine
self.env = aquery_config.os_platform + machine() + aquery_config.build_driver self.env = (aquery_config.os_platform +
machine() +
aquery_config.build_driver
)
for key in self.__dict__.keys(): for key in self.__dict__.keys():
try: try:
with open(eval(key), 'rb') as file: with open(eval(key), 'rb') as file:
@ -37,12 +40,13 @@ class checksums:
except FileNotFoundError: except FileNotFoundError:
print('missing component: ' + key) print('missing component: ' + key)
self.sources[key] = None self.sources[key] = None
def __ne__(self, __o: 'checksums') -> 'checksums': def __ne__(self, __o: 'checksums') -> 'checksums':
ret = checksums() ret = checksums()
for key in self.__dict__.keys(): for key in self.__dict__.keys():
try: try:
ret.__dict__[key] = ( ret.__dict__[key] = (
self.__dict__[key] and __o.__dict__[key] and not (self.__dict__[key] and __o.__dict__[key]) or
self.__dict__[key] != __o.__dict__[key] self.__dict__[key] != __o.__dict__[key]
) )
except KeyError: except KeyError:
@ -54,7 +58,7 @@ class checksums:
for key in self.__dict__.keys(): for key in self.__dict__.keys():
try: try:
ret.__dict__[key] = ( ret.__dict__[key] = (
not (self.__dict__[key] and __o.__dict__[key]) or self.__dict__[key] and __o.__dict__[key] and
self.__dict__[key] == __o.__dict__[key] self.__dict__[key] == __o.__dict__[key]
) )
except KeyError: except KeyError:
@ -82,16 +86,19 @@ class build_manager:
self.mgr = mgr self.mgr = mgr
self.build_cmd = [] self.build_cmd = []
def libaquery_a(self) : def libaquery_a(self) :
pass return False
def pch(self): def pch(self):
pass return False
def build(self, stdout = sys.stdout, stderr = sys.stderr): def build(self, stdout = sys.stdout, stderr = sys.stderr):
ret = True
for c in self.build_cmd: for c in self.build_cmd:
if c: if c:
try: try:
subprocess.call(c, stdout = stdout, stderr = stderr) ret = subprocess.call(c, stdout = stdout, stderr = stderr) and ret
except (FileNotFoundError): except (FileNotFoundError):
ret = False
pass pass
return ret
class MakefileDriver(DriverBase): class MakefileDriver(DriverBase):
def __init__(self, mgr : 'build_manager') -> None: def __init__(self, mgr : 'build_manager') -> None:
@ -132,7 +139,7 @@ class build_manager:
loc = os.path.abspath('./msc-plugin/libaquery.vcxproj') loc = os.path.abspath('./msc-plugin/libaquery.vcxproj')
self.get_flags() self.get_flags()
self.build_cmd = [['del', 'libaquery.lib'], [aquery_config.msbuildroot, loc, self.opt, self.platform]] self.build_cmd = [['del', 'libaquery.lib'], [aquery_config.msbuildroot, loc, self.opt, self.platform]]
self.build() return self.build()
def pch(self): def pch(self):
pass pass
@ -141,24 +148,17 @@ class build_manager:
loc = os.path.abspath('./msc-plugin/server.vcxproj') loc = os.path.abspath('./msc-plugin/server.vcxproj')
self.get_flags() self.get_flags()
self.build_cmd = [['del', 'server.so'], [aquery_config.msbuildroot, loc, self.opt, self.platform]] self.build_cmd = [['del', 'server.so'], [aquery_config.msbuildroot, loc, self.opt, self.platform]]
self.build() return self.build()
def snippet(self): def snippet(self):
loc = os.path.abspath('./msc-plugin/msc-plugin.vcxproj') loc = os.path.abspath('./msc-plugin/msc-plugin.vcxproj')
self.get_flags() self.get_flags()
self.build_cmd = [[aquery_config.msbuildroot, loc, self.opt, self.platform]] self.build_cmd = [[aquery_config.msbuildroot, loc, self.opt, self.platform]]
self.build() return self.build()
#class PythonDriver(DriverBase): #class PythonDriver(DriverBase):
# def __init__(self, mgr : 'build_manager') -> None: # def __init__(self, mgr : 'build_manager') -> None:
# super().__init__(mgr) # super().__init__(mgr)
#@property
#def MSBuild(self):
# return MSBuildDriver(self)
#@property
#def Makefile(self):
# return MakefileDriver(self)
def __init__(self) -> None: def __init__(self) -> None:
self.method = 'make' self.method = 'make'
@ -181,7 +181,9 @@ class build_manager:
def build_caches(self, force = False): def build_caches(self, force = False):
cached = checksums() cached = checksums()
current = checksums() current = checksums()
libaquery_a = 'libaquery.lib' if aquery_config.os_platform else 'libaquery.a' libaquery_a = 'libaquery.a'
if aquery_config.os_platform == 'win':
libaquery_a = 'libaquery.lib'
current.calc(libaquery_a) current.calc(libaquery_a)
try: try:
with open('.cached', 'rb') as cache_sig: with open('.cached', 'rb') as cache_sig:
@ -190,18 +192,25 @@ class build_manager:
pass pass
self.cache_status = current != cached self.cache_status = current != cached
success = True
if force or self.cache_status.sources: if force or self.cache_status.sources:
self.driver.pch() self.driver.pch()
self.driver.libaquery_a() self.driver.libaquery_a()
self.driver.server() self.driver.server()
else: else:
if self.cache_status.libaquery_a: if self.cache_status.libaquery_a:
self.driver.libaquery_a() success = self.driver.libaquery_a() and success
if self.cache_status.pch_hpp_gch: if self.cache_status.pch_hpp_gch:
self.driver.pch() success = self.driver.pch() and success
if self.cache_status.server: if self.cache_status.server:
self.driver.server() success = self.driver.server() and success
current.calc(libaquery_a) if success:
with open('.cached', 'wb') as cache_sig: current.calc(libaquery_a)
cache_sig.write(pickle.dumps(current)) with open('.cached', 'wb') as cache_sig:
cache_sig.write(pickle.dumps(current))
else:
try:
os.remove('./.cached')
except:
pass

@ -1,5 +1,5 @@
from copy import deepcopy from copy import deepcopy
from engine.utils import defval from engine.utils import base62uuid, defval
from aquery_config import have_hge from aquery_config import have_hge
from typing import Dict, List from typing import Dict, List
@ -244,6 +244,14 @@ def fn_behavior(op:OperatorBase, c_code, *x):
name = op.cname if c_code else op.sqlname name = op.cname if c_code else op.sqlname
return f'{name}({", ".join([f"{xx}" for xx in x])})' return f'{name}({", ".join([f"{xx}" for xx in x])})'
def count_behavior(op:OperatorBase, c_code, x, distinct = False):
if not c_code:
return f'{op.sqlname}({"distinct " if distinct else ""}{x})'
elif distinct:
return 'count_'+base62uuid()
else:
return '{count()}'
def windowed_fn_behavor(op: OperatorBase, c_code, *x): def windowed_fn_behavor(op: OperatorBase, c_code, *x):
if not c_code: if not c_code:
return f'{op.sqlname}({", ".join([f"{xx}" for xx in x])})' return f'{op.sqlname}({", ".join([f"{xx}" for xx in x])})'
@ -282,7 +290,7 @@ fnmaxs = OperatorBase('maxs', [1, 2], ty_clamp(as_is, -1), cname = 'maxs', sqlna
fnmins = OperatorBase('mins', [1, 2], ty_clamp(as_is, -1), cname = 'mins', sqlname = 'MINS', call = windowed_fn_behavor) fnmins = OperatorBase('mins', [1, 2], ty_clamp(as_is, -1), cname = 'mins', sqlname = 'MINS', call = windowed_fn_behavor)
fnsums = OperatorBase('sums', [1, 2], ext(ty_clamp(auto_extension, -1)), cname = 'sums', sqlname = 'SUMS', call = windowed_fn_behavor) fnsums = OperatorBase('sums', [1, 2], ext(ty_clamp(auto_extension, -1)), cname = 'sums', sqlname = 'SUMS', call = windowed_fn_behavor)
fnavgs = OperatorBase('avgs', [1, 2], fp(ext(ty_clamp(auto_extension, -1))), cname = 'avgs', sqlname = 'AVGS', call = windowed_fn_behavor) fnavgs = OperatorBase('avgs', [1, 2], fp(ext(ty_clamp(auto_extension, -1))), cname = 'avgs', sqlname = 'AVGS', call = windowed_fn_behavor)
fncnt = OperatorBase('count', 1, int_return, cname = 'count', sqlname = 'COUNT', call = fn_behavior) fncnt = OperatorBase('count', 1, int_return, cname = 'count', sqlname = 'COUNT', call = count_behavior)
# special # special
def is_null_call_behavior(op:OperatorBase, c_code : bool, x : str): def is_null_call_behavior(op:OperatorBase, c_code : bool, x : str):
if c_code : if c_code :

@ -479,6 +479,10 @@ def prompt(running = lambda:True, next = lambda:input('> '), state = None):
print(e) print(e)
continue continue
except (ValueError, FileNotFoundError) as e: except (ValueError, FileNotFoundError) as e:
try:
os.remove('./cached')
except:
pass
print(e) print(e)
except (KeyboardInterrupt): except (KeyboardInterrupt):
break break

@ -223,12 +223,15 @@ class projection(ast_node):
if type(val[1]) is str: if type(val[1]) is str:
x = True x = True
y = lambda t: self.pyname2cname[t] y = lambda t: self.pyname2cname[t]
val[1] = val[2].eval(x, y, gettype=True) count = lambda : '0'
if vid2cname:
count = lambda : f'{vid2cname[0]}.size'
val[1] = val[2].eval(x, y, count=count)
if callable(val[1]): if callable(val[1]):
val[1] = val[1](True) val[1] = val[1](False)
decltypestring = val[1]
if val[0] == LazyT: if val[0] == LazyT:
decltypestring = val[2].eval(x,y,gettype=True)(True)
decltypestring = f'value_type<decays<decltype({decltypestring})>>' decltypestring = f'value_type<decays<decltype({decltypestring})>>'
out_typenames[key] = decltypestring out_typenames[key] = decltypestring
else: else:
@ -461,7 +464,8 @@ class groupby_c(ast_node):
return get_var_names(sql_code) return get_var_names(sql_code)
else: else:
return varex.eval(c_code=True, y = get_var_names, return varex.eval(c_code=True, y = get_var_names,
materialize_builtin = materialize_builtin) materialize_builtin = materialize_builtin,
count=lambda:f'{val_var}.size')
for ce in cexprs: for ce in cexprs:
ex = ce[1] ex = ce[1]

@ -26,7 +26,13 @@ class expr(ast_node):
def __init__(self, parent, node, *, c_code = None, supress_undefined = False): def __init__(self, parent, node, *, c_code = None, supress_undefined = False):
from reconstruct.ast import projection, udf from reconstruct.ast import projection, udf
# gen2 expr have multi-passes
# first pass parse json into expr tree
# generate target code in later passes upon need
self.children = []
self.opname = ''
self.curr_code = ''
self.counts = {}
self.type = None self.type = None
self.raw_col = None self.raw_col = None
self.udf : Optional[udf] = None self.udf : Optional[udf] = None
@ -93,9 +99,15 @@ class expr(ast_node):
self.is_agg_func = True self.is_agg_func = True
op = self.operators[key] op = self.operators[key]
count_distinct = False
if key == 'count' and type(val) is dict and 'distinct' in val:
count_distinct = True
val = val['distinct']
val = enlist(val) val = enlist(val)
exp_vals = [expr(self, v, c_code = self.c_code) for v in val] exp_vals = [expr(self, v, c_code = self.c_code) for v in val]
self.children = exp_vals
self.opname = key
str_vals = [e.sql for e in exp_vals] str_vals = [e.sql for e in exp_vals]
type_vals = [e.type for e in exp_vals] type_vals = [e.type for e in exp_vals]
is_compound = any([e.is_compound for e in exp_vals]) is_compound = any([e.is_compound for e in exp_vals])
@ -112,7 +124,11 @@ class expr(ast_node):
pass pass
self.type = AnyT self.type = AnyT
self.sql = op(self.c_code, *str_vals) if count_distinct: # inject distinct col later
self.sql = f'{{{op(self.c_code, *str_vals, True)}}}'
else:
self.sql = op(self.c_code, *str_vals)
special_func = [*self.context.udf_map.keys(), *self.context.module_map.keys(), special_func = [*self.context.udf_map.keys(), *self.context.module_map.keys(),
"maxs", "mins", "avgs", "sums", "deltas"] "maxs", "mins", "avgs", "sums", "deltas"]
if self.context.special_gb: if self.context.special_gb:
@ -218,19 +234,23 @@ class expr(ast_node):
self.sql = self.raw_col.name self.sql = self.raw_col.name
self.type = self.raw_col.type self.type = self.raw_col.type
self.is_compound = True self.is_compound = True
self.opname = self.raw_col
else: else:
self.sql = node self.sql = node
self.type = StrT self.type = StrT
self.opname = node
if self.c_code and self.datasource is not None: if self.c_code and self.datasource is not None:
self.sql = f'{{y(\"{self.sql}\")}}' self.sql = f'{{y(\"{self.sql}\")}}'
elif type(node) is bool: elif type(node) is bool:
self.type = BoolT self.type = BoolT
self.opname = node
if self.c_code: if self.c_code:
self.sql = '1' if node else '0' self.sql = '1' if node else '0'
else: else:
self.sql = 'TRUE' if node else 'FALSE' self.sql = 'TRUE' if node else 'FALSE'
else: else:
self.sql = f'{node}' self.sql = f'{node}'
self.opname = node
if type(node) is int: if type(node) is int:
if (node >= 2**63 - 1 or node <= -2**63): if (node >= 2**63 - 1 or node <= -2**63):
self.type = LongT self.type = LongT
@ -252,6 +272,12 @@ class expr(ast_node):
self.codebuf += c.finalize(override=override) self.codebuf += c.finalize(override=override)
return self.codebuf return self.codebuf
def codegen(self, delegate):
self.curr_code = ''
for c in self.children:
self.curr_code += c.codegen(delegate)
return self.curr_code
def __str__(self): def __str__(self):
return self.sql return self.sql
def __repr__(self): def __repr__(self):
@ -259,10 +285,17 @@ class expr(ast_node):
# builtins is readonly, so it's okay to set default value as an object # builtins is readonly, so it's okay to set default value as an object
# eval is only called at root expr. # eval is only called at root expr.
def eval(self, c_code = None, y = lambda t: t, materialize_builtin = False, _decltypestr = False, *, gettype = False): def eval(self, c_code = None, y = lambda t: t,
materialize_builtin = False, _decltypestr = False,
count = lambda : 'count', var_inject = None,
*,
gettype = False):
assert(self.is_root) assert(self.is_root)
def call(decltypestr = False) -> str: def call(decltypestr = False) -> str:
nonlocal c_code, y, materialize_builtin nonlocal c_code, y, materialize_builtin, count, var_inject
if var_inject:
for k, v in var_inject.items():
locals()[k] = v
if self.udf_called is not None: if self.udf_called is not None:
loc = locals() loc = locals()
builtin_vars = self.udf_called.builtin_used builtin_vars = self.udf_called.builtin_used

@ -16,8 +16,8 @@ constexpr static inline size_t count(const T&) { return 1; }
// TODO: Specializations for dt/str/none // TODO: Specializations for dt/str/none
template<class T, template<typename ...> class VT> template<class T, template<typename ...> class VT>
// types::GetLongType<T> types::GetLongType<T>
LL_Type sum(const VT<T>& v) { sum(const VT<T>& v) {
types::GetLongType<T> ret = 0; types::GetLongType<T> ret = 0;
for (const auto& _v : v) for (const auto& _v : v)
ret += _v; ret += _v;

@ -100,10 +100,10 @@ void Context::end_session(){
void* Context::get_module_function(const char* fname){ void* Context::get_module_function(const char* fname){
auto fmap = static_cast<std::unordered_map<std::string, void*>*> auto fmap = static_cast<std::unordered_map<std::string, void*>*>
(this->module_function_maps); (this->module_function_maps);
printf("%p\n", fmap->find("mydiv")->second); // printf("%p\n", fmap->find("mydiv")->second);
for (const auto& [key, value] : *fmap){ // for (const auto& [key, value] : *fmap){
printf("%s %p\n", key.c_str(), value); // printf("%s %p\n", key.c_str(), value);
} // }
auto ret = fmap->find(fname); auto ret = fmap->find(fname);
return ret == fmap->end() ? nullptr : ret->second; return ret == fmap->end() ? nullptr : ret->second;
} }
@ -188,9 +188,9 @@ int dll_main(int argc, char** argv, Context* cxt){
case 'F': // Register Function in Module case 'F': // Register Function in Module
{ {
auto fname = n_recvd[i] + 1; auto fname = n_recvd[i] + 1;
printf("F:: %s: %p, %p\n", fname, user_module_handle, dlsym(user_module_handle, fname)); //printf("F:: %s: %p, %p\n", fname, user_module_handle, dlsym(user_module_handle, fname));
module_fn_map->insert_or_assign(fname, dlsym(user_module_handle, fname)); module_fn_map->insert_or_assign(fname, dlsym(user_module_handle, fname));
printf("F::: %p\n", module_fn_map->find("mydiv") != module_fn_map->end() ? module_fn_map->find("mydiv")->second : nullptr); //printf("F::: %p\n", module_fn_map->find("mydiv") != module_fn_map->end() ? module_fn_map->find("mydiv")->second : nullptr);
} }
break; break;
case 'U': // Unload Module case 'U': // Unload Module

Binary file not shown.

@ -5,7 +5,7 @@ LOAD DATA INFILE "data/network.csv"
INTO TABLE network INTO TABLE network
FIELDS TERMINATED BY "," FIELDS TERMINATED BY ","
SELECT src, dst, avg(len) SELECT src, dst, count(*), avg(len)
FROM network FROM network
ASSUMING ASC src, ASC dst, ASC _time ASSUMING ASC src, ASC dst, ASC _time
GROUP BY src, dst, sums (deltas(_time) > 120) GROUP BY src, dst, sums (deltas(_time) > 120)

Loading…
Cancel
Save