From 540672cdc4d31cbca213e7d951c55a2f10836931 Mon Sep 17 00:00:00 2001 From: Bill Date: Mon, 16 Jan 2023 23:02:52 +0800 Subject: [PATCH] updated code generation for compound-columns, initial support for scratchspace --- engine/types.py | 22 +++-- reconstruct/ast.py | 12 +-- reconstruct/expr.py | 13 +++ server/aggregations.h | 216 +++++++++++++++++++++++++++++------------ server/gc.h | 51 +++++++++- server/libaquery.cpp | 68 +++++++++++++ server/table.h | 47 ++++++++- server/vector_type.hpp | 28 +++--- 8 files changed, 359 insertions(+), 98 deletions(-) diff --git a/engine/types.py b/engine/types.py index 31c5b37..46b750b 100644 --- a/engine/types.py +++ b/engine/types.py @@ -305,7 +305,7 @@ opor = OperatorBase('or', 2, logical, cname = '||', sqlname = ' OR ', call = bin opxor = OperatorBase('xor', 2, logical, cname = '^', sqlname = ' XOR ', call = binary_op_behavior) opgt = OperatorBase('gt', 2, logical, cname = '>', sqlname = '>', call = binary_op_behavior) oplt = OperatorBase('lt', 2, logical, cname = '<', sqlname = '<', call = binary_op_behavior) -opge = OperatorBase('gte', 2, logical, cname = '>=', sqlname = '>=', call = binary_op_behavior) +opgte = OperatorBase('gte', 2, logical, cname = '>=', sqlname = '>=', call = binary_op_behavior) oplte = OperatorBase('lte', 2, logical, cname = '<=', sqlname = '<=', call = binary_op_behavior) opneq = OperatorBase('neq', 2, logical, cname = '!=', sqlname = '!=', call = binary_op_behavior) opeq = OperatorBase('eq', 2, logical, cname = '==', sqlname = '=', call = binary_op_behavior) @@ -355,19 +355,27 @@ fnpow = OperatorBase('pow', 2, lambda *_ : DoubleT, cname = 'pow', sqlname = 'PO # type collections def _op_make_dict(*items : OperatorBase): return { i.name: i for i in items} +#binary op builtin_binary_arith = _op_make_dict(opadd, opdiv, opmul, opsub, opmod) builtin_binary_logical = _op_make_dict(opand, opor, opxor, opgt, oplt, - opge, oplte, opneq, opeq) + opgte, oplte, opneq, opeq) +builtin_binary_ops = {**builtin_binary_arith, **builtin_binary_logical} +#unary op builtin_unary_logical = _op_make_dict(opnot) builtin_unary_arith = _op_make_dict(opneg) builtin_unary_special = _op_make_dict(spnull, opdistinct) +# functions builtin_cstdlib = _op_make_dict(fnsqrt, fnlog, fnsin, fncos, fntan, fnpow) -builtin_func = _op_make_dict(fnmax, fnmin, fnsum, fnavg, fnmaxs, - fnmins, fndeltas, fnratios, fnlast, - fnfirst, fnsums, fnavgs, fncnt, - fnpack, fntrunc, fnprev, fnnext, - fnvar, fnvars, fnstd, fnstds) +builtin_aggfunc = _op_make_dict(fnmax, fnmin, fnsum, fnavg, + fnlast, fnfirst, fncnt, fnvar, fnstd) +builtin_vecfunc = _op_make_dict(fnmaxs, + fnmins, fndeltas, fnratios, fnsums, fnavgs, + fnpack, fntrunc, fnprev, fnnext, fnvars, fnstds) +builtin_vecfunc = {**builtin_vecfunc, **builtin_cstdlib} +builtin_func = {**builtin_vecfunc, **builtin_aggfunc} + user_module_func = {} + builtin_operators : Dict[str, OperatorBase] = {**builtin_binary_arith, **builtin_binary_logical, **builtin_unary_arith, **builtin_unary_logical, **builtin_unary_special, **builtin_func, **builtin_cstdlib, **user_module_func} diff --git a/reconstruct/ast.py b/reconstruct/ast.py index 37c5e52..31b3939 100644 --- a/reconstruct/ast.py +++ b/reconstruct/ast.py @@ -604,9 +604,10 @@ class groupby_c(ast_node): self.arr_len = 'arrlen_' + base62uuid(3) self.arr_values = 'arrvals_' + base62uuid(3) + self.context.emitc(f'auto {self.arr_len} = {self.group}.size();') + self.context.emitc(f'auto {self.arr_values} = {self.group}.values();') + if len(tovec_columns): - self.context.emitc(f'auto {self.arr_len} = {self.group}.size();') - self.context.emitc(f'auto {self.arr_values} = {self.group}.values();') preproc_scanner = scan(self, self.arr_len) preproc_scanner_it = preproc_scanner.it_var for c in tovec_columns: @@ -674,11 +675,8 @@ class groupby_c(ast_node): gscanner.add(f'{ex.eval(c_code = True, y=get_var_names, materialize_builtin = materialize_builtin)};\n') continue if col_tovec[i]: - if ex.opname == 'avgs': - patch_expr = get_var_names_ex(ex) - patch_expr = patch_expr[:patch_expr.rindex(')')] - patch_expr += ', ' + f'{ce[0]}[{gscanner.it_var}]' + ')' - gscanner.add(f'{patch_expr};\n') + if ex.remake_binary(f'{ce[0]}[{gscanner.it_var}]'): + gscanner.add(f'{get_var_names_ex(ex)};\n') else: gscanner.add(f'{ce[0]}[{gscanner.it_var}] = {get_var_names_ex(ex)};\n') else: diff --git a/reconstruct/expr.py b/reconstruct/expr.py index af1f0cb..135a21e 100644 --- a/reconstruct/expr.py +++ b/reconstruct/expr.py @@ -367,6 +367,19 @@ class expr(ast_node): self.curr_code += c.codegen(delegate) return self.curr_code + def remake_binary(self, ret_expr): + if self.root: + self.oldsql = self.sql + if (self.opname in builtin_binary_ops): + patched_opname = 'aqop_' + self.opname + self.sql = (f'{patched_opname}({self.children[0].sql}, ' + f'{self.children[1].sql}, {ret_expr})') + return True + elif self.opname in builtin_vecfunc: + self.sql = self.sql[:self.sql.rindex(')')] + self.sql += ', ' + ret_expr + ')' + return True + return False def __str__(self): return self.sql def __repr__(self): diff --git a/server/aggregations.h b/server/aggregations.h index ccd5c25..a51c011 100644 --- a/server/aggregations.h +++ b/server/aggregations.h @@ -1,5 +1,6 @@ #pragma once #include "types.h" +#include "gc.h" #include #include #include @@ -29,14 +30,19 @@ double avg(const VT& v) { return (sum(v) / static_cast(v.size)); } +template class VT, class Ret> +void sqrt(const VT& v, Ret& ret) { + for (uint32_t i = 0; i < v.size; ++i) + ret[i] = sqrt(v[i]); +} + template class VT> VT sqrt(const VT& v) { VT ret(v.size); - for (uint32_t i = 0; i < v.size; ++i) { - ret[i] = sqrt(v[i]); - } + sqrt(v, ret); return ret; } + template T truncate(const T& v, const uint32_t precision) { auto multiplier = pow(10, precision); @@ -73,109 +79,153 @@ T min(const VT& v) { min_v = min_v < _v ? min_v : _v; return min_v; } -template class VT> -decayed_t mins(const VT& arr) { + +// simplify this using a template std::binary_function = std::less; +template class VT, class Ret> +void mins(const VT& arr, Ret& ret) { const uint32_t& len = arr.size; - std::deque> cache; - decayed_t ret(len); T min = std::numeric_limits::max(); for (int i = 0; i < len; ++i) { if (arr[i] < min) min = arr[i]; ret[i] = min; } - return ret; } + template class VT> -decayed_t maxs(const VT& arr) { +decayed_t mins(const VT& arr) { + decayed_t ret(arr.size); + mins(arr, ret); + return ret; +} + +template class VT, class Ret> +void maxs(const VT& arr, Ret& ret) { const uint32_t& len = arr.size; - decayed_t ret(len); T max = std::numeric_limits::min(); for (int i = 0; i < len; ++i) { if (arr[i] > max) max = arr[i]; ret[i] = max; } - return ret; } template class VT> -decayed_t minw(uint32_t w, const VT& arr) { +decayed_t maxs(const VT& arr) { + decayed_t ret(arr.size); + maxs(arr, ret); + return ret; +} + +template class VT, class Ret> +void minw(uint32_t w, const VT& arr, Ret& ret) { const uint32_t& len = arr.size; - decayed_t ret(len); std::deque> cache; for (int i = 0; i < len; ++i) { if (!cache.empty() && cache.front().second == i - w) cache.pop_front(); + while (!cache.empty() && cache.back().first > arr[i]) cache.pop_back(); cache.push_back({ arr[i], i }); ret[i] = cache.front().first; } - return ret; } template class VT> -decayed_t maxw(uint32_t w, const VT& arr) { +decayed_t minw(uint32_t w, const VT& arr) { + decayed_t ret(arr.size); + minw(w, arr, ret); + return ret; +} + +template class VT, class Ret> +void maxw(uint32_t w, const VT& arr, Ret& ret) { const uint32_t& len = arr.size; - decayed_t ret(len); std::deque> cache; for (int i = 0; i < len; ++i) { if (!cache.empty() && cache.front().second == i - w) cache.pop_front(); - while (!cache.empty() && cache.back().first > arr[i]) cache.pop_back(); + while (!cache.empty() && cache.back().first < arr[i]) cache.pop_back(); cache.push_back({ arr[i], i }); - arr[i] = cache.front().first; + ret[i] = cache.front().first; } - return ret; } template class VT> -decayed_t> ratiow(uint32_t w, const VT& arr) { +inline decayed_t maxw(uint32_t w, const VT& arr) { + decayed_t ret(arr.size); + maxw(w, arr, ret); + return ret; +} + +template class VT, class Ret> +void ratiow(uint32_t w, const VT& arr, Ret& ret) { typedef std::decay_t> FPType; uint32_t len = arr.size; if (arr.size <= w) len = 1; w = w > len ? len : w; - decayed_t ret(arr.size); ret[0] = 0; for (uint32_t i = 0; i < w; ++i) ret[i] = arr[i] / (FPType)arr[0]; for (uint32_t i = w; i < arr.size; ++i) ret[i] = arr[i] / (FPType) arr[i - w]; +} + +template class VT> +inline decayed_t> ratiow(uint32_t w, const VT& arr) { + typedef std::decay_t> FPType; + decayed_t ret(arr.size); + ratiow(w, arr, ret); return ret; } template class VT> -decayed_t> ratios(const VT& arr) { +inline decayed_t> ratios(const VT& arr) { return ratiow(1, arr); } -template class VT> -decayed_t> sums(const VT& arr) { +template class VT, class Ret> +inline void ratios(const VT& arr, Ret& ret) { + return ratiow(1, arr, ret); +} + +template class VT, class Ret> +void sums(const VT& arr, Ret& ret) { const uint32_t& len = arr.size; - decayed_t> ret(len); uint32_t i = 0; if (len) ret[i++] = arr[0]; for (; i < len; ++i) ret[i] = ret[i - 1] + arr[i]; - return ret; } template class VT> -decayed_t>> avgs(const VT& arr) { +inline decayed_t> sums(const VT& arr) { + decayed_t> ret(arr.size); + sums(arr, ret); + return ret; +} + +template class VT, class Ret> +void avgs(const VT& arr, Ret& ret) { const uint32_t& len = arr.size; typedef types::GetFPType> FPType; - decayed_t ret(len); uint32_t i = 0; types::GetLongType s; if (len) s = ret[i++] = arr[0]; for (; i < len; ++i) ret[i] = (s += arr[i]) / (FPType)(i + 1); - return ret; } template class VT> -decayed_t> sumw(uint32_t w, const VT& arr) { +inline decayed_t>> avgs(const VT& arr) { + typedef types::GetFPType> FPType; + decayed_t ret(arr.size); + avgs(arr, ret); + return ret; +} + +template class VT, class Ret> +void sumw(uint32_t w, const VT& arr, Ret& ret) { const uint32_t& len = arr.size; - decayed_t> ret(len); uint32_t i = 0; w = w > len ? len : w; if (len) ret[i++] = arr[0]; @@ -183,11 +233,17 @@ decayed_t> sumw(uint32_t w, const VT& arr) { ret[i] = ret[i - 1] + arr[i]; for (; i < len; ++i) ret[i] = ret[i - 1] + arr[i] - arr[i - w]; - return ret; } template class VT> -void avgw(uint32_t w, const VT& arr, decayed_t>>& ret) { +decayed_t> sumw(uint32_t w, const VT& arr) { + decayed_t> ret(arr.size); + sumw(w, arr, ret); + return ret; +} + +template class VT, class Ret> +void avgw(uint32_t w, const VT& arr, Ret& ret) { typedef types::GetFPType> FPType; const uint32_t& len = arr.size; uint32_t i = 0; @@ -201,26 +257,19 @@ void avgw(uint32_t w, const VT& arr, decayed_t class VT> -decayed_t>> avgw(uint32_t w, const VT& arr) { +inline decayed_t>> avgw(uint32_t w, const VT& arr) { typedef types::GetFPType> FPType; const uint32_t& len = arr.size; decayed_t ret(len); - uint32_t i = 0; - types::GetLongType s{}; - w = w > len ? len : w; - if (len) s = ret[i++] = arr[0]; - for (; i < w; ++i) - ret[i] = (s += arr[i]) / (FPType)(i + 1); - for (; i < len; ++i) - ret[i] = ret[i - 1] + (arr[i] - arr[i - w]) / (FPType)w; + avgw(w, arr, ret); return ret; } -template class VT, bool sd = false> -decayed_t>> varw(uint32_t w, const VT& arr) { +template class VT, class Ret, bool sd = false> +void varw(uint32_t w, const VT& arr, + Ret& ret) { using FPType = types::GetFPType>; const uint32_t& len = arr.size; - decayed_t ret(len); uint32_t i = 0; types::GetLongType s{}; w = w > len ? len : w; @@ -252,7 +301,14 @@ decayed_t>> varw(uint32_t w, const VT if constexpr(sd) if(i) ret[i-1] = sqrt(ret[i-1]); - +} + + +template class VT, bool sd = false> +inline decayed_t>> varw(uint32_t w, const VT& arr) { + using FPType = types::GetFPType>; + decayed_t ret(arr.size); + varw>>, sd>(w, arr, ret); return ret; } @@ -274,11 +330,10 @@ types::GetFPType>> var(const VT& arr) { return (ssq - s * s / (FPType)(len + 1)) / (FPType)(len + 1); } -template class VT, bool sd = false> -decayed_t>> vars(const VT& arr) { +template class VT, class Ret, bool sd = false> +void vars(const VT& arr, Ret& ret) { typedef types::GetFPType> FPType; const uint32_t& len = arr.size; - decayed_t ret(len); uint32_t i = 0; types::GetLongType s{}; FPType MnX{}; @@ -298,51 +353,88 @@ decayed_t>> vars(const VT& arr) { ret[i] = MnX / (FPType)(i + 1); if constexpr(sd) ret[i] = sqrt(ret[i]); } +} + +template class VT, bool sd = false> +inline decayed_t>> vars(const VT& arr) { + typedef types::GetFPType> FPType; + decayed_t ret(arr.size); + vars>>, sd>(arr, ret); return ret; } + template class VT> -types::GetFPType>> stddev(const VT& arr) { +inline types::GetFPType>> stddev(const VT& arr) { return sqrt(var(arr)); } + template class VT> -decayed_t>> stddevs(const VT& arr) { +inline decayed_t>> stddevs(const VT& arr) { return vars(arr); } + template class VT> -decayed_t>> stddevw(uint32_t w, const VT& arr) { +inline decayed_t>> stddevw(uint32_t w, const VT& arr) { return varw(w, arr); } + +template class VT, class Ret> +inline auto stddevs(const VT& arr, Ret& ret) { + return vars(arr, ret); +} + +template class VT, class Ret> +inline auto stddevw(uint32_t w, const VT& arr, Ret& ret) { + return varw(w, arr, ret); +} + + // use getSignedType -template class VT> -decayed_t deltas(const VT& arr) { +template class VT, class Ret> +void deltas(const VT& arr, Ret& ret) { const uint32_t& len = arr.size; - decayed_t ret(len); uint32_t i = 0; if (len) ret[i++] = 0; for (; i < len; ++i) ret[i] = arr[i] - arr[i - 1]; - return ret; } template class VT> -decayed_t prev(const VT& arr) { +inline decayed_t deltas(const VT& arr) { + decayed_t ret(arr.size); + deltas(arr, ret); + return ret; +} + +template class VT, class Ret> +void prev(const VT& arr, Ret& ret) { const uint32_t& len = arr.size; - decayed_t ret(len); uint32_t i = 0; if (len) ret[i++] = arr[0]; for (; i < len; ++i) ret[i] = arr[i - 1]; - return ret; } template class VT> -decayed_t aggnext(const VT& arr) { +inline decayed_t prev(const VT& arr) { + decayed_t ret(arr.size); + prev(arr, ret); + return ret; +} + +template class VT, class Ret> +void aggnext(const VT& arr, Ret& ret) { const uint32_t& len = arr.size; - decayed_t ret(len); uint32_t i = 1; for (; i < len; ++i) ret[i - 1] = arr[i]; if (len > 0) ret[len - 1] = arr[len - 1]; +} + +template class VT> +inline decayed_t aggnext(const VT& arr) { + decayed_t ret(arr.size); + aggnext(arr, ret); return ret; } @@ -360,8 +452,6 @@ T first(const VT& arr) { return arr[0]; } - - #define __DEFAULT_AGGREGATE_FUNCTION__(NAME, RET) \ template constexpr T NAME(const T& v) { return RET; } diff --git a/server/gc.h b/server/gc.h index 1099248..45c827e 100644 --- a/server/gc.h +++ b/server/gc.h @@ -1,4 +1,39 @@ #pragma once + +class ScratchSpace { +public: + void* ret; + char* scratchspace; + size_t ptr; + size_t cnt; + size_t capacity; + size_t initial_capacity; + void* temp_memory_fractions; + + //uint8_t status; + // record maximum size + constexpr static uint8_t Grow = 0x1; + // no worry about overflow + constexpr static uint8_t Use = 0x0; + + void init(size_t initial_capacity); + + // apply for memory + void* alloc(uint32_t sz); + + void register_ret(void* ret); + + // reorganize memory space + void release(); + + // reset status of the scratch space + void reset(); + + // reset scratch space to initial capacity. + void cleanup(); +}; + + #ifndef __AQ_USE_THREADEDGC__ #include class GC { @@ -18,7 +53,7 @@ private:; std::atomic current_size; volatile bool lock; using gc_deallocator_t = void (*)(void*); - + ScratchSpace scratch; // maybe use volatile std::thread::id instead protected: void acquire_lock(); @@ -29,28 +64,36 @@ protected: void terminate_daemon(); public: - void reg(void* v, uint32_t sz = 1, + void reg(void* v, uint32_t sz = 0xffffffff, void(*f)(void*) = free ); + uint32_t get_threshold() const { + return threshould; + } + GC( uint64_t max_size = 0xfffffff, uint32_t max_slots = 4096, uint32_t interval = 10000, uint32_t forced_clean = 1000000, - uint32_t threshould = 64 //one seconds + uint32_t threshould = 64, //one seconds + uint32_t scratch_sz = 0x1000000 // 16 MB ) : max_size(max_size), max_slots(max_slots), interval(interval), forced_clean(forced_clean), threshould(threshould) { start_deamon(); GC::gc_handle = this; + this->scratch.init(scratch_sz); } // 256 MB ~GC(){ terminate_daemon(); + scratch.cleanup(); } + static GC* gc_handle; template - constexpr static inline gc_deallocator_t _delete(T*){ + static inline gc_deallocator_t _delete(T*) { return [](void* v){ delete (T*)v; }; diff --git a/server/libaquery.cpp b/server/libaquery.cpp index a97d6e8..fc8f5dc 100644 --- a/server/libaquery.cpp +++ b/server/libaquery.cpp @@ -452,6 +452,9 @@ void GC::reg(void* v, uint32_t sz, void(*f)(void*)) { //~ 40ns expected v. free f(v); return; } + else if (sz == 0xffffffff) + sz = this->threshould; + auto _q = static_cast(q); while(lock); ++alive_cnt; @@ -466,6 +469,71 @@ void GC::reg(void* v, uint32_t sz, void(*f)(void*)) { //~ 40ns expected v. free inline GC* GC::gc_handle = nullptr; +void ScratchSpace::init(size_t initial_capacity) { + ret = nullptr; + scratchspace = static_cast(malloc(initial_capacity)); + ptr = cnt = 0; + capacity = initial_capacity; + this->initial_capacity = initial_capacity; + temp_memory_fractions = new vector_type(); +} + +inline void* ScratchSpace::alloc(uint32_t sz){ + this->cnt += sz; // major cost + ptr = this->cnt; + if (this->cnt > capacity) { + [[unlikely]] + capacity = this->cnt + (capacity >> 1); + auto vec_tmpmem_fractions = static_cast*>(temp_memory_fractions); + vec_tmpmem_fractions->emplace_back(scratchspace); + scratchspace = static_cast(malloc(capacity)); + ptr = 0; + } + return scratchspace + ptr; +} + +inline void ScratchSpace::register_ret(void* ret){ + this->ret = ret; +} + +inline void ScratchSpace::release(){ + ptr = cnt = 0; + ret = nullptr; + auto vec_tmpmem_fractions = + static_cast*>(temp_memory_fractions); + if (vec_tmpmem_fractions->size) { + //[[unlikely]] + for(auto& mem : *vec_tmpmem_fractions){ + free(mem); + //GC::gc_handle->reg(mem); + } + vec_tmpmem_fractions->clear(); + } +} + +inline void ScratchSpace::reset() { + this->release(); + if (capacity != initial_capacity){ + capacity = initial_capacity; + scratchspace = static_cast(realloc(scratchspace, capacity)); + } +} + +void ScratchSpace::cleanup(){ + auto vec_tmpmem_fractions = + static_cast*>(temp_memory_fractions); + if (vec_tmpmem_fractions->size) { + for(auto& mem : *vec_tmpmem_fractions){ + free(mem); + //GC::gc_handle->reg(mem); + } + vec_tmpmem_fractions->clear(); + } + delete vec_tmpmem_fractions; + free(this->scratchspace); +} + + #include "dragonbox/dragonbox_to_chars.hpp" diff --git a/server/table.h b/server/table.h index c2b3400..36f7230 100644 --- a/server/table.h +++ b/server/table.h @@ -36,7 +36,8 @@ struct ColRef_cstorage { int ty; // what if enum is not int? }; -template