Compare commits

...

4 Commits

@ -0,0 +1,6 @@
CREATE TABLE trade01m(stocksymbol STRING, time INT, quantity INT, price INT)
load data infile "../tables/trade01m.csv" into table trade01m fields terminated by ','
CREATE TABLE trade1m(stocksymbol STRING, time INT, quantity INT, price INT)
load data infile "../tables/trade1m.csv" into table trade1m fields terminated by ','
CREATE TABLE trade10m(stocksymbol STRING, time INT, quantity INT, price INT)
load data infile "../tables/trade10m.csv" into table trade10m fields terminated by ','

@ -0,0 +1,5 @@
-- select rows
<sql>
CREATE TABLE res0 AS
SELECT * FROM trade10m
</sql>

@ -0,0 +1,7 @@
-- groupby_multi_different_functions
<sql>
CREATE TABLE res1 AS
SELECT avg(quantity) AS avg_quan, min(price) AS min_p
FROM trade1m
GROUP BY stocksymbol, time
</sql>

@ -0,0 +1,4 @@
SELECT stocksymbol, MAX(stddevs(3, price))
FROM trade1m
ASSUMING ASC time
GROUP BY stocksymbol

@ -0,0 +1,4 @@
-- count values
<sql>
SELECT COUNT(*) FROM trade10m
</sql>

@ -0,0 +1,7 @@
-- group by multiple keys
<sql>
create table res3 AS
SELECT sum(quantity) as sum_quantity
FROM trade01m
GROUP BY stocksymbol, price
</sql>

@ -0,0 +1,5 @@
-- append tables
<sql>
CREATE TABLE res4 AS
SELECT * FROM trade10m UNION ALL SELECT * FROM trade10m
</sql>

@ -0,0 +1,5 @@
CREATE table res7 AS
SELECT stocksymbol, avgs(5, price)
FROM trade10m
ASSUMING ASC time
GROUP BY stocksymbol

@ -0,0 +1,6 @@
<sql>
CREATE TABLE res8 AS
SELECT stocksymbol, quantity, price
FROM trade10m
WHERE time >= 5288 and time <= 7000
</sql>

@ -0,0 +1,6 @@
<sql>
CREATE TABLE res9 AS
SELECT stocksymbol, MAX(price) - MIN(price)
FROM trade10m
GROUP BY stocksymbol
</sql>

@ -0,0 +1,3 @@
-- q0 select rows
CREATE TABLE res0 (a String, b Int32, c Int32, d Int32) ENGINE = MergeTree() ORDER BY b AS
SELECT * FROM benchmark.trade10m

@ -0,0 +1,4 @@
-- groupby_multi_different_functions
SELECT avg(quantity), min(price)
FROM benchmark.trade10m
GROUP BY stocksymbol, time

@ -0,0 +1,8 @@
-- max rolling std
select
stocksymbol,
max(stddevPop(price)) over
(partition by stocksymbol rows between 2 preceding AND CURRENT row) as maxRollingStd
from
(SELECT * FROM benchmark.trade01m ORDER BY time)
GROUP BY stocksymbol

@ -0,0 +1,2 @@
-- count values
SELECT COUNT(*) FROM benchmark.trade10m

@ -0,0 +1,4 @@
-- group by multiple keys
SELECT sum(quantity)
FROM benchmark.trade10m
GROUP BY stocksymbol, price

@ -0,0 +1,2 @@
-- append two tables
SELECT * FROM benchmark.trade10m UNION ALL SELECT * FROM benchmark.trade10m

@ -0,0 +1,5 @@
-- moving_avg
SELECT stocksymbol, groupArrayMovingAvg(5)(price) AS moving_avg_price
FROM
(SELECT * FROM benchmark.trade01m ORDER BY time)
GROUP BY stocksymbol

@ -0,0 +1,3 @@
SELECT stocksymbol, quantity, price
FROM benchmark.trade10m
WHERE time >= 5288 and time <= 7000

