improved caching and hashing

master
bill 1 year ago
parent 0815222e96
commit 52afa95e94

@ -1 +1 @@
Subproject commit ed2b7b350f162271472feb40dd02f5c885f51683
Subproject commit 6d4c91d015dfc6ffef48cf5a1f1e92fb192cc234

@ -135,7 +135,7 @@ class projection(ast_node):
self.has_preproc = (
self.has_preproc and
not self.datasource.no_join and
not self.datasource.tables[0].cached
self.datasource.tables[0].cached
)
self.datasource_changed = True
self.prev_datasource = self.context.datasource
@ -351,7 +351,10 @@ class projection(ast_node):
col_types = [c.type.cname for c in obj_input_table.columns]
self.input_table_type = f'TableInfo<{", ".join(col_types)}>'
self.context.emitc(f'{self.input_table_type}* {self.input_table_name}'
f' = cxt->tables["{obj_input_table.table_name}"];')
f' = static_cast<{self.input_table_type}*>('
f'cxt->tables["{obj_input_table.table_name}"]);')
# if not self.has_preproc:
# col_idxs = {col.name : i for i, col in enumerate(self.datasource.all_cols())}
for v, idx in self.var_table.items():
vname = get_legal_name(v) + '_' + base62uuid(3)
@ -359,7 +362,7 @@ class projection(ast_node):
if not self.has_preproc:
# TODO: verify/ensure that idx is the same as cid in the table.
self.context.emitc(f'decltype(auto) {vname} = '
f'{self.input_table_name}->get_col<{idx}>();')
f'{self.input_table_name}->get_col<{self.datasource.parse_col_names(v).id}>();')
else:
self.context.emitc(f'auto {vname} = ColRef<{typenames[idx].cname}>('
f'{length_name}, '
@ -462,8 +465,9 @@ class projection(ast_node):
def finalize(self, node):
self.deal_with_into(node)
self.context.emitc(f'puts("done.");')
self.context.emitc('printf("done: %lld\\n", (chrono::high_resolution_clock::now() - timer).count());timer = chrono::high_resolution_clock::now();')
## self.context.emitc('printf("done: %lld\\n", (chrono::high_resolution_clock::now() - timer).count());timer = chrono::high_resolution_clock::now();')
if not self.has_preproc:
self.context.sql = ''
if self.parent is None:
self.context.sql_end()
if self.has_postproc:
@ -566,7 +570,7 @@ class scan(ast_node):
self.parent.context.scans.append(self)
def produce(self, node):
self.start += '#pragma openmp simd\n'
self.start += self.context.omp_simd
if self.loop_style == scan.LoopStyle.foreach:
self.colref = node
self.start += f'for ({self.const}auto& {self.it_var} : {node}) {{\n'
@ -596,7 +600,7 @@ class scan(ast_node):
self.start +
self.front +
b +
'\n}'
'\n}\n'
) for b in self.body])
+
self.end
@ -607,7 +611,7 @@ class scan(ast_node):
self.start +
self.front +
'\n'.join(self.body) +
'\n}' +
'\n}\n' +
self.end
)
self.context.remove_scan(self, scan_assembly)
@ -649,11 +653,12 @@ class groupby_c(ast_node):
first_col = g_contents_list[0]
self.total_sz = 'len_' + base62uuid(4)
self.context.emitc(f'uint32_t {self.total_sz} = {first_col}.size;')
g_contents_decltype = [f'decays<decltype({c})::value_t>' for c in g_contents_list]
g_contents_decltype = [f'decays<decltype({c})>::value_t' for c in g_contents_list]
g_contents = ', '.join(
[f'{c}[{scanner_itname}]' for c in g_contents_list]
# [f'{c}[{scanner_itname}]' for c in g_contents_list]
g_contents_list
)
self.context.emitc('printf("init_time: %lld\\n", (chrono::high_resolution_clock::now() - timer).count()); timer = chrono::high_resolution_clock::now();')
## self.context.emitc('printf("init_time: %lld\\n", (chrono::high_resolution_clock::now() - timer).count()); timer = chrono::high_resolution_clock::now();')
self.context.emitc(f'typedef record<{",".join(g_contents_decltype)}> {self.group_type};')
self.context.emitc(f'AQHashTable<{self.group_type}, '
f'transTypes<{self.group_type}, hasher>> {self.group} {{{self.total_sz}}};')
@ -661,13 +666,13 @@ class groupby_c(ast_node):
# self.scanner = scan(self, self.total_sz, it_name=scanner_itname)
# self.scanner.add(f'{self.group}.hashtable_push(forward_as_tuple({g_contents}), {self.scanner.it_var});')
self.context.emitc(f'{self.group}.hashtable_push_all({g_contents}, {self.total_sz});')
self.context.emitc(f'{self.group}.hashtable_push_all<{", ".join([f"decays<decltype({c})>" for c in g_contents_list])}>({g_contents}, {self.total_sz});')
def consume(self, _):
# self.scanner.finalize()
self.context.emitc('printf("ht_construct: %lld\\n", (chrono::high_resolution_clock::now() - timer).count()); timer = chrono::high_resolution_clock::now();')
## self.context.emitc('printf("ht_construct: %lld\\n", (chrono::high_resolution_clock::now() - timer).count()); timer = chrono::high_resolution_clock::now();')
self.context.emitc(f'auto {self.vecs} = {self.group}.ht_postproc({self.total_sz});')
self.context.emitc('printf("ht_postproc: %lld\\n", (chrono::high_resolution_clock::now() - timer).count()); timer = chrono::high_resolution_clock::now();')
## self.context.emitc('printf("ht_postproc: %lld\\n", (chrono::high_resolution_clock::now() - timer).count()); timer = chrono::high_resolution_clock::now();')
# def deal_with_assumptions(self, assumption:assumption, out:TableInfo):
# gscanner = scan(self, self.group)
# val_var = 'val_'+base62uuid(7)
@ -771,11 +776,11 @@ class groupby_c(ast_node):
gscanner.add(f'{ce[0]}[{gscanner.it_var}] = ({get_var_names_ex(ex)});\n')
gscanner.add(f'GC::scratch_space->release();')
self.context.emitc('printf("ht_initfrom: %lld\\n", (chrono::high_resolution_clock::now() - timer).count());timer = chrono::high_resolution_clock::now();')
## self.context.emitc('printf("ht_initfrom: %lld\\n", (chrono::high_resolution_clock::now() - timer).count());timer = chrono::high_resolution_clock::now();')
gscanner.finalize()
self.context.emitc(f'GC::scratch_space = nullptr;')
self.context.emitc('printf("agg: %lld\\n", (chrono::high_resolution_clock::now() - timer).count());timer = chrono::high_resolution_clock::now();')
## self.context.emitc('printf("agg: %lld\\n", (chrono::high_resolution_clock::now() - timer).count());timer = chrono::high_resolution_clock::now();')
self.datasource.groupinfo = None
@ -1831,6 +1836,7 @@ class cache(ast_node):
name = 'cache'
first_order = name
def init(self, node):
self.context.has_payload = False
source = node['cache']['source']
# lazy = node['cache']['lazy']
lazy = 0

