fixed int128 problem, groupby agg

dev
Bill 2 years ago
parent 5b1bbf0f99
commit 6490aab558

@ -13,11 +13,11 @@ info:
$(info $(OS))
$(info "test")
server.bin:
$(CXX) server/server.cpp $(OS_SUPPORT) --std=c++1z -O3 -march=native -o server.bin
$(CXX) server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) -flto --std=c++1z -O3 -march=native -o server.bin
server.so:
# $(CXX) server/server.cpp server/monetdb_conn.cpp -fPIC -shared $(OS_SUPPORT) monetdb/msvc/monetdbe.dll --std=c++1z -O3 -march=native -o server.so -I./monetdb/msvc
$(CXX) -shared -fPIC server/server.cpp $(OS_SUPPORT) server/monetdb_conn.cpp $(MonetDB_LIB) --std=c++1z -o server.so -O3
$(CXX) -shared -fPIC -flto server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) server/monetdb_conn.cpp $(MonetDB_LIB) --std=c++1z -o server.so -O3
snippet:
$(CXX) -shared -fPIC --std=c++1z out.cpp server/monetdb_conn.cpp $(MonetDB_LIB) -O3 -march=native -o dll.so
$(CXX) -shared -fPIC -flto --std=c++1z out.cpp server/monetdb_conn.cpp server/table.cpp server/io.cpp $(MonetDB_LIB) -O3 -march=native -o dll.so
clean:
rm *.shm -rf

@ -4,8 +4,9 @@ import os
# os.environ['CXX'] = 'C:/Program Files/LLVM/bin/clang.exe'
add_path_to_ldpath = True
rebuild_backend = False
run_backend = False
rebuild_backend = True
run_backend = True
have_hge = False
os_platform = 'unkown'

@ -1,4 +1,5 @@
from engine.utils import defval
from aquery_config import have_hge
from typing import Dict, List
type_table: Dict[str, "Types"] = {}
@ -75,6 +76,8 @@ LazyT = Types(240, name = 'Lazy', cname = '', sqlname = '', ctype_name = '')
DoubleT = Types(17, name = 'double', cname='double', sqlname = 'DOUBLE', is_fp = True)
FloatT = Types(16, name = 'float', cname = 'float', sqlname = 'REAL',
long_type = DoubleT, is_fp = True)
HgeT = Types(9, name = 'int128',cname='__int128_t', sqlname = 'HUGEINT', fp_type = DoubleT)
UHgeT = Types(10, name = 'uint128', cname='__uint128_t', sqlname = 'HUGEINT', fp_type = DoubleT)
LongT = Types(4, name = 'int64', sqlname = 'BIGINT', fp_type = DoubleT)
ByteT = Types(1, name = 'int8', sqlname = 'TINYINT', long_type=LongT, fp_type=FloatT)
ShortT = Types(2, name = 'int16', sqlname='SMALLINT', long_type=LongT, fp_type=FloatT)
@ -167,7 +170,7 @@ def binary_op_behavior(op:OperatorBase, c_code, x, y):
def unary_op_behavior(op:OperatorBase, c_code, x):
name = op.cname if c_code else op.sqlname
return f'({x} {name})'
return f'({name} {x})'
def fn_behavior(op:OperatorBase, c_code, *x):
name = op.cname if c_code else op.sqlname

@ -1,31 +1,17 @@
#include "./server/aggregations.h"
#include "./udf.hpp"
#include "./server/hasher.h"
#include <unordered_map>
#include "./server/monetdb_conn.h"
#include "./server/aggregations.h"
#include "./server/libaquery.h"
extern "C" int __DLLEXPORT__ dllmain(Context* cxt) {
using namespace std;
using namespace types;
auto server = static_cast<Server*>(cxt->alt_server);
auto len_4fxytV = server->cnt;
auto b_3pr = ColRef<int>(len_4fxytV, server->getCol(0));
auto a_65L = ColRef<int>(len_4fxytV, server->getCol(1));
auto out_2UnEpP = new TableInfo<value_type<decays<decltype(covariances2_gettype(a_65L, b_3pr, 4))>>>("out_2UnEpP");
auto col_C9QF0Z = out_2UnEpP->get_col<0>();
for (uint32_t i41 = 0; i41 < t1K4f4I0.size; ++i41){
g1zdpLFa[forward_as_tuple(t1K4f4I0[i41])].emplace_back(i41);
}
for (const auto& i40 : g1zdpLFa){
auto &len_5NTOM6m = val_2423Z8E.size;
auto &key_6fZPUDS = i4O.first;
auto &val_2423Z8E = i4O.second;
col_C9QF0Z.emplace_back({len_5NTOM6m});
covariances2(a_65L[val_2423Z8E], b_3pr[val_2423Z8E], 4, len_5NTOM6m, col_C9QF0Z.back());
}
print(*out_2UnEpP);
auto len_6SzLPm = server->cnt;
auto sales_5fe = ColRef<int>(len_6SzLPm, server->getCol(0));
auto a_yJz = ColRef<int>(len_6SzLPm, server->getCol(1));
auto out_4UoFb5 = new TableInfo<value_type<decays<decltype((sd(a_yJz) + sales_5fe))>>>("out_4UoFb5");
out_4UoFb5->get_col<0>() = (sd(a_yJz) + sales_5fe);
print(*out_4UoFb5);
return 0;
}

