diff --git a/docs/paper b/docs/paper index 88d6445..ed2b7b3 160000 --- a/docs/paper +++ b/docs/paper @@ -1 +1 @@ -Subproject commit 88d64456343506aae785a938c295864572fe6c0b +Subproject commit ed2b7b350f162271472feb40dd02f5c885f51683 diff --git a/engine/ast.py b/engine/ast.py index 17c400e..b666048 100644 --- a/engine/ast.py +++ b/engine/ast.py @@ -61,6 +61,7 @@ class SubqType(Enum): GROUPBY = auto() ORDERBY = auto() NONE = auto() + class projection(ast_node): name = 'projection' first_order = 'select' @@ -87,6 +88,7 @@ class projection(ast_node): def produce(self, node): self.add('SELECT') + self.has_preproc = not self.context.use_cached_tables self.has_postproc = 'into' in node if 'select' in node: p = node['select'] @@ -130,10 +132,17 @@ class projection(ast_node): self.assumptions = enlist(node['from']['assumptions']) if self.datasource is not None: + self.has_preproc = ( + self.has_preproc and + not self.datasource.no_join and + not self.datasource.tables[0].cached + ) self.datasource_changed = True self.prev_datasource = self.context.datasource self.context.datasource = self.datasource - + else: + self.has_preproc = False + if 'where' in node: self.where = filter(self, node['where']) else: @@ -329,11 +338,33 @@ class projection(ast_node): typenames = [c[1] for c in col_exprs] + [c.type for c in self.col_ext] length_name = 'len_' + base62uuid(6) self.context.emitc(f'auto {length_name} = server->cnt;') + + self.input_table_name = 'placeholder' + self.input_table_type = 'auto' + if not self.has_preproc: + obj_input_table : TableInfo = self.datasource.tables[0] + self.input_table_name = ( + 'tbl_' + + obj_input_table.table_name + + '_' + base62uuid(4) + ) + col_types = [c.type.cname for c in obj_input_table.columns] + self.input_table_type = f'TableInfo<{", ".join(col_types)}>' + self.context.emitc(f'{self.input_table_type}* {self.input_table_name}' + f' = cxt->tables["{obj_input_table.table_name}"];') for v, idx in self.var_table.items(): vname = 
get_legal_name(v) + '_' + base62uuid(3) self.pyname2cname[v] = vname - self.context.emitc(f'auto {vname} = ColRef<{typenames[idx].cname}>({length_name}, server->getCol({idx}, {typenames[idx].ctype_name}));') + if not self.has_preproc: + # TODO: verify/ensure that idx is the same as cid in the table. + self.context.emitc(f'decltype(auto) {vname} = ' + f'{self.input_table_name}->get_col<{idx}>();') + else: + self.context.emitc(f'auto {vname} = ColRef<{typenames[idx].cname}>(' + f'{length_name}, ' + f'server->getCol({idx}, {typenames[idx].ctype_name})' + f');') vid2cname[idx] = vname # Create table into context out_typenames = [None] * len(proj_map) @@ -505,10 +536,21 @@ class scan(ast_node): class LoopStyle(Enum): forloop = auto() foreach = auto() + class LoopType(Enum): + continous = auto() + descrete = auto() name = 'scan' - def __init__(self, parent: "ast_node", node, loop_style = 'for', context: Context = None, const = False, it_name = None): + def __init__(self, parent: "ast_node", + node, + loop_style = LoopStyle.forloop, + context: Context = None, + const = False, + it_name = None, + it_type = LoopType.continous + ): self.it_var = it_name + self.it_type = it_type self.const = "const " if const else "" self.loop_style = loop_style super().__init__(parent, node, context) @@ -519,27 +561,56 @@ class scan(ast_node): self.initializers = '' self.start = '' self.front = '' - self.body = '' - self.end = '}' + self.body = [] + self.end = '' self.parent.context.scans.append(self) def produce(self, node): - if self.loop_style == 'for_each': + if self.loop_style == scan.LoopStyle.foreach: self.colref = node self.start += f'for ({self.const}auto& {self.it_var} : {node}) {{\n' else: self.start += f"for (uint32_t {self.it_var} = 0; {self.it_var} < {node}; ++{self.it_var}){{\n" - def add(self, stmt, position = "body"): - if position == "body": - self.body += stmt + '\n' - elif position == "init": + def add(self, stmt, position = Position.body): + if position == 
scan.Position.body: + self.body.append(stmt) + elif position == scan.Position.init: self.initializers += stmt + '\n' - else: + elif position == scan.Position.front: self.front += stmt + '\n' + elif position == scan.Position.fin: + self.end += stmt + '\n' + elif position == scan.Position.back: + self.end = stmt + '\n' + self.end + else: + raise ValueError(f'Unknown position {position}') def finalize(self): - self.context.remove_scan(self, self.initializers + self.start + self.front + self.body + self.end) + scan_assembly = '' + if self.it_type == scan.LoopType.descrete: + scan_assembly = ( + self.initializers + + '\n'.join([( + self.start + + self.front + + b + + '}' + ) for b in self.body]) + + + self.end + ) + else: + scan_assembly = ( + self.initializers + + self.start + + self.front + + '\n'.join(self.body) + + '}' + + self.end + ) + self.context.remove_scan(self, scan_assembly) + class groupby_c(ast_node): name = '_groupby' @@ -631,21 +702,22 @@ class groupby_c(ast_node): preproc_scanner.finalize() self.context.emitc('GC::scratch_space = GC::gc_handle ? 
&(GC::gc_handle->scratch) : nullptr;') - # gscanner = scan(self, self.group, loop_style = 'for_each') + # gscanner = scan(self, self.group, loop_style = scan.LoopStyle.foreach) gscanner = scan(self, self.arr_len) key_var = 'key_'+base62uuid(7) val_var = 'val_'+base62uuid(7) # gscanner.add(f'auto &{key_var} = {gscanner.it_var}.first;', position = 'front') # gscanner.add(f'auto &{val_var} = {gscanner.it_var}.second;', position = 'front') - gscanner.add(f'auto &{key_var} = {self.arr_values}[{gscanner.it_var}];', position = 'front') - gscanner.add(f'auto &{val_var} = {self.vecs}[{gscanner.it_var}];', position = 'front') + gscanner.add(f'auto &{key_var} = {self.arr_values}[{gscanner.it_var}];', position = scan.Position.front) + gscanner.add(f'auto &{val_var} = {self.vecs}[{gscanner.it_var}];', position = scan.Position.front) + len_var = None len_var = None def define_len_var(): nonlocal len_var if len_var is None: len_var = 'len_'+base62uuid(7) - gscanner.add(f'auto &{len_var} = {val_var}.size;', position = 'front') + gscanner.add(f'auto &{len_var} = {val_var}.size;', position = scan.Position.front) def get_key_idx (varname : str): ex = expr(self, varname) @@ -782,7 +854,7 @@ class groupby(ast_node): self.dedicated_gb = groupby_c(self.parent, self.dedicated_glist) self.dedicated_gb.finalize(cexprs, var_table, col_names, col_types, col_tovec) - +# TODO: add support for CallExpr for tables. 
class join(ast_node): name = 'join' @@ -801,9 +873,12 @@ class join(ast_node): if j != c and j in stripped: stripped.remove(j) return stripped + @property + def no_join(self): + return len(self.tables) == 1 def init(self, _): - self.joins : List[join] = [] + self.joins : List[Tuple[str, bool]] = [] self.tables : List[TableInfo] = [] self.tables_dir = dict() self.rec = None @@ -817,19 +892,22 @@ class join(ast_node): def append(self, tbls, __alias = ''): alias = lambda t : t + ' ' + __alias if len(__alias) else t + # sub joins if type(tbls) is join: self.joins.append((alias(tbls.__str__()), tbls.have_sep)) self.tables += tbls.tables self.tables_dir = {**self.tables_dir, **tbls.tables_dir} self.join_conditions += tbls.join_conditions - + + # simple table elif type(tbls) is TableInfo: self.joins.append((alias(tbls.table_name), False)) self.tables.append(tbls) self.tables_dir[tbls.table_name] = tbls for a in tbls.alias: self.tables_dir[a] = tbls - + + # subquery elif type(tbls) is projection: self.joins.append((alias(tbls.finalize()), False)) @@ -1764,9 +1842,11 @@ class cache(ast_node): for t in tbl.columns: schema_string += t.name + '\0' + \ encode_integral(aquery_types[t.type.ctype_name]) - + # TODO: 1. deal with situation where the table has aliases + # TODO: 2. when table is already cached, use write-back/lazy/eager to deal with changes from common.utils import send_to_server send_to_server(f'C{source}\0{"l" if lazy else "e"}\0{schema_string}\0') + tbl.cached = True def include(objs): import inspect diff --git a/server/hasher.h b/server/hasher.h index 00eb80a..066fab3 100644 --- a/server/hasher.h +++ b/server/hasher.h @@ -162,7 +162,9 @@ struct PerfectHashTable { // } template class VT> - void construct(VT&... args) { + // std::enable_if_t, void> + void + construct(VT&... 
args) { // construct a hash set ((this->n_cols = args.size), ...); static_assert( (sizeof...(Types) < PerfectHashingThreshold) && @@ -198,6 +200,6 @@ struct PerfectHashTable { // problem: random memory access } // delete[] hash_values; - free(hash_values); + free(hash_values); // dispatch to gc } }; diff --git a/server/vector_type.hpp b/server/vector_type.hpp index d7704ad..4630200 100644 --- a/server/vector_type.hpp +++ b/server/vector_type.hpp @@ -505,9 +505,16 @@ public: ht_base = static_cast(calloc(sz, sizeof(uint32_t))); } + template + inline void hashtable_push_all(Keys_t& ... keys, uint32_t len) { + for(uint32_t i = 0; i < len; ++i) + reversemap[i] = ankerl::unordered_dense::set::hashtable_push(keys[i]...); + for(uint32_t i = 0; i < len; ++i) + ++ht_base[reversemap[i]]; + } inline void hashtable_push(Key&& k, uint32_t i){ - reversemap[i] = ankerl::unordered_dense::set::hashtable_push(std::move(k)); - ++ht_base[reversemap[i]]; + reversemap[i] = ankerl::unordered_dense::set::hashtable_push(k); + ++ht_base[reversemap[i]]; // do this separately? } auto ht_postproc(uint32_t sz) {