@ -161,6 +161,7 @@ class Context:
self.special_gb = False
self.has_dll = False
self.triggers_active.clear()
self.has_payload = True
def __init__(self, state = None):
from prompt import PromptState
@ -188,6 +189,7 @@ class Context:
self.use_gc = compile_use_gc
self.system_state: Optional[PromptState] = state
self.use_cached_tables = True
self.use_omp_simd = True
# self.new() called everytime new query batch is started
def get_scan_var(self):
@ -327,3 +329,10 @@ class Context:
self.ccode += headers + '\n'.join(self.procs)
self.headers = set()
return self.ccode
@property
def omp_simd(self):
if self.use_omp_simd:
return '#pragma omp simd\n'
else:
return ''

@ -0,0 +1,26 @@
#include "server/aggregations.h"
#include "server/table.h"
#include <chrono>
int main() {
using namespace std;
size_t n = 1'000'000;
int a[n];
for (int i = 0; i < n; ++i) {
a[i] = rand();
}
// vector_type<int> t;
//print(t);
auto time = chrono::high_resolution_clock::now();
double k = 0;
#pragma omp simd
for (size_t i = 0; i < n; ++i)
k += a[i];
k/=(double)n;
printf("%f\n", k);
printf("%llu", (chrono::high_resolution_clock::now() - time).count());
return (size_t)k;
}

