General Hashtable optimization

Perfect Hash tabel initial implementation
Table caching
Bug fixesw
master
bill 2 years ago
parent d0f0b4fc56
commit f9205dc2a6

@ -1 +1 @@
Subproject commit 88d64456343506aae785a938c295864572fe6c0b Subproject commit ed2b7b350f162271472feb40dd02f5c885f51683

@ -61,6 +61,7 @@ class SubqType(Enum):
GROUPBY = auto() GROUPBY = auto()
ORDERBY = auto() ORDERBY = auto()
NONE = auto() NONE = auto()
class projection(ast_node): class projection(ast_node):
name = 'projection' name = 'projection'
first_order = 'select' first_order = 'select'
@ -87,6 +88,7 @@ class projection(ast_node):
def produce(self, node): def produce(self, node):
self.add('SELECT') self.add('SELECT')
self.has_preproc = not self.context.use_cached_tables
self.has_postproc = 'into' in node self.has_postproc = 'into' in node
if 'select' in node: if 'select' in node:
p = node['select'] p = node['select']
@ -130,10 +132,17 @@ class projection(ast_node):
self.assumptions = enlist(node['from']['assumptions']) self.assumptions = enlist(node['from']['assumptions'])
if self.datasource is not None: if self.datasource is not None:
self.has_preproc = (
self.has_preproc and
not self.datasource.no_join and
not self.datasource.tables[0].cached
)
self.datasource_changed = True self.datasource_changed = True
self.prev_datasource = self.context.datasource self.prev_datasource = self.context.datasource
self.context.datasource = self.datasource self.context.datasource = self.datasource
else:
self.has_preproc = False
if 'where' in node: if 'where' in node:
self.where = filter(self, node['where']) self.where = filter(self, node['where'])
else: else:
@ -329,11 +338,33 @@ class projection(ast_node):
typenames = [c[1] for c in col_exprs] + [c.type for c in self.col_ext] typenames = [c[1] for c in col_exprs] + [c.type for c in self.col_ext]
length_name = 'len_' + base62uuid(6) length_name = 'len_' + base62uuid(6)
self.context.emitc(f'auto {length_name} = server->cnt;') self.context.emitc(f'auto {length_name} = server->cnt;')
self.input_table_name = 'placeholder'
self.input_table_type = 'auto'
if not self.has_preproc:
obj_input_table : TableInfo = self.datasource.tables[0]
self.input_table_name = (
'tbl_' +
obj_input_table.table_name +
'_' + base62uuid(4)
)
col_types = [c.type.cname for c in obj_input_table.columns]
self.input_table_type = f'TableInfo<{", ".join(col_types)}>'
self.context.emitc(f'{self.input_table_type}* {self.input_table_name}'
f' = cxt->tables["{obj_input_table.table_name}"];')
for v, idx in self.var_table.items(): for v, idx in self.var_table.items():
vname = get_legal_name(v) + '_' + base62uuid(3) vname = get_legal_name(v) + '_' + base62uuid(3)
self.pyname2cname[v] = vname self.pyname2cname[v] = vname
self.context.emitc(f'auto {vname} = ColRef<{typenames[idx].cname}>({length_name}, server->getCol({idx}, {typenames[idx].ctype_name}));') if not self.has_preproc:
# TODO: verify/ensure that idx is the same as cid in the table.
self.context.emitc(f'decltype(auto) {vname} = '
f'{self.input_table_name}->get_col<{idx}>();')
else:
self.context.emitc(f'auto {vname} = ColRef<{typenames[idx].cname}>('
f'{length_name}, '
f'server->getCol({idx}, {typenames[idx].ctype_name})'
f');')
vid2cname[idx] = vname vid2cname[idx] = vname
# Create table into context # Create table into context
out_typenames = [None] * len(proj_map) out_typenames = [None] * len(proj_map)
@ -505,10 +536,21 @@ class scan(ast_node):
class LoopStyle(Enum): class LoopStyle(Enum):
forloop = auto() forloop = auto()
foreach = auto() foreach = auto()
class LoopType(Enum):
continous = auto()
descrete = auto()
name = 'scan' name = 'scan'
def __init__(self, parent: "ast_node", node, loop_style = 'for', context: Context = None, const = False, it_name = None): def __init__(self, parent: "ast_node",
node,
loop_style = LoopStyle.forloop,
context: Context = None,
const = False,
it_name = None,
it_type = LoopType.continous
):
self.it_var = it_name self.it_var = it_name
self.it_type = it_type
self.const = "const " if const else "" self.const = "const " if const else ""
self.loop_style = loop_style self.loop_style = loop_style
super().__init__(parent, node, context) super().__init__(parent, node, context)
@ -519,27 +561,56 @@ class scan(ast_node):
self.initializers = '' self.initializers = ''
self.start = '' self.start = ''
self.front = '' self.front = ''
self.body = '' self.body = []
self.end = '}' self.end = ''
self.parent.context.scans.append(self) self.parent.context.scans.append(self)
def produce(self, node): def produce(self, node):
if self.loop_style == 'for_each': if self.loop_style == scan.LoopStyle.foreach:
self.colref = node self.colref = node
self.start += f'for ({self.const}auto& {self.it_var} : {node}) {{\n' self.start += f'for ({self.const}auto& {self.it_var} : {node}) {{\n'
else: else:
self.start += f"for (uint32_t {self.it_var} = 0; {self.it_var} < {node}; ++{self.it_var}){{\n" self.start += f"for (uint32_t {self.it_var} = 0; {self.it_var} < {node}; ++{self.it_var}){{\n"
def add(self, stmt, position = "body"): def add(self, stmt, position = Position.body):
if position == "body": if position == scan.Position.body:
self.body += stmt + '\n' self.body.append(stmt)
elif position == "init": elif position == scan.Position.init:
self.initializers += stmt + '\n' self.initializers += stmt + '\n'
else: elif position == scan.Position.front:
self.front += stmt + '\n' self.front += stmt + '\n'
elif position == scan.Position.fin:
self.end += stmt + '\n'
elif position == scan.Position.back:
self.end = stmt + '\n' + self.end
else:
raise ValueError(f'Unknown position {position}')
def finalize(self): def finalize(self):
self.context.remove_scan(self, self.initializers + self.start + self.front + self.body + self.end) scan_assembly = ''
if self.it_type == scan.LoopType.descrete:
scan_assembly = (
self.initializers +
'\n'.join([(
self.start +
self.front +
b +
'}'
) for b in self.body])
+
self.end
)
else:
scan_assembly = (
self.initializers +
self.start +
self.front +
'\n'.join(self.body) +
'}' +
self.end
)
self.context.remove_scan(self, scan_assembly)
class groupby_c(ast_node): class groupby_c(ast_node):
name = '_groupby' name = '_groupby'
@ -631,21 +702,22 @@ class groupby_c(ast_node):
preproc_scanner.finalize() preproc_scanner.finalize()
self.context.emitc('GC::scratch_space = GC::gc_handle ? &(GC::gc_handle->scratch) : nullptr;') self.context.emitc('GC::scratch_space = GC::gc_handle ? &(GC::gc_handle->scratch) : nullptr;')
# gscanner = scan(self, self.group, loop_style = 'for_each') # gscanner = scan(self, self.group, loop_style = scan.LoopStyle.foreach)
gscanner = scan(self, self.arr_len) gscanner = scan(self, self.arr_len)
key_var = 'key_'+base62uuid(7) key_var = 'key_'+base62uuid(7)
val_var = 'val_'+base62uuid(7) val_var = 'val_'+base62uuid(7)
# gscanner.add(f'auto &{key_var} = {gscanner.it_var}.first;', position = 'front') # gscanner.add(f'auto &{key_var} = {gscanner.it_var}.first;', position = 'front')
# gscanner.add(f'auto &{val_var} = {gscanner.it_var}.second;', position = 'front') # gscanner.add(f'auto &{val_var} = {gscanner.it_var}.second;', position = 'front')
gscanner.add(f'auto &{key_var} = {self.arr_values}[{gscanner.it_var}];', position = 'front') gscanner.add(f'auto &{key_var} = {self.arr_values}[{gscanner.it_var}];', position = scan.Position.front)
gscanner.add(f'auto &{val_var} = {self.vecs}[{gscanner.it_var}];', position = 'front') gscanner.add(f'auto &{val_var} = {self.vecs}[{gscanner.it_var}];', position = scan.Position.front)
len_var = None
len_var = None len_var = None
def define_len_var(): def define_len_var():
nonlocal len_var nonlocal len_var
if len_var is None: if len_var is None:
len_var = 'len_'+base62uuid(7) len_var = 'len_'+base62uuid(7)
gscanner.add(f'auto &{len_var} = {val_var}.size;', position = 'front') gscanner.add(f'auto &{len_var} = {val_var}.size;', position = scan.Position.front)
def get_key_idx (varname : str): def get_key_idx (varname : str):
ex = expr(self, varname) ex = expr(self, varname)
@ -782,7 +854,7 @@ class groupby(ast_node):
self.dedicated_gb = groupby_c(self.parent, self.dedicated_glist) self.dedicated_gb = groupby_c(self.parent, self.dedicated_glist)
self.dedicated_gb.finalize(cexprs, var_table, col_names, col_types, col_tovec) self.dedicated_gb.finalize(cexprs, var_table, col_names, col_types, col_tovec)
# TODO: add support for CallExpr for tables.
class join(ast_node): class join(ast_node):
name = 'join' name = 'join'
@ -801,9 +873,12 @@ class join(ast_node):
if j != c and j in stripped: if j != c and j in stripped:
stripped.remove(j) stripped.remove(j)
return stripped return stripped
@property
def no_join(self):
return len(self.tables) == 1
def init(self, _): def init(self, _):
self.joins : List[join] = [] self.joins : List[Tuple[str, bool]] = []
self.tables : List[TableInfo] = [] self.tables : List[TableInfo] = []
self.tables_dir = dict() self.tables_dir = dict()
self.rec = None self.rec = None
@ -817,19 +892,22 @@ class join(ast_node):
def append(self, tbls, __alias = ''): def append(self, tbls, __alias = ''):
alias = lambda t : t + ' ' + __alias if len(__alias) else t alias = lambda t : t + ' ' + __alias if len(__alias) else t
# sub joins
if type(tbls) is join: if type(tbls) is join:
self.joins.append((alias(tbls.__str__()), tbls.have_sep)) self.joins.append((alias(tbls.__str__()), tbls.have_sep))
self.tables += tbls.tables self.tables += tbls.tables
self.tables_dir = {**self.tables_dir, **tbls.tables_dir} self.tables_dir = {**self.tables_dir, **tbls.tables_dir}
self.join_conditions += tbls.join_conditions self.join_conditions += tbls.join_conditions
# simple table
elif type(tbls) is TableInfo: elif type(tbls) is TableInfo:
self.joins.append((alias(tbls.table_name), False)) self.joins.append((alias(tbls.table_name), False))
self.tables.append(tbls) self.tables.append(tbls)
self.tables_dir[tbls.table_name] = tbls self.tables_dir[tbls.table_name] = tbls
for a in tbls.alias: for a in tbls.alias:
self.tables_dir[a] = tbls self.tables_dir[a] = tbls
# subquery
elif type(tbls) is projection: elif type(tbls) is projection:
self.joins.append((alias(tbls.finalize()), False)) self.joins.append((alias(tbls.finalize()), False))
@ -1764,9 +1842,11 @@ class cache(ast_node):
for t in tbl.columns: for t in tbl.columns:
schema_string += t.name + '\0' + \ schema_string += t.name + '\0' + \
encode_integral(aquery_types[t.type.ctype_name]) encode_integral(aquery_types[t.type.ctype_name])
# TODO: 1. deal with hsituation where the table has aliases
# TODO: 2. when table is already cached. use write-back/lazy/eager to deal with changes
from common.utils import send_to_server from common.utils import send_to_server
send_to_server(f'C{source}\0{"l" if lazy else "e"}\0{schema_string}\0') send_to_server(f'C{source}\0{"l" if lazy else "e"}\0{schema_string}\0')
tbl.cached = True
def include(objs): def include(objs):
import inspect import inspect