@ -0,0 +1,3 @@
SELECT stocksymbol, MAX(price) - MIN(price)
FROM benchmark.trade1m
GROUP BY stocksymbol

@ -0,0 +1,3 @@
-- select rows
CREATE TABLE res0 AS
SELECT * FROM trade10m;

@ -0,0 +1,4 @@
-- groupby_multi_different_functions
SELECT avg(quantity), min(price)
FROM trade10m
GROUP BY stocksymbol, time;

@ -0,0 +1,7 @@
select
stocksymbol,
max(stddev(price)) over
(partition by stocksymbol rows between 2 preceding AND CURRENT row) as maxRollingStd
from
(SELECT * FROM trade01m ORDER BY time) as t
GROUP BY stocksymbol;

@ -0,0 +1,2 @@
-- count values
SELECT COUNT(*) FROM trade10m;

@ -0,0 +1,4 @@
-- group by multiple keys
SELECT sum(quantity)
FROM trade10m
GROUP BY stocksymbol, price;

@ -0,0 +1,2 @@
-- append tables
SELECT * FROM trade10m UNION ALL SELECT * FROM trade10m;

@ -0,0 +1,5 @@
select
stocksymbol,
coalesce(avg(price) over
(partition by stocksymbol order by time rows between 4 preceding AND CURRENT row), price) as rollingAvg
from trade10m;

@ -0,0 +1,3 @@
SELECT stocksymbol, quantity, price
FROM trade01m
WHERE time >= 5288 and time <= 7000

@ -0,0 +1,3 @@
SELECT stocksymbol, MAX(price) - MIN(price)
FROM trade01m
GROUP BY stocksymbol;

@ -107,9 +107,9 @@ ULongT = Types(8, name = 'uint64', sqlname = 'UINT64', fp_type=DoubleT)
UIntT = Types(7, name = 'uint32', sqlname = 'UINT32', long_type=ULongT, fp_type=FloatT)
UShortT = Types(6, name = 'uint16', sqlname = 'UINT16', long_type=ULongT, fp_type=FloatT)
UByteT = Types(5, name = 'uint8', sqlname = 'UINT8', long_type=ULongT, fp_type=FloatT)
StrT = Types(200, name = 'str', cname = 'const char*', sqlname='TEXT', ctype_name = 'types::ASTR')
TextT = Types(200, name = 'text', cname = 'const char*', sqlname='TEXT', ctype_name = 'types::ASTR')
VarcharT = Types(200, name = 'varchar', cname = 'const char*', sqlname='VARCHAR', ctype_name = 'types::ASTR')
StrT = Types(200, name = 'str', cname = 'string_view', sqlname='TEXT', ctype_name = 'types::ASTR')
TextT = Types(200, name = 'text', cname = 'string_view', sqlname='TEXT', ctype_name = 'types::ASTR')
VarcharT = Types(200, name = 'varchar', cname = 'string_view', sqlname='VARCHAR', ctype_name = 'types::ASTR')
VoidT = Types(200, name = 'void', cname = 'void', sqlname='Null', ctype_name = 'types::None')
class VectorT(Types):

@ -2,7 +2,7 @@ import struct
import readline
from typing import List
name : str = input()
name : str = input('Filename (in path ./procedures/<filename>.aqp):')
def write():
s : str = input()