@ -165,6 +165,7 @@ def init_threaded():
server_so = ctypes.CDLL('./'+server_bin)
global cfg, th, send
send = server_so['receive_args']
aquery_config.have_hge = server_so['have_hge']()
th = threading.Thread(target=server_so['main'], args=(-1, ctypes.POINTER(ctypes.c_char_p)(cfg.c)), daemon=True)
th.start()
@ -256,7 +257,7 @@ while test_parser:
sh.interact(banner = 'debugging session began.', exitmsg = 'debugging session ended.')
except BaseException as e:
# don't care about anything happened in interactive console
print(e.with_traceback())
print(e)
elif q.startswith('log'):
qs = re.split(r'[ \t]', q)
if len(qs) > 1:

@ -80,12 +80,12 @@ class projection(ast_node):
# deal with projections
self.out_table = TableInfo('out_'+base62uuid(4), [], self.context)
cols = []
col_ext : Set[ColRef]= set()
self.col_ext : Set[ColRef]= set()
col_exprs : List[Tuple[str, Types]] = []
proj_map : Dict[int, List[Union[Types, int, str, expr]]]= dict()
var_table = dict()
self.sp_refs = set()
self.var_table = dict()
# self.sp_refs = set()
for i, proj in enumerate(self.projections):
compound = False
self.datasource.rec = set()
@ -101,21 +101,20 @@ class projection(ast_node):
if not proj_expr.is_special:
y = lambda x:x
name = eval('f\'' + name + '\'')
if name not in var_table:
var_table[name] = len(col_exprs)
if name not in self.var_table:
self.var_table[name] = len(col_exprs)
proj_map[i] = [this_type, len(col_exprs), proj_expr]
col_exprs.append((name, proj_expr.type))
else:
self.context.headers.add('"./server/aggregations.h"')
if self.datasource.rec is not None:
col_ext = col_ext.union(self.datasource.rec) # TODO: make this one var?
self.sp_refs = self.sp_refs.union(self.datasource.rec)
self.col_ext = self.col_ext.union(self.datasource.rec)
proj_map[i] = [this_type, proj_expr.sql, proj_expr]
if 'name' in proj: # renaming column by AS keyword
name += ' AS ' + proj['name']
if not proj_expr.is_special:
var_table[proj['name']] = len(col_exprs)
self.var_table[proj['name']] = len(col_exprs)
disp_name = get_legal_name(name)
@ -126,26 +125,30 @@ class projection(ast_node):
self.datasource.rec = None
# TODO: Type deduction in Python
cols.append(ColRef(this_type, self.out_table, None, disp_name, i, compound=compound))
col_ext = [c for c in col_ext if c.name not in var_table] # remove duplicates in var_table
col_ext_names = [c.name for c in col_ext]
if 'groupby' in node:
self.group_node = groupby(self, node['groupby'])
else:
self.group_node = None
self.col_ext = [c for c in self.col_ext if c.name not in self.var_table] # remove duplicates in self.var_table
col_ext_names = [c.name for c in self.col_ext]
self.add(', '.join([c[0] for c in col_exprs] + col_ext_names))
_base_offset = len(col_exprs)
for i, col in enumerate(col_ext_names):
if col not in var_table:
var_table[col] = i + _base_offset
if col not in self.var_table:
self.var_table[col] = i + _base_offset
def finialize(astnode:ast_node):
if(astnode is not None):
self.add(astnode.sql)
self.add('FROM')
self.add('FROM')
finialize(self.datasource)
finialize(self.where)
if 'groupby' in node:
self.group_node = groupby(self, node['groupby'])
else:
self.group_node = None
finialize(self.group_node)
if 'orderby' in node:
self.add(orderby(self, node['orderby']).sql)
if 'outfile' in node:
@ -160,15 +163,15 @@ class projection(ast_node):
# cpp module codegen
self.context.has_dll = True
# extract typed-columns from result-set
vid2cname = [0]*len(var_table)
pyname2cname = dict()
typenames = [c[1] for c in col_exprs] + [c.type for c in col_ext]
vid2cname = [0]*len(self.var_table)
self.pyname2cname = dict()
typenames = [c[1] for c in col_exprs] + [c.type for c in self.col_ext]
length_name = 'len_' + base62uuid(6)
self.context.emitc(f'auto {length_name} = server->cnt;')
for v, idx in var_table.items():
for v, idx in self.var_table.items():
vname = get_legal_name(v) + '_' + base62uuid(3)
pyname2cname[v] = vname
self.pyname2cname[v] = vname
self.context.emitc(f'auto {vname} = ColRef<{typenames[idx].cname}>({length_name}, server->getCol({idx}));')
vid2cname[idx] = vname
# Create table into context
@ -178,14 +181,18 @@ class projection(ast_node):
for key, val in proj_map.items():
if type(val[1]) is str:
x = True
y = lambda t: pyname2cname[t]
y = lambda t: self.pyname2cname[t]
val[1] = val[2].eval(x, y, gettype=True)
if callable(val[1]):
val[1] = val[1](True)
decltypestring = val[1]
if val[0] == LazyT:
out_typenames[key] = f'value_type<decays<decltype({decltypestring})>>'
decltypestring = f'value_type<decays<decltype({decltypestring})>>'
if type(val[2].udf) is udf and val[2].udf.return_pattern == udf.ReturnPattern.elemental_return:
out_typenames[key] = f'ColRef<{decltypestring}>'
else:
out_typenames[key] = decltypestring
else:
out_typenames[key] = val[0].cname
@ -193,13 +200,13 @@ class projection(ast_node):
self.context.emitc(f'auto {outtable_name} = new TableInfo<{",".join(out_typenames)}>("{outtable_name}");')
# TODO: Inject custom group by code here and flag them in proj_map
# Type of UDFs? Complex UDFs, ones with static vars?
if self.group_node is not None:
gb_vartable : Dict[str, Union[str, int]] = deepcopy(pyname2cname)
if self.group_node is not None and self.group_node.use_sp_gb:
gb_vartable : Dict[str, Union[str, int]] = deepcopy(self.pyname2cname)
gb_cexprs : List[str] = []
for key, val in proj_map.items():
col_name = 'col_' + base62uuid(6)
self.context.emitc(f'auto {col_name} = {outtable_name}->get_col<{key}>();')
self.context.emitc(f'decltype(auto) {col_name} = {outtable_name}->get_col<{key}>();')
gb_cexprs.append((col_name, val[2]))
self.group_node.finalize(gb_cexprs, gb_vartable)
else:
@ -231,6 +238,17 @@ class orderby(ast_node):
class scan(ast_node):
class Position(Enum):
init = auto()
front = auto()
body = auto()
back = auto()
fin = auto()
# TODO: use this for positions for scanner
class LoopStyle(Enum):
forloop = auto()
foreach = auto()
name = 'scan'
def __init__(self, parent: "ast_node", node, loop_style = 'for', context: Context = None, const = False):
self.const = "const " if const else ""
@ -253,7 +271,7 @@ class scan(ast_node):
def produce(self, node):
if self.loop_style == 'for_each':
self.colref = node
self.start += f'for ({self.const}auto& {self.it_ver} : {node.cobj}) {{\n'
self.start += f'for ({self.const}auto& {self.it_ver} : {node}) {{\n'
else:
self.start += f"for (uint32_t {self.it_ver} = 0; {self.it_ver} < {node}; ++{self.it_ver}){{\n"
@ -270,7 +288,9 @@ class scan(ast_node):
class groupby_c(ast_node):
name = '_groupby'
def init(self, _):
self.proj : projection = self.parent
return super().init(_)
def produce(self, node : List[Tuple[expr, Set[ColRef]]]):
self.context.headers.add('"./server/hasher.h"')
self.context.headers.add('unordered_map')
@ -285,19 +305,19 @@ class groupby_c(ast_node):
first_col = ''
for g in node:
e = g[0]
g_str = e.eval(c_code = True)
e = expr(self, g[0].node, c_code=True)
g_str = e.eval(c_code = True, y = lambda c: self.proj.pyname2cname[c])
# if v is compound expr, create tmp cols
if e.is_ColExpr:
tmpcol = 't' + base62uuid(7)
self.emit(f'auto {tmpcol} = {g_str};')
self.context.emitc(f'auto {tmpcol} = {g_str};')
e = tmpcol
g_contents_list.append(e)
first_col = g_contents_list[0]
g_contents_decltype = [f'decays<decltype({c})>' for c in g_contents_list]
g_contents_decltype = [f'decays<decltype({c})::value_t>' for c in g_contents_list]
g_contents = ','.join(g_contents_list)
self.emit(f'typedef record<{",".join(g_contents_decltype)}> {self.group_type};')
self.emit(f'unordered_map<{self.group_type}, vector_type<uint32_t>, '
self.context.emitc(f'typedef record<{",".join(g_contents_decltype)}> {self.group_type};')
self.context.emitc(f'unordered_map<{self.group_type}, vector_type<uint32_t>, '
f'transTypes<{self.group_type}, hasher>> {self.group};')
self.n_grps = len(node)
self.scanner = scan(self, first_col + '.size')
@ -314,12 +334,12 @@ class groupby_c(ast_node):
# gscanner.finalize()
def finalize(self, cexprs : List[Tuple[str, expr]], var_table : Dict[str, Union[str, int]]):
gscanner = scan(self, self.group)
gscanner = scan(self, self.group, loop_style = 'for_each')
key_var = 'key_'+base62uuid(7)
val_var = 'val_'+base62uuid(7)
gscanner.add(f'auto &{key_var} = {gscanner.it_ver}.first;')
gscanner.add(f'auto &{val_var} = {gscanner.it_ver}.second;')
gscanner.add(f'auto &{key_var} = {gscanner.it_ver}.first;', position = 'front')
gscanner.add(f'auto &{val_var} = {gscanner.it_ver}.second;', position = 'front')
len_var = None
def define_len_var():
nonlocal len_var
@ -356,29 +376,22 @@ class groupby_c(ast_node):
class groupby(ast_node):
name = 'group by'
@property
def use_sp_gb (self):
return len(self.sp_refs) > 0
def produce(self, node):
if type(self.parent) is not projection:
raise ValueError('groupby can only be used in projection')
sp_refs = self.parent.sp_refs
node = enlist(node)
o_list = []
self.dedicated_glist = []
self.refs : Set[ColRef] = set()
self.sp_refs : Set[ColRef] = set()
self.dedicated_glist : List[Tuple[expr, Set[ColRef]]] = []
self.use_sp_gb = False
for g in node:
self.datasource.rec = set()
g_expr = expr(self, g['value'])
refs : Set[ColRef] = self.datasource.rec
self.datasource.rec = None
this_sp_ref = refs.difference(sp_refs)
this_ref = refs.intersection(this_sp_ref)
# TODO: simplify this
self.refs = self.refs.union(this_ref)
self.sp_refs = self.sp_refs.union(this_sp_ref)
if self.parent.col_ext:
this_sp_ref = refs.difference(self.parent.col_ext)
self.use_sp_gb = self.use_sp_gb or len(this_sp_ref) > 0
self.dedicated_glist.append((g_expr, refs))
g_str = g_expr.eval(c_code = False)
@ -389,7 +402,13 @@ class groupby(ast_node):
if not self.use_sp_gb:
self.dedicated_gb = None
self.add(', '.join(o_list))
else:
for l in self.dedicated_glist:
# l_exist = l[1].difference(self.parent.col_ext)
# for l in l_exist:
# self.parent.var_table.
self.parent.col_ext.update(l[1])
def finalize(self, cexprs : List[Tuple[str, expr]], var_table : Dict[str, Union[str, int]]):
if self.use_sp_gb:
self.dedicated_gb = groupby_c(self.parent, self.dedicated_glist)

@ -37,7 +37,7 @@ class expr(ast_node):
self.codlets : list = []
self.codebuf : Optional[str] = None
self._udf_decltypecall = None
self.node = node
self.supress_undefined = supress_undefined
if(type(parent) is expr):
self.inside_agg = parent.inside_agg

@ -100,6 +100,7 @@ class Context:
self.print = print
self.has_dll = False
self.dialect = 'MonetDB'
self.have_hge = False
self.new()

@ -0,0 +1,11 @@
#include "io.h"
char* gbuf = 0;
void setgbuf(char* buf){
static char* b = 0;
if (buf == 0)
gbuf = b;
else
gbuf = buf;
}

@ -2,6 +2,7 @@
#include "types.h"
#include <cstdio>
#include <string>
#include <limits>
template <class ...Types>
std::string generate_printf_string(const char* sep = " ", const char* end = "\n") {
std::string str;
@ -13,3 +14,51 @@ std::string generate_printf_string(const char* sep = " ", const char* end = "\n"
return str;
}
#ifdef __SIZEOF_INT128__
inline const char* get_int128str(__int128_t v, char* buf){
bool neg = false;
if (v < 0) {
if(v == std::numeric_limits<__int128_t>::min())
return "-170141183460469231731687303715884105728";
v = -v;
neg = true;
}
do {
*--buf = v%10 + '0';
v /= 10;
} while(v);
if (neg) *--buf = '-';
return buf;
}
inline const char* get_uint128str(__uint128_t v, char* buf){
do {
*--buf = v%10 + '0';
v /= 10;
} while(v);
return buf;
}
extern char* gbuf;
void setgbuf(char* buf = 0);
template<class T>
inline decltype(auto) printi128(const T& v){
return v;
}
template<>
inline decltype(auto) printi128<__int128_t>(const __int128_t& v) {
*(gbuf+=40) = 0;
return get_int128str(v, gbuf++);
}
template<>
inline decltype(auto) printi128<__uint128_t>(const __uint128_t& v) {
*(gbuf+=40) = 0;
return get_uint128str(v, gbuf++);
}
#else
#define printi128(x) x
#endif

@ -48,9 +48,10 @@ struct Context{
};
#ifdef _WIN32
#define __DLLEXPORT__ __declspec(dllexport) __stdcall
#define __DLLEXPORT__ __declspec(dllexport) __stdcall
#else
#define __DLLEXPORT__
#endif
#define __AQEXPORT__(_Ty) extern "C" _Ty __DLLEXPORT__
#endif

@ -24,4 +24,4 @@ struct Server{
void *getCol(int col_idx);
void close();
~Server();
};
};

@ -28,6 +28,7 @@ struct SharedMemory
}
};
#endif
struct thread_context{
}v;
@ -59,6 +60,15 @@ extern "C" int __DLLEXPORT__ binary_info() {
return AppleClang;
#endif
}
__AQEXPORT__(bool) have_hge(){
#if defined(_MONETDBE_LIB_) and defined(HAVE_HGE)
return HAVE_HGE;
#else
return false;
#endif
}
int dll_main(int argc, char** argv, Context* cxt){
Config *cfg = reinterpret_cast<Config *>(argv[0]);
@ -111,50 +121,50 @@ int dll_main(int argc, char** argv, Context* cxt){
return 0;
}
//
//extern "C" int __DLLEXPORT__ main(int argc, char** argv) {
//
// puts("running");
// Context* cxt = new Context();
// cxt->log("%d %s\n", argc, argv[1]);
//
// const char* shmname;
// if (argc < 0)
// return dll_main(argc, argv, cxt);
// else if (argc <= 1)
// return test_main();
// else
// shmname = argv[1];
// SharedMemory shm = SharedMemory(shmname);
// if (!shm.pData)
// return 1;
// bool &running = static_cast<bool*>(shm.pData)[0],
// &ready = static_cast<bool*>(shm.pData)[1];
// using namespace std::chrono_literals;
// cxt->log("running: %s\n", running? "true":"false");
// cxt->log("ready: %s\n", ready? "true":"false");
// while (running) {
// std::this_thread::sleep_for(1ms);
// if(ready){
// cxt->log("running: %s\n", running? "true":"false");
// cxt->log("ready: %s\n", ready? "true":"false");
// void* handle = dlopen("./dll.so", RTLD_LAZY);
// cxt->log("handle: %lx\n", handle);
// if (handle) {
// cxt->log("inner\n");
// code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, "dllmain"));
// cxt->log("routine: %lx\n", c);
// if (c) {
// cxt->log("inner\n");
// cxt->err("return: %d\n", c(cxt));
// }
// }
// ready = false;
// }
// }
// shm.FreeMemoryMap();
// return 0;
//}
extern "C" int __DLLEXPORT__ main(int argc, char** argv) {
puts("running");
Context* cxt = new Context();
cxt->log("%d %s\n", argc, argv[1]);
const char* shmname;
if (argc < 0)
return dll_main(argc, argv, cxt);
else if (argc <= 1)
return test_main();
else
shmname = argv[1];
SharedMemory shm = SharedMemory(shmname);
if (!shm.pData)
return 1;
bool &running = static_cast<bool*>(shm.pData)[0],
&ready = static_cast<bool*>(shm.pData)[1];
using namespace std::chrono_literals;
cxt->log("running: %s\n", running? "true":"false");
cxt->log("ready: %s\n", ready? "true":"false");
while (running) {
std::this_thread::sleep_for(1ms);
if(ready){
cxt->log("running: %s\n", running? "true":"false");
cxt->log("ready: %s\n", ready? "true":"false");
void* handle = dlopen("./dll.so", RTLD_LAZY);
cxt->log("handle: %lx\n", handle);
if (handle) {
cxt->log("inner\n");
code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, "dllmain"));
cxt->log("routine: %lx\n", c);
if (c) {
cxt->log("inner\n");
cxt->err("return: %d\n", c(cxt));
}
}
ready = false;
}
}
shm.FreeMemoryMap();
return 0;
}
#include "utils.h"
int test_main()
{
@ -200,6 +210,7 @@ int test_main()
dlclose(handle);
}
//static_assert(std::is_same_v<decltype(fill_integer_array<5, 1>()), std::integer_sequence<bool, 1,1,1,1,1>>, "");
return 0;
std::unordered_map<int, int> a;
}