@ -162,7 +162,9 @@ struct PerfectHashTable {
// } // }
template <typename ... Types, template <typename> class VT> template <typename ... Types, template <typename> class VT>
void construct(VT<Types>&... args) { // std::enable_if_t<std::is_same_v<ValueType, bool>, void>
void
construct(VT<Types>&... args) { // construct a hash set
((this->n_cols = args.size), ...); ((this->n_cols = args.size), ...);
static_assert( static_assert(
(sizeof...(Types) < PerfectHashingThreshold) && (sizeof...(Types) < PerfectHashingThreshold) &&
@ -198,6 +200,6 @@ struct PerfectHashTable {
// problem: random memory access // problem: random memory access
} }
// delete[] hash_values; // delete[] hash_values;
free(hash_values); free(hash_values); // dispatch to gc
} }
}; };

@ -505,9 +505,16 @@ public:
ht_base = static_cast<uint32_t *>(calloc(sz, sizeof(uint32_t))); ht_base = static_cast<uint32_t *>(calloc(sz, sizeof(uint32_t)));
} }
template<typename... Keys_t>
inline void hashtable_push_all(Keys_t& ... keys, uint32_t len) {
for(uint32_t i = 0; i < len; ++i)
reversemap[i] = ankerl::unordered_dense::set<Key, Hash>::hashtable_push(keys[i]...);
for(uint32_t i = 0; i < len; ++i)
++ht_base[reversemap[i]];
}
inline void hashtable_push(Key&& k, uint32_t i){ inline void hashtable_push(Key&& k, uint32_t i){
reversemap[i] = ankerl::unordered_dense::set<Key, Hash>::hashtable_push(std::move(k)); reversemap[i] = ankerl::unordered_dense::set<Key, Hash>::hashtable_push(k);
++ht_base[reversemap[i]]; ++ht_base[reversemap[i]]; // do this seperately?
} }
auto ht_postproc(uint32_t sz) { auto ht_postproc(uint32_t sz) {

Loading…
Cancel
Save