@ -339,8 +339,8 @@ class projection(ast_node):
return ', '.join([self.pyname2cname[n.name] for n in lst_names])
else:
return self.pyname2cname[proj_name]
for key, val in proj_map.items():
gb_tovec = [False] * len(proj_map)
for i, (key, val) in enumerate(proj_map.items()):
if type(val[1]) is str:
x = True
y = get_proj_name
@ -357,22 +357,27 @@ class projection(ast_node):
out_typenames[key] = decltypestring
else:
out_typenames[key] = val[0].cname
if (type(val[2].udf_called) is udf and # should bulkret also be colref?
elemental_ret_udf = (
type(val[2].udf_called) is udf and # should bulkret also be colref?
val[2].udf_called.return_pattern == udf.ReturnPattern.elemental_return
or
self.group_node and
(self.group_node.use_sp_gb and
)
folding_vector_groups = (
self.group_node and
(
self.group_node.use_sp_gb and
val[2].cols_mentioned.intersection(
self.datasource.all_cols().difference(
self.datasource.get_joint_cols(self.group_node.refs)
))
) and val[2].is_compound # compound val not in key
# or
# val[2].is_compound > 1
# (not self.group_node and val[2].is_compound)
):
out_typenames[key] = f'vector_type<{out_typenames[key]}>'
self.out_table.columns[key].compound = True
)
)
) and
val[2].is_compound # compound val not in key
)
if (elemental_ret_udf or folding_vector_groups):
out_typenames[key] = f'vector_type<{out_typenames[key]}>'
self.out_table.columns[key].compound = True
if self.group_node is not None and self.group_node.use_sp_gb:
gb_tovec[i] = True
outtable_col_nameslist = ', '.join([f'"{c.name}"' for c in self.out_table.columns])
self.outtable_col_names = 'names_' + base62uuid(4)
self.context.emitc(f'const char* {self.outtable_col_names}[] = {{{outtable_col_nameslist}}};')
@ -384,12 +389,14 @@ class projection(ast_node):
gb_vartable : Dict[str, Union[str, int]] = deepcopy(self.pyname2cname)
gb_cexprs : List[str] = []
gb_colnames : List[str] = []
gb_types : List[Types] = []
for key, val in proj_map.items():
col_name = 'col_' + base62uuid(6)
self.context.emitc(f'decltype(auto) {col_name} = {self.out_table.contextname_cpp}->get_col<{key}>();')
gb_cexprs.append((col_name, val[2]))
gb_colnames.append(col_name)
self.group_node.finalize(gb_cexprs, gb_vartable, gb_colnames)
gb_types.append(val[0])
self.group_node.finalize(gb_cexprs, gb_vartable, gb_colnames, gb_types, gb_tovec)
else:
for i, (key, val) in enumerate(proj_map.items()):
if type(val[1]) is int:
@ -533,6 +540,7 @@ class groupby_c(ast_node):
def init(self, node : List[Tuple[expr, Set[ColRef]]]):
self.proj : projection = self.parent
self.glist : List[Tuple[expr, Set[ColRef]]] = node
self.vecs : str = 'vecs_' + base62uuid(3)
return super().init(node)
def produce(self, node : List[Tuple[expr, Set[ColRef]]]):
@ -561,21 +569,22 @@ class groupby_c(ast_node):
e = g_str
g_contents_list.append(e)
first_col = g_contents_list[0]
self.total_sz = 'len_' + base62uuid(4)
self.context.emitc(f'uint32_t {self.total_sz} = {first_col}.size;')
g_contents_decltype = [f'decays<decltype({c})::value_t>' for c in g_contents_list]
g_contents = ', '.join(
[f'{c}[{scanner_itname}]' for c in g_contents_list]
)
self.context.emitc(f'typedef record<{",".join(g_contents_decltype)}> {self.group_type};')
self.context.emitc(f'ankerl::unordered_dense::map<{self.group_type}, vector_type<uint32_t>, '
f'transTypes<{self.group_type}, hasher>> {self.group};')
self.context.emitc(f'{self.group}.reserve({first_col}.size);')
self.context.emitc(f'AQHashTable<{self.group_type}, '
f'transTypes<{self.group_type}, hasher>> {self.group} {{{self.total_sz}}};')
self.n_grps = len(self.glist)
self.scanner = scan(self, first_col + '.size', it_name=scanner_itname)
self.scanner.add(f'{self.group}[forward_as_tuple({g_contents})].emplace_back({self.scanner.it_var});')
self.scanner = scan(self, self.total_sz, it_name=scanner_itname)
self.scanner.add(f'{self.group}.hashtable_push(forward_as_tuple({g_contents}), {self.scanner.it_var});')
def consume(self, _):
self.scanner.finalize()
self.context.emitc(f'auto {self.vecs} = {self.group}.ht_postproc({self.total_sz});')
# def deal_with_assumptions(self, assumption:assumption, out:TableInfo):
# gscanner = scan(self, self.group)
# val_var = 'val_'+base62uuid(7)
@ -583,16 +592,40 @@ class groupby_c(ast_node):
# gscanner.add(f'{self.datasource.cxt_name}->order_by<{assumption.result()}>(&{val_var});')
# gscanner.finalize()
def finalize(self, cexprs : List[Tuple[str, expr]], var_table : Dict[str, Union[str, int]], col_names : List[str]):
for c in col_names:
def finalize(self, cexprs : List[Tuple[str, expr]], var_table : Dict[str, Union[str, int]],
col_names : List[str], col_types : List[Types], col_tovec : List[bool]):
tovec_columns = set()
for i, c in enumerate(col_names):
self.context.emitc(f'{c}.reserve({self.group}.size());')
gscanner = scan(self, self.group, loop_style = 'for_each')
if col_tovec[i]: # and type(col_types[i]) is VectorT:
typename : Types = col_types[i] # .inner_type
self.context.emitc(f'auto buf_{c} = static_cast<{typename.cname} *>(malloc({self.total_sz} * sizeof({typename.cname})));')
tovec_columns.add(c)
self.arr_len = 'arrlen_' + base62uuid(3)
self.arr_values = 'arrvals_' + base62uuid(3)
if len(tovec_columns):
self.context.emitc(f'auto {self.arr_len} = {self.group}.size();')
self.context.emitc(f'auto {self.arr_values} = {self.group}.values();')
preproc_scanner = scan(self, self.arr_len)
preproc_scanner_it = preproc_scanner.it_var
for c in tovec_columns:
preproc_scanner.add(f'{c}[{preproc_scanner_it}].init_from'
f'({self.vecs}[{preproc_scanner_it}].size,'
f' {"buf_" + c} + {self.group}.ht_base'
f'[{preproc_scanner_it}]);'
)
preproc_scanner.finalize()
# gscanner = scan(self, self.group, loop_style = 'for_each')
gscanner = scan(self, self.arr_len)
key_var = 'key_'+base62uuid(7)
val_var = 'val_'+base62uuid(7)
gscanner.add(f'auto &{key_var} = {gscanner.it_var}.first;', position = 'front')
gscanner.add(f'auto &{val_var} = {gscanner.it_var}.second;', position = 'front')
# gscanner.add(f'auto &{key_var} = {gscanner.it_var}.first;', position = 'front')
# gscanner.add(f'auto &{val_var} = {gscanner.it_var}.second;', position = 'front')
gscanner.add(f'auto &{key_var} = {self.arr_values}[{gscanner.it_var}];', position = 'front')
gscanner.add(f'auto &{val_var} = {self.vecs}[{gscanner.it_var}];', position = 'front')
len_var = None
def define_len_var():
nonlocal len_var
@ -627,7 +660,7 @@ class groupby_c(ast_node):
materialize_builtin = materialize_builtin,
count=lambda:f'{val_var}.size')
for ce in cexprs:
for i, ce in enumerate(cexprs):
ex = ce[1]
materialize_builtin = {}
if type(ex.udf_called) is udf:
@ -640,7 +673,16 @@ class groupby_c(ast_node):
materialize_builtin['_builtin_ret'] = f'{ce[0]}.back()'
gscanner.add(f'{ex.eval(c_code = True, y=get_var_names, materialize_builtin = materialize_builtin)};\n')
continue
gscanner.add(f'{ce[0]}.emplace_back({get_var_names_ex(ex)});\n')
if col_tovec[i]:
if ex.opname == 'avgs':
patch_expr = get_var_names_ex(ex)
patch_expr = patch_expr[:patch_expr.rindex(')')]
patch_expr += ', ' + f'{ce[0]}[{gscanner.it_var}]' + ')'
gscanner.add(f'{patch_expr};\n')
else:
gscanner.add(f'{ce[0]}[{gscanner.it_var}] = {get_var_names_ex(ex)};\n')
else:
gscanner.add(f'{ce[0]}.emplace_back({get_var_names_ex(ex)});\n')
gscanner.finalize()
@ -718,10 +760,11 @@ class groupby(ast_node):
# self.parent.var_table.
self.parent.col_ext.update(l[1])
def finalize(self, cexprs : List[Tuple[str, expr]], var_table : Dict[str, Union[str, int]], col_names : List[str]):
def finalize(self, cexprs : List[Tuple[str, expr]], var_table : Dict[str, Union[str, int]],
col_names : List[str], col_types : List[Types], col_tovec : List[bool]):
if self.use_sp_gb:
self.dedicated_gb = groupby_c(self.parent, self.dedicated_glist)
self.dedicated_gb.finalize(cexprs, var_table, col_names)
self.dedicated_gb.finalize(cexprs, var_table, col_names, col_types, col_tovec)
class join(ast_node):