@ -441,6 +441,7 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr
# time.sleep(.00001)
while state.get_ready():
state.wait_engine()
state.server_ready = True
if state.need_print:
print(f'MonetDB Time: {state.cfg.monetdb_time/10**9}, '
f'PostProc Time: {state.cfg.postproc_time/10**9}')
@ -489,7 +490,7 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr
with open('udf.hpp', 'wb') as outfile:
outfile.write(this_udf.encode('utf-8'))
if state.server_mode == RunType.Threaded:
if state.server_mode == RunType.Threaded and cxt.has_payload:
# assignment to avoid auto gc
# sqls = [s.strip() for s in cxt.sql.split(';')]
qs = [ctypes.c_char_p(bytes(q, 'utf-8')) for q in cxt.queries if len(q)]
@ -517,7 +518,7 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr
state.cfg.has_dll = 0
state.currstats.compile_time = state.currstats.stop()
cxt.post_exec_triggers()
if build_this:
if build_this and cxt.has_payload:
state.set_ready()
# while state.get_ready():
# state.wait_engine()

@ -20,6 +20,7 @@ template<class T, template<typename ...> class VT>
types::GetLongType<T>
sum(const VT<T>& v) {
types::GetLongType<T> ret = 0;
#pragma omp simd
for (auto _v : v)
ret += _v;
return ret;
@ -32,6 +33,7 @@ double avg(const VT<T>& v) {
template<class T, template<typename ...> class VT, class Ret>
void sqrt(const VT<T>& v, Ret& ret) {
#pragma omp simd
for (uint32_t i = 0; i < v.size; ++i)
ret[i] = sqrt(v[i]);
}
@ -59,6 +61,7 @@ VT<T> truncate(const VT<T>& v, const uint32_t precision) {
auto multiplier = pow(10, precision);
auto max_truncate = std::numeric_limits<T>::max()/multiplier;
VT<T> ret(v.size);
#pragma omp simd
for (uint32_t i = 0; i < v.size; ++i) { // round or trunc??
ret[i] = v[i] < max_truncate ? round(v[i] * multiplier)/multiplier : v[i];
}
@ -68,6 +71,7 @@ VT<T> truncate(const VT<T>& v, const uint32_t precision) {
template <class T, template<typename ...> class VT>
T max(const VT<T>& v) {
T max_v = std::numeric_limits<T>::min();
#pragma omp simd
for (const auto& _v : v)
max_v = max_v > _v ? max_v : _v;
return max_v;
@ -75,6 +79,7 @@ T max(const VT<T>& v) {
template <class T, template<typename ...> class VT>
T min(const VT<T>& v) {
T min_v = std::numeric_limits<T>::max();
#pragma omp simd
for (const auto& _v : v)
min_v = min_v < _v ? min_v : _v;
return min_v;
@ -85,6 +90,7 @@ template<class T, template<typename ...> class VT, class Ret>
void mins(const VT<T>& arr, Ret& ret) {
const uint32_t& len = arr.size;
T min = std::numeric_limits<T>::max();
#pragma omp simd
for (int i = 0; i < len; ++i) {
if (arr[i] < min)
min = arr[i];
@ -103,6 +109,7 @@ template<class T, template<typename ...> class VT, class Ret>
void maxs(const VT<T>& arr, Ret& ret) {
const uint32_t& len = arr.size;
T max = std::numeric_limits<T>::min();
#pragma omp simd
for (int i = 0; i < len; ++i) {
if (arr[i] > max)
max = arr[i];
@ -121,9 +128,10 @@ template<class T, template<typename ...> class VT, class Ret>
void minw(uint32_t w, const VT<T>& arr, Ret& ret) {
const uint32_t& len = arr.size;
std::deque<std::pair<T, uint32_t>> cache;
#pragma omp simd
for (int i = 0; i < len; ++i) {
if (!cache.empty() && cache.front().second == i - w) cache.pop_front();
#pragma clang loop vectorize(enable) interleave(enable)
while (!cache.empty() && cache.back().first > arr[i]) cache.pop_back();
cache.push_back({ arr[i], i });
ret[i] = cache.front().first;
@ -131,7 +139,7 @@ void minw(uint32_t w, const VT<T>& arr, Ret& ret) {
}
template<class T, template<typename ...> class VT>
decayed_t<VT, T> minw(uint32_t w, const VT<T>& arr) {
inline decayed_t<VT, T> minw(uint32_t w, const VT<T>& arr) {
decayed_t<VT, T> ret(arr.size);
minw(w, arr, ret);
return ret;
@ -141,8 +149,10 @@ template<class T, template<typename ...> class VT, class Ret>
void maxw(uint32_t w, const VT<T>& arr, Ret& ret) {
const uint32_t& len = arr.size;
std::deque<std::pair<T, uint32_t>> cache;
#pragma omp simd
for (int i = 0; i < len; ++i) {
if (!cache.empty() && cache.front().second == i - w) cache.pop_front();
#pragma clang loop vectorize(enable) interleave(enable)
while (!cache.empty() && cache.back().first < arr[i]) cache.pop_back();
cache.push_back({ arr[i], i });
ret[i] = cache.front().first;
@ -164,8 +174,10 @@ void ratiow(uint32_t w, const VT<T>& arr, Ret& ret) {
len = 1;
w = w > len ? len : w;
ret[0] = 0;
#pragma omp simd
for (uint32_t i = 0; i < w; ++i)
ret[i] = arr[i] / (FPType)arr[0];
#pragma omp simd
for (uint32_t i = w; i < arr.size; ++i)
ret[i] = arr[i] / (FPType) arr[i - w];
}
@ -191,9 +203,9 @@ inline void ratios(const VT<T>& arr, Ret& ret) {
template<class T, template<typename ...> class VT, class Ret>
void sums(const VT<T>& arr, Ret& ret) {
const uint32_t& len = arr.size;
uint32_t i = 0;
if (len) ret[i++] = arr[0];
for (; i < len; ++i)
if (len) ret[0] = arr[0];
#pragma omp simd
for (uint32_t i = 1; i < len; ++i)
ret[i] = ret[i - 1] + arr[i];
}
@ -208,10 +220,10 @@ template<class T, template<typename ...> class VT, class Ret>
void avgs(const VT<T>& arr, Ret& ret) {
const uint32_t& len = arr.size;
typedef types::GetFPType<types::GetLongType<T>> FPType;
uint32_t i = 0;
types::GetLongType<T> s;
if (len) s = ret[i++] = arr[0];
for (; i < len; ++i)
if (len) s = ret[0] = arr[0];
#pragma omp simd
for (uint32_t i = 1; i < len; ++i)
ret[i] = (s += arr[i]) / (FPType)(i + 1);
}
@ -226,12 +238,13 @@ inline decayed_t<VT, types::GetFPType<types::GetLongType<T>>> avgs(const VT<T>&
template<class T, template<typename ...> class VT, class Ret>
void sumw(uint32_t w, const VT<T>& arr, Ret& ret) {
const uint32_t& len = arr.size;
uint32_t i = 0;
w = w > len ? len : w;
if (len) ret[i++] = arr[0];
for (; i < w; ++i)
if (len) ret[0] = arr[0];
#pragma omp simd
for (uint32_t i = 1; i < w; ++i)
ret[i] = ret[i - 1] + arr[i];
for (; i < len; ++i)
#pragma omp simd
for (uint32_t i = w; i < len; ++i)
ret[i] = ret[i - 1] + arr[i] - arr[i - w];
}
@ -246,13 +259,15 @@ template<class T, template<typename ...> class VT, class Ret>
void avgw(uint32_t w, const VT<T>& arr, Ret& ret) {
typedef types::GetFPType<types::GetLongType<T>> FPType;
const uint32_t& len = arr.size;
uint32_t i = 0;
types::GetLongType<T> s{};
w = w > len ? len : w;
if (len) s = ret[i++] = arr[0];
for (; i < w; ++i)
if (len) s = ret[0] = arr[0];
#pragma omp simd
for (uint32_t i = 1; i < w; ++i)
ret[i] = (s += arr[i]) / (FPType)(i + 1);
for (; i < len; ++i)
#pragma omp simd
for (uint32_t i = w; i < len; ++i)
ret[i] = ret[i - 1] + (arr[i] - arr[i - w]) / (FPType)w;
}
@ -270,7 +285,7 @@ void varw(uint32_t w, const VT<T>& arr,
Ret& ret) {
using FPType = types::GetFPType<types::GetLongType<T>>;
const uint32_t& len = arr.size;
uint32_t i = 0;
types::GetLongType<T> s{};
w = w > len ? len : w;
FPType EnX {}, MnX{};
@ -278,9 +293,10 @@ void varw(uint32_t w, const VT<T>& arr,
s = arr[0];
MnX = 0;
EnX = arr[0];
ret[i++] = 0;
ret[0] = 0;
}
for (; i < len; ++i){
#pragma omp simd
for (uint32_t i = 1; i < w; ++i) {
s += arr[i];
FPType _EnX = s / (FPType)(i + 1);
MnX += (arr[i] - EnX) * (arr[i] - _EnX);
@ -290,7 +306,8 @@ void varw(uint32_t w, const VT<T>& arr,
}
const float rw = 1.f / (float)w;
s *= rw;
for (; i < len; ++i){
#pragma omp simd
for (uint32_t i = w; i < len; ++i) {
const auto dw = arr[i] - arr[i - w - 1];
const auto sw = arr[i] + arr[i - w - 1];
const auto dex = dw * rw;
@ -299,8 +316,8 @@ void varw(uint32_t w, const VT<T>& arr,
s += dex;
}
if constexpr(sd)
if(i)
ret[i-1] = sqrt(ret[i-1]);
if(len)
ret[len-1] = sqrt(ret[len-1]);
}
@ -316,14 +333,14 @@ template<class T, template<typename ...> class VT>
types::GetFPType<types::GetLongType<decays<T>>> var(const VT<T>& arr) {
typedef types::GetFPType<types::GetLongType<decays<T>>> FPType;
const uint32_t& len = arr.size;
uint32_t i = 0;
types::GetLongType<T> s{0};
types::GetLongType<T> ssq{0};
if (len) {
s = arr[0];
ssq = arr[0] * arr[0];
}
for (; i < len; ++i){
#pragma omp simd
for (uint32_t i = 1; i < len; ++i){
s += arr[i];
ssq += arr[i] * arr[i];
}
@ -334,7 +351,6 @@ template<class T, template<typename ...> class VT, class Ret, bool sd = false>
void vars(const VT<T>& arr, Ret& ret) {
typedef types::GetFPType<types::GetLongType<T>> FPType;
const uint32_t& len = arr.size;
uint32_t i = 0;
types::GetLongType<T> s{};
FPType MnX{};
FPType EnX {};
@ -342,9 +358,10 @@ void vars(const VT<T>& arr, Ret& ret) {
s = arr[0];
MnX = 0;
EnX = arr[0];
ret[i++] = 0;
ret[0] = 0;
}
for (; i < len; ++i){
#pragma omp simd
for (uint32_t i = 1; i < len; ++i){
s += arr[i];
FPType _EnX = s / (FPType)(i + 1);
MnX += (arr[i] - EnX) * (arr[i] - _EnX);
@ -373,6 +390,7 @@ auto corr(const VT<T>& x, const VT2<T2>&y) {
// assert(x.size == y.size);
const uint32_t& len = x.size;
LongType sx{0}, sy{0}, sxy{0}, sx2{0}, sy2{0};
#pragma omp simd
for (uint32_t i = 0; i < len; ++i) {
sx += x[i];
sx2 += x[i] * x[i];
@ -417,14 +435,13 @@ inline auto stddevw(uint32_t w, const VT<T>& arr, Ret& ret) {
return varw<T, VT, Ret, true>(w, arr, ret);
}
// use getSignedType
template<class T, template<typename ...> class VT, class Ret>
void deltas(const VT<T>& arr, Ret& ret) {
const uint32_t& len = arr.size;
uint32_t i = 0;
if (len) ret[i++] = 0;
for (; i < len; ++i)
if (len) ret[0] = 0;
#pragma omp simd
for (uint32_t i = 1; i < len; ++i)
ret[i] = arr[i] - arr[i - 1];
}
@ -438,9 +455,9 @@ inline decayed_t<VT, T> deltas(const VT<T>& arr) {
template<class T, template<typename ...> class VT, class Ret>
void prev(const VT<T>& arr, Ret& ret) {
const uint32_t& len = arr.size;
uint32_t i = 0;
if (len) ret[i++] = arr[0];
for (; i < len; ++i)
if (len) ret[0] = arr[0];
#pragma omp simd
for (uint32_t i = 1; i < len; ++i)
ret[i] = arr[i - 1];
}
@ -454,8 +471,8 @@ inline decayed_t<VT, T> prev(const VT<T>& arr) {
template<class T, template<typename ...> class VT, class Ret>
void aggnext(const VT<T>& arr, Ret& ret) {
const uint32_t& len = arr.size;
uint32_t i = 1;
for (; i < len; ++i)
#pragma omp simd
for (uint32_t i = 1; i < len; ++i)
ret[i - 1] = arr[i];
if (len > 0) ret[len - 1] = arr[len - 1];
}

@ -209,7 +209,7 @@ struct PerfectHashTable {
template <typename ... Types, template <typename> class VT>
static vector_type<uint32_t>*
construct(VT<Types>&... args) { // construct a hash set
AQTmr();
// AQTmr();
int n_cols, n_rows = 0;
((n_cols = args.size), ...);

@ -278,7 +278,9 @@ void MonetdbServer::getDSTable(const char* name, void* tbl) {
)
);
}
monetdbe_get_cols(*(void**)(this->server), name, cols, table->n_cols);
auto sz = monetdbe_get_cols(*(void**)(this->server), name, cols, table->n_cols);
for (int i = 0; i < table->n_cols; ++i)
table->colrefs[i].size = sz;
}
MonetdbServer::~MonetdbServer(){

@ -30,7 +30,7 @@ monetdbe_get_size(void* dbhdl, const char *table_name, void*);
extern "C" void*
monetdbe_get_col(void* dbhdl, const char *table_name, uint32_t col_id);
extern "C" void
extern "C" unsigned int
monetdbe_get_cols(void* dbhdl, const char* table_name, void*** cols, int i);
#endif

@ -113,7 +113,7 @@ monetdbe_get_col(monetdbe_database dbhdl, const char *table_name, uint32_t col_i
return iter.base;
}
void monetdbe_get_cols(
unsigned int monetdbe_get_cols(
monetdbe_database dbhdl,
const char* table_name,
void*** cols,
@ -123,13 +123,17 @@ void monetdbe_get_cols(
backend* be = ((backend *)(((monetdbe_database_internal*)dbhdl)->c->sqlcontext));
mvc *m = be->mvc;
sql_table *t = find_table_or_view_on_scope(m, NULL, "sys", table_name, "CATALOG", false);
if (!i || !t) return;
if (!i || !t) return 0;
node *n = t->columns->l->h;
sqlstore* store = m->store;
size_t sz = 0;
if (n && i > 0)
sz = store->storage_api.count_col(m->session->tr, n->data, QUICK);
while(n && i-- > 0) {
BAT *b = store->storage_api.bind_col(m->session->tr, n->data, QUICK);
BATiter iter = bat_iterator(b);
*(cols++) = iter.base;
**(cols++) = iter.base;
n = n->next;
}
return sz;
}

@ -573,6 +573,7 @@ start:
}
server->getDSTable(cached_table, tbl);
cxt->tables[cached_table] = tbl;
// server->exec( (
// std::string("SELECT * FROM ") + cached_table + std::string(";")
// ).c_str() );

@ -32,7 +32,8 @@ constexpr static bool is_vector_type = is_vector_impl<T>::value;
template <class T>
constexpr size_t aq_szof = sizeof(T);
template <>
constexpr size_t aq_szof<void> = 0;
inline constexpr size_t aq_szof<void> = 0;
template <class T1, class T2>
struct aqis_same_impl {
constexpr static bool value =

@ -0,0 +1,6 @@
create table t(a int);
insert into t(a) values (1), (2), (3), (4)
LOAD DATA INFILE "1m.csv"
INTO TABLE t
FIELDS TERMINATED BY ","

@ -0,0 +1,2 @@
create table t (a int, b int)
insert into t values(1,2), (2,3), (3,2), (4,3), (5,2), (6,3), (44, 2), (55,2), (5, 3), (7, 2), (9, 2), (123, 3)
Loading…
Cancel
Save