@ -1 +1,28 @@
#include "table.h"
#ifdef __SIZEOF_INT128__
template <>
void print<__int128_t>(const __int128_t& v, const char* delimiter){
char s[41];
s[40] = 0;
std::cout<< get_int128str(v, s+40);
}
template <>
void print<__uint128_t>(const __uint128_t&v, const char* delimiter){
char s[41];
s[40] = 0;
std::cout<< get_uint128str(v, s+40);
}
std::ostream& operator<<(std::ostream& os, __int128 & v)
{
print(v);
return os;
}
std::ostream& operator<<(std::ostream& os, __uint128_t & v)
{
print(v);
return os;
}
#endif

@ -63,14 +63,19 @@ public:
using vector_type<_Ty>::operator[];
using vector_type<_Ty>::operator=;
using vector_type<_Ty>::subvec;
using vector_type<_Ty>::subvec_view;
using vector_type<_Ty>::subvec_memcpy;
using vector_type<_Ty>::subvec_deep;
ColView<_Ty> operator [](const vector_type<uint32_t>&idxs) const {
return ColView<_Ty>(*this, idxs);
}
void out(uint32_t n = 4, const char* sep = " ") const {
n = n > this->size ? this->size : n;
const char* more = "";
if (n < this->size)
more = " ... ";
else
n = this->size;
std::cout << '(';
if (n > 0)
{
@ -79,6 +84,7 @@ public:
std::cout << this->operator[](i) << sep;
std::cout << this->operator[](i);
}
std::cout<< more;
std::cout << ')';
}
template<typename T>
@ -90,7 +96,7 @@ class ColView {
public:
typedef ColRef<_Ty> Decayed_t;
const vector_type<uint32_t>& idxs;
const ColRef<_Ty>& orig;
const ColRef<_Ty> orig;
const uint32_t& size;
ColView(const ColRef<_Ty>& orig, const vector_type<uint32_t>& idxs) : orig(orig), idxs(idxs), size(idxs.size) {}
ColView(const ColView<_Ty>& orig, const vector_type<uint32_t>& idxs) : orig(orig.orig), idxs(idxs), size(idxs.size) {
@ -135,6 +141,10 @@ public:
ret[i] = orig[idxs[i]];
return ret;
}
ColView<_Ty> subvec(uint32_t start, uint32_t end) {
uint32_t len = end - start;
return ColView<_Ty>(orig, idxs.subvec(start, end));
}
ColRef<_Ty> subvec_deep(uint32_t start, uint32_t end) {
uint32_t len = end - start;
ColRef<_Ty> subvec(len);
@ -142,7 +152,7 @@ public:
subvec[i] = operator[](i);
return subvec;
}
inline ColRef<_Ty> subvec_deep(uint32_t start = 0) { return subvec_deep(start, size); }
inline ColRef<_Ty> subvec(uint32_t start = 0) { return subvec_deep(start, size); }
};
template <template <class...> class VT, class T>
std::ostream& operator<<(std::ostream& os, const VT<T>& v)
@ -150,8 +160,11 @@ std::ostream& operator<<(std::ostream& os, const VT<T>& v)
v.out();
return os;
}
std::ostream& operator<<(std::ostream& os, __int128 & v);
std::ostream& operator<<(std::ostream& os, __uint128_t & v);
template <class Type>
struct decayed_impl<ColView, Type> { typedef ColRef<Type> type; };
template<typename _Ty>
template<typename T>
inline ColRef<T> ColRef<_Ty>::scast()
@ -186,6 +199,7 @@ struct is_vector_impl<vector_type<V>> : std::true_type {};
template<class ...Types>
struct TableView;
template<class ...Types>
struct TableInfo {
const char* name;
@ -211,14 +225,17 @@ struct TableInfo {
rid.emplace_back(v);
}
};
template<class ...Types2>
auto bind(TableInfo<Types2...>* table2) {
return lineage_t(this, table2);
}
template <size_t i = 0>
auto& get_col() {
return *reinterpret_cast<ColRef<std::tuple_element_t <i, tuple_type>>*>(colrefs + i);
}
template <size_t j = 0>
typename std::enable_if<j == sizeof...(Types) - 1, void>::type print_impl(const uint32_t& i, const char* __restrict sep = " ") const;
template <size_t j = 0>
@ -227,6 +244,7 @@ struct TableInfo {
struct GetTypes {
typedef typename std::tuple<typename std::tuple_element<Idxs, tuple_type>::type ...> type;
};
template <size_t ...Idxs>
using getRecordType = typename GetTypes<Idxs...>::type;
TableInfo(const char* name, uint32_t n_cols);
@ -276,9 +294,9 @@ struct TableInfo {
const auto& this_value = get<col>(*this)[i];
const auto& next = [&](auto &v) {
if constexpr (sizeof...(rem_cols) == 0)
func(args..., v);
func(args..., printi128(v));
else
print2_impl<rem_cols...>(func, i, args ..., v);
print2_impl<rem_cols...>(func, i, args ..., printi128(v));
};
if constexpr (is_vector_type<this_type>)
for (int j = 0; j < this_value.size; ++j)
@ -312,12 +330,20 @@ struct TableInfo {
header_string.resize(header_string.size() - l_sep);
const auto& prt_loop = [&fp, &view, &printf_string, *this](const auto& f) {
constexpr auto num_hge = count_type<__int128_t, __uint128_t>((tuple_type*)(0));
char cbuf[num_hge * 41];
setgbuf(cbuf);
if(view)
for (int i = 0; i < view->size; ++i)
for (int i = 0; i < view->size; ++i){
print2_impl<cols...>(f, (*view)[i], printf_string.c_str());
setgbuf();
}
else
for (int i = 0; i < colrefs[0].size; ++i)
for (int i = 0; i < colrefs[0].size; ++i){
print2_impl<cols...>(f, i, printf_string.c_str());
setgbuf();
}
};
if (fp)
@ -538,6 +564,13 @@ void print(const T& v, const char* delimiter = " ") {
std::cout<< v;
// printf(types::printf_str[types::Types<T>::getType()], v);
}
#ifdef __SIZEOF_INT128__
template <>
void print<__int128_t>(const __int128_t& v, const char* delimiter);
template <>
void print<__uint128_t>(const __uint128_t&v, const char* delimiter);
#endif
template <class T>
void inline print_impl(const T& v, const char* delimiter, const char* endline) {
for (const auto& vi : v) {

@ -21,11 +21,11 @@ constexpr static bool is_vector_type = is_vector_impl<T>::value;
namespace types {
enum Type_t {
AINT32, AFLOAT, ASTR, ADOUBLE, ALDOUBLE, AINT64, AINT16, ADATE, ATIME, AINT8,
AUINT32, AUINT64, AUINT16, AUINT8, VECTOR, NONE, ERROR
AINT32, AFLOAT, ASTR, ADOUBLE, ALDOUBLE, AINT64, AINT128, AINT16, ADATE, ATIME, AINT8,
AUINT32, AUINT64, AUINT128, AUINT16, AUINT8, VECTOR, NONE, ERROR
};
static constexpr const char* printf_str[] = { "%d", "%f", "%s", "%lf", "%llf", "%ld", "%hi", "%s", "%s", "%c",
"%u", "%lu", "%hu", "%hhu", "Vector<%s>", "NULL", "ERROR" };
static constexpr const char* printf_str[] = { "%d", "%f", "%s", "%lf", "%llf", "%ld", "%s", "%hi", "%s", "%s", "%c",
"%u", "%lu", "%s", "%hu", "%hhu", "Vector<%s>", "NULL", "ERROR" };
// TODO: deal with data/time <=> str/uint conversion
struct date_t {
uint32_t val;
@ -43,6 +43,17 @@ namespace types {
struct Types {
typedef T type;
constexpr Types() noexcept = default;
#ifdef __SIZEOF_INT128__
#define F_INT128(__F_) __F_(__int128_t, AINT128) \
__F_(__uint128_t, AUINT128)
#define ULL_Type __uint128_t
#define LL_Type __int128_t
#else
#define F_INT128
#define ULL_Type unsigned long long
#define LL_Type long long
#endif
#define ConnectTypes(f) \
f(int, AINT32) \
f(float, AFLOAT) \
@ -57,7 +68,8 @@ namespace types {
f(unsigned int, AUINT32) \
f(unsigned long, AUINT64) \
f(unsigned short, AUINT16) \
f(unsigned char, AUINT8)
f(unsigned char, AUINT8) \
F_INT128(f)
inline constexpr static Type_t getType() {
#define TypeConnect(x, y) if constexpr(std::is_same<x, T>::value) return y; else
@ -90,7 +102,7 @@ namespace types {
using GetFPType = typename GetFPTypeImpl<typename std::decay<T>::type>::type;
template<class T>
struct GetLongTypeImpl {
using type = Cond(__U(T), unsigned long long, Cond(Fp(T), long double, long long));
using type = Cond(__U(T), ULL_Type, Cond(Fp(T), long double, LL_Type));
};
template<class T>
using GetLongType = typename GetLongTypeImpl<typename std::decay<T>::type>::type;
@ -202,4 +214,19 @@ struct nullval_impl<float> { constexpr static float value = -std::numeric_limits
template<>
struct nullval_impl<double> { constexpr static double value = -std::numeric_limits<double>::quiet_NaN(); };
constexpr size_t sum_type(size_t a[], size_t sz) {
size_t ret = 0;
for (int i = 0; i < sz; ++i)
ret += a[i];
return ret;
}
template<class Types, class ...T1> constexpr size_t sum_type() {
size_t t[] = {std::is_same_v<Types, T1> ...};
return sum_type(t, sizeof...(T1));
}
template<class ...T1, class ...Types> constexpr
size_t count_type(std::tuple<Types...>* ts) {
size_t t[] = {sum_type<Types, T1...>() ...};
return sum_type(t, sizeof...(Types));
}
#endif // !_TYPES_H

@ -145,7 +145,7 @@ public:
return curr;
}
_Ty& operator[](const uint32_t _i) const {
inline _Ty& operator[](const uint32_t _i) const {
return container[_i];
}
@ -212,15 +212,15 @@ public:
size = this->size + dist;
}
void out(uint32_t n = 4, const char* sep = " ") const;
vector_type<_Ty> subvec(uint32_t start, uint32_t end) {
vector_type<_Ty> subvec_memcpy(uint32_t start, uint32_t end) const {
vector_type<_Ty> subvec(end - start);
memcpy(subvec.container, container + start, sizeof(_Ty) * (end - start));
return subvec;
}
inline vector_type<_Ty> subvec_view(uint32_t start, uint32_t end) {
return subvec(container + start, end - start);
inline vector_type<_Ty> subvec(uint32_t start, uint32_t end) const {
return vector_type<_Ty>(container + start, end - start);
}
vector_type<_Ty> subvec_deep(uint32_t start, uint32_t end) {
vector_type<_Ty> subvec_deep(uint32_t start, uint32_t end) const {
uint32_t len = end - start;
vector_type<_Ty> subvec(len);
for (uint32_t i = 0; i < len; ++i)
@ -228,7 +228,7 @@ public:
return subvec;
}
inline vector_type<_Ty> subvec(uint32_t start = 0) { return subvec(start, size); }
inline vector_type<_Ty> subvec_view(uint32_t start = 0) { return subvec_view(start, size); }
inline vector_type<_Ty> subvec_memcpy(uint32_t start = 0) { return subvec_memcpy(start, size); }
inline vector_type<_Ty> subvec_deep(uint32_t start = 0) { return subvec_deep(start, size); }
~vector_type() {
@ -305,14 +305,12 @@ public:
void* get(uint32_t i, types::Type_t atype){
return static_cast<void*>(static_cast<char*>(container) + (i * types::AType_sizes[atype]));
}
void operator[](const uint32_t& i) {
}
void operator[](const uint32_t& i);
vector_type<void> subvec(uint32_t, uint32_t);
vector_type<void> subvec_view(uint32_t, uint32_t);
vector_type<void> subvec_memcpy(uint32_t, uint32_t);
vector_type<void> subvec_deep(uint32_t, uint32_t);
vector_type<void> subvec(uint32_t);
vector_type<void> subvec_view(uint32_t);
vector_type<void> subvec_memcpy(uint32_t);
vector_type<void> subvec_deep(uint32_t);
};
#pragma pack(pop)

@ -18,7 +18,7 @@ INSERT INTO stocks VALUES(15,2)
INSERT INTO stocks VALUES(16,5)
/*<k> "q1" </k>*/
SELECT max(price-min(timestamp)) FROM stocks
-- SELECT max(price-min(timestamp)) FROM stocks
/*<k> "q2" </k>*/
SELECT max(price-mins(price)) FROM stocks

@ -2,55 +2,17 @@
#include "./server/libaquery.h"
#include "./server/aggregations.h"
auto covariances2 = [](auto x, auto y, auto w, uint32_t _builtin_len, auto& _builtin_ret) {
auto xmeans = 0.0;
auto ymeans = 0.0;
auto l = _builtin_len;
if((l > 0)) {
xmeans = x[0];
ymeans = y[0];
_builtin_ret[0] = 0.0;
}
if((w > l))
w = l;
for(auto i = 1, j = 0; (i < w); i = (i + 1)) {
xmeans += x[i];
ymeans += y[i];
_builtin_ret[i] = avg(((x.subvec(0, i) - (xmeans / i)) * (y.subvec(0, i) - (ymeans / i))));
}
xmeans /= w;
ymeans /= w;
for(auto i = w; (i < l); i += 1) {
xmeans += ((x[i] - x[(i - w)]) / w);
ymeans += ((y[i] - y[(i - w)]) / w);
_builtin_ret[i] = avg(((x.subvec((i - w), i) - xmeans) * (y.subvec((i - w), i) - ymeans)));
}
return ;
auto covariance = [](auto x, auto y) {
auto xmean = avg(x);
auto ymean = avg(y);
return avg(((x - xmean) * (y - ymean)));
};
auto covariances2_gettype = [](auto x, auto y, auto w) {
uint32_t _builtin_len = 0;
auto xmeans = 0.0;
auto ymeans = 0.0;
auto l = _builtin_len;
if((l > 0)) {
xmeans = x[0];
ymeans = y[0];
return 0.0;
}
if((w > l))
w = l;
for(auto i = 1, j = 0; (i < w); i = (i + 1)) {
xmeans += x[i];
ymeans += y[i];
return avg(((x.subvec(0, i) - (xmeans / i)) * (y.subvec(0, i) - (ymeans / i))));
}
xmeans /= w;
ymeans /= w;
for(auto i = w; (i < l); i += 1) {
xmeans += ((x[i] - x[(i - w)]) / w);
ymeans += ((y[i] - y[(i - w)]) / w);
return avg(((x.subvec((i - w), i) - xmeans) * (y.subvec((i - w), i) - ymeans)));
}
auto sd = [](auto x) {
return sqrt(covariance(x, x));
};
auto paircorr = [](auto x, auto y) {
return (covariance(x, y) / (sd(x) * sd(y)));
};

@ -1,6 +1,4 @@
AGGREGATION FUNCTION add(a, b){
a+b
}
AGGREGATION FUNCTION covariances2(x, y, w){
xmeans := 0.;
@ -33,9 +31,9 @@ AGGREGATION FUNCTION covariances2(x, y, w){
CREATE TABLE test(a INT, b INT, c INT, d INT)
LOAD DATA INFILE "test.csv"
LOAD DATA INFILE "test2.csv"
INTO TABLE test
FIELDS TERMINATED BY ","
select covariances2(a, b, 4) from test group by c;
select covariances2(a, b, 4), a+b from test group by c;

Loading…
Cancel
Save