@ -132,7 +132,3 @@ namespace ankerl::unordered_dense{
struct hash<std::tuple<Types...>> : public hasher<Types...>{ };
}
struct aq_hashtable_value_t {
uint32_t id;
uint32_t cnt;
};

@ -295,6 +295,7 @@ void initialize_module(const char* module_name, void* module_handle, Context* cx
printf("Warning: module %s have no session support.\n", module_name);
}
}
#pragma endregion
int dll_main(int argc, char** argv, Context* cxt){
aq_timer timer;

@ -1113,8 +1113,8 @@ public:
template <class K,
typename Q = T,
typename H = Hash,
typename KE = KeyEqual,
std::enable_if_t<!is_map_v<Q> && is_transparent_v<H, KE>, bool> = true>
typename KE = KeyEqual>//,
//std::enable_if_t<!is_map_v<Q> && is_transparent_v<H, KE>, bool> = true>
auto hashtable_push(K&& key) -> unsigned {
if (is_full()) {
increase_size();
@ -1141,35 +1141,35 @@ template <class K,
place_and_shift_up({dist_and_fingerprint, value_idx}, bucket_idx);
return static_cast<uint32_t>(value_idx);
}
template <class... Args>
auto hashtable_push(Args&&... args) -> unsigned {
if (is_full()) {
increase_size();
}
// we have to instantiate the value_type to be able to access the key.
// 1. emplace_back the object so it is constructed. 2. If the key is already there, pop it later in the loop.
auto& key = get_key(m_values.emplace_back(std::forward<Args>(args)...));
auto hash = mixed_hash(key);
auto dist_and_fingerprint = dist_and_fingerprint_from_hash(hash);
auto bucket_idx = bucket_idx_from_hash(hash);
while (dist_and_fingerprint <= at(m_buckets, bucket_idx).m_dist_and_fingerprint) {
if (dist_and_fingerprint == at(m_buckets, bucket_idx).m_dist_and_fingerprint &&
m_equal(key, get_key(m_values[at(m_buckets, bucket_idx).m_value_idx]))) {
m_values.pop_back(); // value was already there, so get rid of it
return static_cast<uint32_t>(at(m_buckets, bucket_idx).m_value_idx);
}
dist_and_fingerprint = dist_inc(dist_and_fingerprint);
bucket_idx = next(bucket_idx);
}
// value is new, place the bucket and shift up until we find an empty spot
auto value_idx = static_cast<value_idx_type>(m_values.size() - 1);
place_and_shift_up({dist_and_fingerprint, value_idx}, bucket_idx);
return static_cast<uint32_t>(value_idx);
}
// template <class... Args>
// auto hashtable_push(Args&&... args) -> unsigned {
// if (is_full()) {
// increase_size();
// }
// // we have to instantiate the value_type to be able to access the key.
// // 1. emplace_back the object so it is constructed. 2. If the key is already there, pop it later in the loop.
// auto& key = get_key(m_values.emplace_back(std::forward<Args>(args)...));
// auto hash = mixed_hash(key);
// auto dist_and_fingerprint = dist_and_fingerprint_from_hash(hash);
// auto bucket_idx = bucket_idx_from_hash(hash);
// while (dist_and_fingerprint <= at(m_buckets, bucket_idx).m_dist_and_fingerprint) {
// if (dist_and_fingerprint == at(m_buckets, bucket_idx).m_dist_and_fingerprint &&
// m_equal(key, get_key(m_values[at(m_buckets, bucket_idx).m_value_idx]))) {
// m_values.pop_back(); // value was already there, so get rid of it
// return static_cast<uint32_t>(at(m_buckets, bucket_idx).m_value_idx);
// }
// dist_and_fingerprint = dist_inc(dist_and_fingerprint);
// bucket_idx = next(bucket_idx);
// }
// // value is new, place the bucket and shift up until we find an empty spot
// auto value_idx = static_cast<value_idx_type>(m_values.size() - 1);
// place_and_shift_up({dist_and_fingerprint, value_idx}, bucket_idx);
// return static_cast<uint32_t>(value_idx);
// }
template <class... Args>
auto emplace(Args&&... args) -> std::pair<iterator, bool> {
if (is_full()) {

@ -427,6 +427,19 @@ constexpr vector_type<std::string_view>::vector_type(const uint32_t size, void*
// }
// }
// template<template <typename> class VT>
// inline void
// prealloc_vector (VT &vt, uint32_t sz) {
// vt.reserve(sz);
// }
// template<class T>
// inline void
// prealloc_vector (vector_type<vector_type<T>> &vt,
// uint32_t outer_sz, uint32_t inner_sz) {
// vt.reserve(outer_sz);
// auto mem = static_cast<T*>(malloc(inner_sz * sizeof(T)));
// }
template <>
class vector_type<void> {
@ -460,4 +473,48 @@ public:
vector_type<void> subvec_deep(uint32_t);
};
#pragma pack(pop)
template <class Key, class Hash>
class AQHashTable : public ankerl::unordered_dense::set<Key, Hash> {
public:
uint32_t* reversemap, *mapbase, *ht_base;
AQHashTable() = default;
explicit AQHashTable(uint32_t sz)
: ankerl::unordered_dense::set<Key, Hash>{} {
this->reserve(sz);
reversemap = static_cast<uint32_t *>(malloc(sizeof(uint32_t) * sz * 2));
mapbase = reversemap + sz;
ht_base = static_cast<uint32_t *>(calloc(sz, sizeof(uint32_t)));
}
void init(uint32_t sz) {
ankerl::unordered_dense::set<Key, Hash>::reserve(sz);
reversemap = static_cast<uint32_t *>(malloc(sizeof(uint32_t) * sz * 2));
mapbase = reversemap + sz;
ht_base = static_cast<uint32_t *>(calloc(sz, sizeof(uint32_t)));
}
inline void hashtable_push(Key&& k, uint32_t i){
reversemap[i] = ankerl::unordered_dense::set<Key, Hash>::hashtable_push(std::forward<Key&&>(k));
++ht_base[reversemap[i]];
}
auto ht_postproc(uint32_t sz) {
auto& arr_values = this->values();
const auto& len = this->size();
auto vecs = static_cast<vector_type<uint32_t>*>(malloc(sizeof(vector_type<uint32_t>) * len));
vecs[0].init_from(ht_base[0], mapbase);
for (uint32_t i = 1; i < len; ++i) {
vecs[i].init_from(ht_base[i], mapbase + ht_base[i - 1]);
ht_base[i] += ht_base[i - 1];
}
for (uint32_t i = 0; i < sz; ++i) {
auto id = reversemap[i];
mapbase[--ht_base[id]] = i;
}
return vecs;
}
};
#endif

Loading…
Cancel
Save