Added var(s), stddev(s). New ITC, stats. bugfix on aggregations.

dev
Bill 2 years ago
parent ba21da23a3
commit 50740fe6ac

@ -1,7 +1,7 @@
OS_SUPPORT =
MonetDB_LIB =
MonetDB_INC =
Threading =
Defines =
CXXFLAGS = --std=c++1z
ifeq ($(AQ_DEBUG), 1)
OPTFLAGS = -g3 -fsanitize=address -fsanitize=leak
@ -80,16 +80,19 @@ endif
ifeq ($(THREADING),1)
LIBAQ_SRC += server/threading.cpp
LIBAQ_OBJ += threading.o
Threading += -DTHREADING
Defines += -DTHREADING
endif
ifeq ($(AQUERY_ITC_USE_SHMEM), 1)
Defines += -D__AQUERY_ITC_USE_SHMEM__
endif
SHAREDFLAGS += $(FPIC)
info:
$(info $(OPTFLAGS))
$(info $(OS_SUPPORT))
$(info $(OS))
$(info $(Threading))
$(info $(Defines))
$(info "test")
$(info $(LIBTOOL))
$(info $(MonetDB_INC))
@ -97,26 +100,26 @@ info:
$(info $(CXX))
$(info $(FPIC))
pch:
$(CXX) -x c++-header server/pch.hpp $(FPIC) $(MonetDB_INC) $(OPTFLAGS) $(CXXFLAGS) $(Threading)
$(CXX) -x c++-header server/pch.hpp $(FPIC) $(MonetDB_INC) $(OPTFLAGS) $(CXXFLAGS) $(Defines)
libaquery.a:
$(CXX) -c $(FPIC) $(PCHFLAGS) $(LIBAQ_SRC) $(MonetDB_INC) $(MonetDB_LIB) $(OS_SUPPORT) $(Threading) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) &&\
$(CXX) -c $(FPIC) $(PCHFLAGS) $(LIBAQ_SRC) $(MonetDB_INC) $(MonetDB_LIB) $(OS_SUPPORT) $(Defines) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) &&\
$(LIBTOOL) libaquery.a $(LIBAQ_OBJ) &&\
$(RANLIB) libaquery.a
server.bin:
$(CXX) $(LIBAQ_SRC) $(LINKFLAGS) $(OS_SUPPORT) $(Threading) $(MonetDB_INC) $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o server.bin
$(CXX) $(LIBAQ_SRC) $(LINKFLAGS) $(OS_SUPPORT) $(Defines) $(MonetDB_INC) $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o server.bin
launcher:
$(CXX) -D__AQ_BUILD_LAUNCHER__ $(LIBAQ_SRC) $(LINKFLAGS) $(OS_SUPPORT) $(Threading) $(MonetDB_INC) $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o aq
$(CXX) -D__AQ_BUILD_LAUNCHER__ $(LIBAQ_SRC) $(LINKFLAGS) $(OS_SUPPORT) $(Defines) $(MonetDB_INC) $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o aq
server.so:
# $(CXX) -z muldefs server/server.cpp server/monetdb_conn.cpp -fPIC -shared $(OS_SUPPORT) monetdb/msvc/monetdbe.dll --std=c++1z -O3 -march=native -o server.so -I./monetdb/msvc
$(CXX) $(SHAREDFLAGS) $(PCHFLAGS) $(LIBAQ_SRC) $(OS_SUPPORT) $(Threading) $(MonetDB_INC) $(MonetDB_LIB) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) -o server.so
$(CXX) $(SHAREDFLAGS) $(PCHFLAGS) $(LIBAQ_SRC) $(OS_SUPPORT) $(Defines) $(MonetDB_INC) $(MonetDB_LIB) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) -o server.so
server_uselib:
$(CXX) $(SHAREDFLAGS) $(USELIB_FLAG),libaquery.a $(MonetDB_LIB) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) -o server.so
snippet:
$(CXX) $(SHAREDFLAGS) $(PCHFLAGS) out.cpp $(LIBAQ_SRC) $(MonetDB_INC) $(MonetDB_LIB) $(Threading) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) -o dll.so
$(CXX) $(SHAREDFLAGS) $(PCHFLAGS) out.cpp $(LIBAQ_SRC) $(MonetDB_INC) $(MonetDB_LIB) $(Defines) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) -o dll.so
snippet_uselib:
$(CXX) $(SHAREDFLAGS) $(PCHFLAGS) out.cpp libaquery.a $(MonetDB_INC) $(Threading) $(MonetDB_LIB) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) -o dll.so
$(CXX) $(SHAREDFLAGS) $(PCHFLAGS) out.cpp libaquery.a $(MonetDB_INC) $(Defines) $(MonetDB_LIB) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) -o dll.so
docker:
docker build -t aquery .

@ -225,6 +225,7 @@ DROP TABLE my_table IF EXISTS
## Built-in functions:
- `avg[s]`: average of a column. `avgs(col), avgs(w, col)` is rolling and moving average with window `w` of the column `col`.
- `var[s]`, `stddev[s]`: [moving/rolling] **population** variance, standard deviation.
- `sum[s]`, `max[s]`, `min[s]`: similar to `avg[s]`
- `ratios(w = 1, col)`: moving ratio of a column, e.g. `ratios(w, col)[i]=col[i-w]/col[i]`. Window `w` has default value of 1.
- `next(col), prev(col)`: moving column back and forth by 1, e.g. `next(col)[i] = col[i+1]`.

@ -2,7 +2,7 @@
## GLOBAL CONFIGURATION FLAGS
version_string = '0.5.0a'
version_string = '0.5.1a'
add_path_to_ldpath = True
rebuild_backend = False
run_backend = True
@ -21,7 +21,8 @@ def init_config():
import os
from engine.utils import add_dll_dir
# os.environ['CXX'] = 'C:/Program Files/LLVM/bin/clang.exe'
# os.environ['THREADING'] = '1'
os.environ['THREADING'] = '1'
os.environ['AQUERY_ITC_USE_SHMEM'] = '1'
if ('__config_initialized__' not in globals() or
not __config_initialized__):

@ -1,11 +1,21 @@
a, b, c, d
1,1,2,2
2,1,2,2
2,4,3,4
1,2,2,2
1,2,3,4
4,2,1,4
2,1,3,4
2,1,3,3
2,1,1,2
1,2,3,4
3,2,4,2
1,2,3,3
3,2,1,2
2,1,2,2
2,1,4,2
3,3,4,4
2,2,3,1
2,3,4,4
2,4,1,2
3,4,1,2
2,3,2,2
1,2,3,1

1 a b c d
2 1 1 2 2
3 2 1 2 2
4 2 4 3 4
5 1 2 2 2
6 1 2 3 4
7 4 2 1 4
8 2 1 3 4 3
9 2 1 1 2
10 1 2 3 4
11 3 2 4 2
12 1 2 3 3
13 3 2 1 2
14 2 1 2 4 2
15 3 3 4 4
16 2 2 3 1
17 2 3 4 4
18 2 4 1 2
19 3 4 1 2
20 2 3 2 2
21 1 2 3 1

@ -151,5 +151,5 @@ int gen_stock_data(int argc, char* argv[]){
}
int main(int argc, char* argv[]){
gen_stock_data(argc, argv);
return gen_stock_data(argc, argv);
}

@ -324,10 +324,14 @@ fnfirst = OperatorBase('first', 1, as_is, cname = 'frist', sqlname = 'FRIST', ca
#fnavg = OperatorBase('avg', 1, fp(ext(auto_extension)), cname = 'avg', sqlname = 'AVG', call = fn_behavior)
fnsum = OperatorBase('sum', 1, long_return, cname = 'sum', sqlname = 'SUM', call = fn_behavior)
fnavg = OperatorBase('avg', 1, lfp_return, cname = 'avg', sqlname = 'AVG', call = fn_behavior)
fnvar = OperatorBase('var', 1, lfp_return, cname = 'var', sqlname = 'VAR_POP', call = fn_behavior)
fnstd = OperatorBase('stddev', 1, lfp_return, cname = 'stddev', sqlname = 'STDDEV_POP', call = fn_behavior)
fnmaxs = OperatorBase('maxs', [1, 2], ty_clamp(as_is, -1), cname = 'maxs', sqlname = 'MAXS', call = windowed_fn_behavor)
fnmins = OperatorBase('mins', [1, 2], ty_clamp(as_is, -1), cname = 'mins', sqlname = 'MINS', call = windowed_fn_behavor)
fnsums = OperatorBase('sums', [1, 2], ext(ty_clamp(auto_extension, -1)), cname = 'sums', sqlname = 'SUMS', call = windowed_fn_behavor)
fnavgs = OperatorBase('avgs', [1, 2], fp(ext(ty_clamp(auto_extension, -1))), cname = 'avgs', sqlname = 'AVGS', call = windowed_fn_behavor)
fnvars = OperatorBase('vars', [1, 2], fp(ext(ty_clamp(auto_extension, -1))), cname = 'vars', sqlname = 'VARS', call = windowed_fn_behavor)
fnstds = OperatorBase('stddevs', [1, 2], fp(ext(ty_clamp(auto_extension, -1))), cname = 'stddevs', sqlname = 'STDDEVS', call = windowed_fn_behavor)
fncnt = OperatorBase('count', 1, int_return, cname = 'count', sqlname = 'COUNT', call = count_behavior)
fnpack = OperatorBase('pack', -1, pack_return, cname = 'pack', sqlname = 'PACK', call = pack_behavior)
# special
@ -361,7 +365,8 @@ builtin_cstdlib = _op_make_dict(fnsqrt, fnlog, fnsin, fncos, fntan, fnpow)
builtin_func = _op_make_dict(fnmax, fnmin, fnsum, fnavg, fnmaxs,
fnmins, fndeltas, fnratios, fnlast,
fnfirst, fnsums, fnavgs, fncnt,
fnpack, fntrunc, fnprev, fnnext)
fnpack, fntrunc, fnprev, fnnext,
fnvar, fnvars, fnstd, fnstds)
user_module_func = {}
builtin_operators : Dict[str, OperatorBase] = {**builtin_binary_arith, **builtin_binary_logical,
**builtin_unary_arith, **builtin_unary_logical, **builtin_unary_special, **builtin_func, **builtin_cstdlib,

@ -160,8 +160,10 @@ class QueryStats:
class Config:
__all_attrs__ = ['running', 'new_query', 'server_mode',
'backend_type', 'has_dll',
'postproc_time', 'sql_time',
'n_buffers'
'n_buffers',
]
__i64_attrs__ = [
'monetdb_time', 'postproc_time'
]
__init_attributes__ = False
@ -171,26 +173,42 @@ class Config:
from functools import partial
for _i, attr in enumerate(Config.__all_attrs__):
if not hasattr(Config, attr):
setattr(Config, attr, property(partial(Config.getter, i = _i), partial(Config.setter, i = _i)))
setattr(Config, attr, property(
partial(Config.getter, i = _i), partial(Config.setter, i = _i)
))
for _i, attr in enumerate(Config.__i64_attrs__):
if not hasattr(Config, attr):
setattr(Config, attr, property(
partial(Config.i64_getter, i = _i), partial(Config.i64_setter, i = _i)
))
Config.__init_attributes__ = True
def __init__(self, mode, nq = 0, n_bufs = 0, bf_szs = []) -> None:
Config.__init_self__()
self.int_size = 4
self.n_attrib = len(Config.__all_attrs__)
self.buf = bytearray((self.n_attrib + n_bufs) * self.int_size)
self.buf = bytearray((self.n_attrib + n_bufs) * 4 +
len(self.__i64_attrs__) * 8
)
self.np_buf = np.ndarray(shape = (self.n_attrib), buffer = self.buf, dtype = np.int32)
self.np_i64buf = np.ndarray(shape = len(self.__i64_attrs__), buffer = self.buf,
dtype = np.int64, offset = 4 * len(self.__all_attrs__))
self.new_query = nq
self.server_mode = mode.value
self.running = 1
self.backend_type = Backend_Type.BACKEND_AQuery.value
self.has_dll = 0
self.n_buffers = n_bufs
self.monetdb_time = 0
self.postproc_time = 0
def getter (self, *, i):
return self.np_buf[i]
def setter(self, v, *, i):
self.np_buf[i] = v
def i64_getter (self, *, i):
return self.np_i64buf[i]
def i64_setter(self, v, *, i):
self.np_i64buf[i] = v
def set_bufszs(self, buf_szs):
for i in range(min(len(buf_szs), self.n_buffers)):
@ -209,6 +227,8 @@ class PromptState():
test_parser = True
server_mode: RunType = RunType.Threaded
server_bin = 'server.bin' if server_mode == RunType.IPC else 'server.so'
wait_engine = lambda: None
wake_engine = lambda: None
set_ready = lambda: None
get_ready = lambda: None
server_status = lambda: False
@ -299,6 +319,8 @@ def init_threaded(state : PromptState):
if aquery_config.run_backend:
server_so = ctypes.CDLL('./'+state.server_bin)
state.send = server_so['receive_args']
state.wait_engine = server_so['wait_engine']
state.wake_engine = server_so['wake_engine']
aquery_config.have_hge = server_so['have_hge']()
if aquery_config.have_hge != 0:
from engine.types import get_int128_support
@ -337,6 +359,8 @@ def init_prompt() -> PromptState:
rm = lambda: None
def __set_ready():
state.cfg.new_query = 1
state.wake_engine()
state.set_ready = __set_ready
state.get_ready = lambda: aquery_config.run_backend and state.cfg.new_query
if aquery_config.run_backend:
@ -380,9 +404,16 @@ def prompt(running = lambda:True, next = lambda:input('> '), state = None):
while running():
try:
if state.server_status():
state.init()
state.init(state)
# *** busy waiting ***
# while state.get_ready():
# time.sleep(.00001)
while state.get_ready():
time.sleep(.00001)
state.wait_engine()
if state.need_print:
print(f'MonetDB Time: {state.cfg.monetdb_time/10**9}, '
f'PostProc Time: {state.cfg.postproc_time/10**9}')
state.cfg.monetdb_time = state.cfg.postproc_time = 0
state.currstats.print(state.stats, need_print=state.need_print)
try:
og_q : str = next()

@ -54,19 +54,28 @@ class ast_node:
self.context.sql_end()
from reconstruct.expr import expr, fastscan
class SubqType(Enum):
WITH = auto()
FROM = auto()
PROJECTION = auto()
FILTER = auto()
GROUPBY = auto()
ORDERBY = auto()
NONE = auto()
class projection(ast_node):
name = 'projection'
first_order = 'select'
def __init__(self,
parent : Optional["ast_node"],
node,
context : Optional[Context] = None,
force_use_spgb : bool = False
force_use_spgb : bool = False,
subq_type: SubqType = SubqType.NONE
):
self.force_use_spgb = force_use_spgb
self.subq_type = subq_type
super().__init__(parent, node, context)
def init(self, _):
@ -83,10 +92,22 @@ class projection(ast_node):
p = node['select_distinct']
self.distinct = True
if 'with' in node:
self.with_clause = projection(self, node['value'])
with_table = node['with']['name']
with_table_name = tuple(with_table.keys())[0]
with_table_cols = tuple(with_table.values())[0]
self.with_clause = projection(self, node['with']['value'], subq_type=SubqType.WITH)
self.with_clause.out_table.add_alias(with_table_name)
for new_name, col in zip(with_table_cols, self.with_clause.out_table.columns):
col.rename(new_name)
self.with_clause.out_table.contextname_cpp
# in monetdb, in cxt
else:
self.with_clause = None
self.limit = None
if 'limit' in node:
self.limit = node['limit']
self.projections = p if type(p) is list else [p]
if self.parent is None:
self.context.sql_begin()
@ -115,8 +136,9 @@ class projection(ast_node):
if type(self.datasource) is join:
self.datasource.process_join_conditions()
self.context.special_gb = self.force_use_spgb
if 'groupby' in node: # if groupby clause contains special stuff
self.context.special_gb = groupby.check_special(self, node['groupby'])
self.context.special_gb |= groupby.check_special(self, node['groupby'])
def consume(self, node):
# deal with projections
@ -230,7 +252,7 @@ class projection(ast_node):
self.group_node = groupby(self, node['groupby'])
if self.group_node.terminate:
self.context.abandon_query()
projection(self.parent, node, self.context, True)
projection(self.parent, node, self.context, True, subq_type=self.subq_type)
return
if self.group_node.use_sp_gb:
self.has_postproc = True
@ -370,8 +392,12 @@ class projection(ast_node):
# for funcs evaluate f_i(x, ...)
self.context.emitc(f'{self.out_table.contextname_cpp}->get_col<{key}>().initfrom({val[1]}, "{cols[i].name}");')
# print out col_is
if 'into' not in node:
if 'into' not in node and self.subq_type == SubqType.NONE:
if self.limit is None:
self.context.emitc(f'print(*{self.out_table.contextname_cpp});')
else:
self.context.emitc(f'{self.out_table.contextname_cpp}->printall(" ","\\n", nullptr, nullptr, {self.limit});')
if self.outfile:
self.outfile.finalize()
@ -627,7 +653,7 @@ class groupby(ast_node):
self.gb_cols = set()
# dedicated_glist -> cols populated for special group by
self.dedicated_glist : List[Tuple[expr, Set[ColRef]]] = []
self.use_sp_gb = False
self.use_sp_gb = self.parent.force_use_spgb
for g in node:
self.datasource.rec = set()
g_expr = expr(self, g['value'])
@ -654,10 +680,11 @@ class groupby(ast_node):
if (projs[2].is_compound and
not ((projs[2].is_ColExpr and projs[2].raw_col in self.gb_cols) or
projs[2].sql in self.gb_cols)
):
if self.parent.force_use_spgb:
) and (not self.parent.force_use_spgb):
self.use_sp_gb = True
else:
break
if self.use_sp_gb and not self.parent.force_use_spgb:
self.terminate = True
return
if not self.use_sp_gb:
@ -1144,7 +1171,7 @@ class load(ast_node):
self.context.emitc(f'{c.cxt_name}.emplace_back({col_tmp_names[i]});')
self.context.emitc('}')
self.context.emitc(f'print(*{self.out_table});')
# self.context.emitc(f'print(*{self.out_table});')
self.context.emitc(f'{self.out_table}->monetdb_append_table(cxt->alt_server, "{table.table_name}");')
self.context.postproc_end(self.postproc_fname)

@ -168,7 +168,7 @@ class expr(ast_node):
special_func = [*self.context.udf_map.keys(), *self.context.module_map.keys(),
"maxs", "mins", "avgs", "sums", "deltas", "last", "first",
"ratios", "pack", "truncate"]
"stddevs", "vars", "ratios", "pack", "truncate"]
if (
self.context.special_gb

@ -45,6 +45,14 @@ class ColRef:
alias = table_name
return f'{alias}.{self.get_name()}'
def rename(self, name):
self.alias.discard(self.name)
self.table.columns_byname.pop(self.name, None)
self.name = name
self.table.columns_byname[name] = self
return self
def __getitem__(self, key):
if type(key) is str:
return getattr(self, key)
@ -98,6 +106,17 @@ class TableInfo:
self.cxt.tables_byname[alias] = self
self.alias.add(alias)
def rename(self, name):
if name in self.cxt.tables_byname.keys():
print(f"Error: table name {name} already exists")
return
self.cxt.tables_byname.pop(self.table_name, None)
self.alias.discard(self.table_name)
self.table_name = name
self.cxt.tables_byname[name] = self
self.alias.add(name)
def parse_col_names(self, colExpr) -> ColRef:
parsedColExpr = colExpr.split('.')
if len(parsedColExpr) <= 1:

@ -202,6 +202,102 @@ decayed_t<VT, types::GetFPType<types::GetLongType<T>>> avgw(uint32_t w, const VT
return ret;
}
template<class T, template<typename ...> class VT, bool sd = false>
decayed_t<VT, types::GetFPType<types::GetLongType<T>>> varw(uint32_t w, const VT<T>& arr) {
using FPType = types::GetFPType<types::GetLongType<T>>;
const uint32_t& len = arr.size;
decayed_t<VT, FPType> ret(len);
uint32_t i = 0;
types::GetLongType<T> s{};
w = w > len ? len : w;
FPType EnX {}, MnX{};
if (len) {
s = arr[0];
MnX = 0;
EnX = arr[0];
ret[i++] = 0;
}
for (; i < len; ++i){
s += arr[i];
FPType _EnX = s / (FPType)(i + 1);
MnX += (arr[i] - EnX) * (arr[i] - _EnX);
EnX = _EnX;
ret[i] = MnX / (FPType)(i + 1);
if constexpr(sd) ret[i-1] = sqrt(ret[i-1]);
}
const float rw = 1.f / (float)w;
s *= rw;
for (; i < len; ++i){
const auto dw = arr[i] - arr[i - w - 1];
const auto sw = arr[i] + arr[i - w - 1];
const auto dex = dw * rw;
ret[i] = ret[i-1] - dex*(s + s + dex - sw);
if constexpr(sd) ret[i-1] = sqrt(ret[i-1]);
s += dex;
}
if constexpr(sd)
if(i)
ret[i-1] = sqrt(ret[i-1]);
return ret;
}
template<class T, template<typename ...> class VT>
types::GetFPType<types::GetLongType<decays<T>>> var(const VT<T>& arr) {
typedef types::GetFPType<types::GetLongType<decays<T>>> FPType;
const uint32_t& len = arr.size;
uint32_t i = 0;
types::GetLongType<T> s{0};
types::GetLongType<T> ssq{0};
if (len) {
s = arr[0];
ssq = arr[0] * arr[0];
}
for (; i < len; ++i){
s += arr[i];
ssq += arr[i] * arr[i];
}
return (ssq - s * s / (FPType)(len + 1)) / (FPType)(len + 1);
}
template<class T, template<typename ...> class VT, bool sd = false>
decayed_t<VT, types::GetFPType<types::GetLongType<T>>> vars(const VT<T>& arr) {
typedef types::GetFPType<types::GetLongType<T>> FPType;
const uint32_t& len = arr.size;
decayed_t<VT, FPType> ret(len);
uint32_t i = 0;
types::GetLongType<T> s{};
FPType MnX{};
FPType EnX {};
if (len) {
s = arr[0];
MnX = 0;
EnX = arr[0];
ret[i++] = 0;
}
for (; i < len; ++i){
s += arr[i];
FPType _EnX = s / (FPType)(i + 1);
MnX += (arr[i] - EnX) * (arr[i] - _EnX);
printf("%d %ld ", arr[i], MnX);
EnX = _EnX;
ret[i] = MnX / (FPType)(i + 1);
if constexpr(sd) ret[i] = sqrt(ret[i]);
}
return ret;
}
template<class T, template<typename ...> class VT>
types::GetFPType<types::GetLongType<decays<T>>> stddev(const VT<T>& arr) {
return sqrt(var(arr));
}
template<class T, template<typename ...> class VT>
decayed_t<VT, types::GetFPType<types::GetLongType<T>>> stddevs(const VT<T>& arr) {
return vars<T, VT, true>(arr);
}
template<class T, template<typename ...> class VT>
decayed_t<VT, types::GetFPType<types::GetLongType<T>>> stddevw(uint32_t w, const VT<T>& arr) {
return varw<T, VT, true>(w, arr);
}
// use getSignedType
template<class T, template<typename ...> class VT>
decayed_t<VT, T> deltas(const VT<T>& arr) {
@ -251,26 +347,33 @@ T first(const VT<T>& arr) {
}
#define __DEFAULT_AGGREGATE_FUNCTION__(NAME, RET) \
template <class T> constexpr inline T NAME(const T& v) { return RET; }
template <class T> constexpr T NAME(const T& v) { return RET; }
// non-aggreation count. E.g. SELECT COUNT(col) from table;
template <class T> constexpr inline T count(const T& v) { return 1; }
template <class T> constexpr inline T max(const T& v) { return v; }
template <class T> constexpr inline T min(const T& v) { return v; }
template <class T> constexpr inline T avg(const T& v) { return v; }
template <class T> constexpr inline T sum(const T& v) { return v; }
template <class T> constexpr inline T maxw(uint32_t, const T& v) { return v; }
template <class T> constexpr inline T minw(uint32_t, const T& v) { return v; }
template <class T> constexpr inline T avgw(uint32_t, const T& v) { return v; }
template <class T> constexpr inline T sumw(uint32_t, const T& v) { return v; }
template <class T> constexpr inline T ratiow(uint32_t, const T& v) { return 1; }
template <class T> constexpr inline T maxs(const T& v) { return v; }
template <class T> constexpr inline T mins(const T& v) { return v; }
template <class T> constexpr inline T avgs(const T& v) { return v; }
template <class T> constexpr inline T sums(const T& v) { return v; }
template <class T> constexpr inline T last(const T& v) { return v; }
template <class T> constexpr inline T prev(const T& v) { return v; }
template <class T> constexpr inline T aggnext(const T& v) { return v; }
template <class T> constexpr inline T daltas(const T& v) { return 0; }
template <class T> constexpr inline T ratios(const T& v) { return 1; }
template <class T> constexpr T count(const T&) { return 1; }
template <class T> constexpr T var(const T&) { return 0; }
template <class T> constexpr T vars(const T&) { return 0; }
template <class T> constexpr T varw(uint32_t, const T&) { return 0; }
template <class T> constexpr T stddev(const T&) { return 0; }
template <class T> constexpr T stddevs(const T&) { return 0; }
template <class T> constexpr T stddevw(uint32_t, const T&) { return 0; }
template <class T> constexpr T max(const T& v) { return v; }
template <class T> constexpr T min(const T& v) { return v; }
template <class T> constexpr T avg(const T& v) { return v; }
template <class T> constexpr T sum(const T& v) { return v; }
template <class T> constexpr T maxw(uint32_t, const T& v) { return v; }
template <class T> constexpr T minw(uint32_t, const T& v) { return v; }
template <class T> constexpr T avgw(uint32_t, const T& v) { return v; }
template <class T> constexpr T sumw(uint32_t, const T& v) { return v; }
template <class T> constexpr T ratiow(uint32_t, const T&) { return 1; }
template <class T> constexpr T maxs(const T& v) { return v; }
template <class T> constexpr T mins(const T& v) { return v; }
template <class T> constexpr T avgs(const T& v) { return v; }
template <class T> constexpr T sums(const T& v) { return v; }
template <class T> constexpr T last(const T& v) { return v; }
template <class T> constexpr T prev(const T& v) { return v; }
template <class T> constexpr T aggnext(const T& v) { return v; }
template <class T> constexpr T daltas(const T&) { return 0; }
template <class T> constexpr T ratios(const T&) { return 1; }

@ -3,6 +3,7 @@
#include "table.h"
#include <unordered_map>
#include <chrono>
enum Log_level {
LOG_INFO,
@ -15,9 +16,16 @@ enum Backend_Type {
BACKEND_MonetDB,
BACKEND_MariaDB
};
struct QueryStats{
long long monet_time;
long long postproc_time;
};
struct Config{
int running, new_query, server_mode,
backend_type, has_dll, exec_time, n_buffers;
backend_type, has_dll,
n_buffers;
QueryStats stats;
int buffer_sizes[];
};
@ -67,6 +75,27 @@ struct Context{
std::unordered_map<const char*, uColRef *> cols;
};
class aq_timer {
private:
std::chrono::high_resolution_clock::time_point now;
public:
aq_timer(){
now = std::chrono::high_resolution_clock::now();
}
void reset(){
now = std::chrono::high_resolution_clock::now();
}
long long elapsed(){
long long ret = (std::chrono::high_resolution_clock::now() - now).count();
reset();
return ret;
}
long long lap() const{
long long ret = (std::chrono::high_resolution_clock::now() - now).count();
return ret;
}
};
#ifdef _WIN32
#define __DLLEXPORT__ __declspec(dllexport) __stdcall
#else

@ -18,6 +18,7 @@
#include <sys/mman.h>
struct SharedMemory
{
std::atomic<bool> a;
int hFileMap;
void* pData;
SharedMemory(const char* fname) {
@ -31,6 +32,79 @@ struct SharedMemory
}
};
#ifndef __USE_STD_SEMAPHORE__
#ifdef __APPLE__
#include <dispatch/dispatch.h>
class A_Semaphore {
private:
dispatch_semaphore_t native_handle;
public:
A_Semaphore(bool v = false) {
native_handle = dispatch_semaphore_create(v);
}
void acquire() {
puts("acquire");
dispatch_semaphore_wait(native_handle, DISPATCH_TIME_FOREVER);
}
void release() {
puts("release");
dispatch_semaphore_signal(native_handle);
}
~A_Semaphore() {
}
};
#else
#include <semaphore.h>
class A_Semaphore {
private:
sem_t native_handle;
public:
A_Semaphore(bool v = false) {
sem_init(&native_handle, v, 1);
}
void acquire() {
sem_wait(&native_handle);
}
void release() {
sem_post(&native_handle);
}
~A_Semaphore() {
sem_destroy(&native_handle);
}
};
#endif
#endif
#endif
#ifdef __USE_STD_SEMAPHORE__
#include <semaphore>
class A_Semaphore {
private:
std::binary_semaphore native_handle;
public:
A_Semaphore(bool v = false) {
native_handle = std::binary_semaphore(v);
}
void acquire() {
native_handle.acquire();
}
void release() {
native_handle.release();
}
~A_Semaphore() { }
};
#endif
#ifdef __AQUERY_ITC_USE_SHMEM__
A_Semaphore prompt{ true }, engine{ false };
#define PROMPT_ACQUIRE() prompt.acquire()
#define PROMPT_RELEASE() prompt.release()
#define ENGINE_ACQUIRE() engine.acquire()
#define ENGINE_RELEASE() engine.release()
#else
#define PROMPT_ACQUIRE()
#define PROMPT_RELEASE() std::this_thread::sleep_for(std::chrono::nanoseconds(0))
#define ENGINE_ACQUIRE()
#define ENGINE_RELEASE()
#endif
#include "aggregations.h"
@ -42,6 +116,13 @@ int test_main();
int n_recv = 0;
char** n_recvd = nullptr;
__AQEXPORT__(void) wait_engine(){
PROMPT_ACQUIRE();
}
__AQEXPORT__(void) wake_engine(){
ENGINE_RELEASE();
}
extern "C" void __DLLEXPORT__ receive_args(int argc, char**argv){
n_recv = argc;
n_recvd = argv;
@ -119,9 +200,10 @@ void initialize_module(const char* module_name, void* module_handle, Context* cx
}
int dll_main(int argc, char** argv, Context* cxt){
aq_timer timer;
Config *cfg = reinterpret_cast<Config *>(argv[0]);
std::unordered_map<std::string, void*> user_module_map;
if (cxt->module_function_maps == 0)
if (cxt->module_function_maps == nullptr)
cxt->module_function_maps = new std::unordered_map<std::string, void*>();
auto module_fn_map =
static_cast<std::unordered_map<std::string, void*>*>(cxt->module_function_maps);
@ -138,16 +220,20 @@ int dll_main(int argc, char** argv, Context* cxt){
cxt->alt_server = NULL;
while(cfg->running){
ENGINE_ACQUIRE();
if (cfg->new_query) {
void *handle = 0;
void *user_module_handle = 0;
cfg->stats.postproc_time = 0;
cfg->stats.monet_time = 0;
void *handle = nullptr;
void *user_module_handle = nullptr;
if (cfg->backend_type == BACKEND_MonetDB){
if (cxt->alt_server == 0)
if (cxt->alt_server == nullptr)
cxt->alt_server = new Server(cxt);
Server* server = reinterpret_cast<Server*>(cxt->alt_server);
if(n_recv > 0){
if (cfg->backend_type == BACKEND_AQuery || cfg->has_dll) {
handle = dlopen("./dll.so", RTLD_LAZY);
handle = dlopen("./dll.so", RTLD_NOW);
}
for (const auto& module : user_module_map){
initialize_module(module.first.c_str(), module.second, cxt);
@ -159,14 +245,18 @@ int dll_main(int argc, char** argv, Context* cxt){
switch(n_recvd[i][0]){
case 'Q': // SQL query for monetdbe
{
timer.reset();
server->exec(n_recvd[i] + 1);
cfg->stats.monet_time += timer.elapsed();
printf("Exec Q%d: %s", i, n_recvd[i]);
}
break;
case 'P': // Postprocessing procedure
if(handle && !server->haserror()) {
code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, n_recvd[i]+1));
timer.reset();
c(cxt);
cfg->stats.postproc_time += timer.elapsed();
}
break;
case 'M': // Load Module
@ -198,7 +288,7 @@ int dll_main(int argc, char** argv, Context* cxt){
auto mname = n_recvd[i] + 1;
auto it = user_module_map.find(mname);
if (user_module_handle == it->second)
user_module_handle = 0;
user_module_handle = nullptr;
dlclose(it->second);
user_module_map.erase(it);
}
@ -207,8 +297,9 @@ int dll_main(int argc, char** argv, Context* cxt){
}
if(handle) {
dlclose(handle);
handle = 0;
handle = nullptr;
}
printf("%ld, %ld", cfg->stats.monet_time, cfg->stats.postproc_time);
cxt->end_session();
n_recv = 0;
}
@ -230,7 +321,9 @@ int dll_main(int argc, char** argv, Context* cxt){
if (handle) dlclose(handle);
cfg->new_query = 0;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
//puts(cfg->running? "true": "false");
asm("");
PROMPT_RELEASE();
}
return 0;

@ -9,6 +9,7 @@
#include <string>
#include <algorithm>
#include <cstdarg>
#include <vector>
#include "io.h"
#include "hasher.h"
@ -139,8 +140,16 @@ public:
ColView<_Ty> operator [](const vector_type<uint32_t>& idxs) const {
return ColView<_Ty>(*this, idxs);
}
void out(uint32_t n = 4, const char* sep = " ") const {
vector_type<_Ty> operator [](const std::vector<bool>& idxs) const {
vector_type<_Ty> ret (this->size);
uint32_t i = 0;
for(const auto& f : idxs){
if(f) ret.emplace_back(this->operator[](i));
++i;
}
return ret;
}
void out(uint32_t n = 1000, const char* sep = " ") const {
const char* more = "";
if (n < this->size)
more = " ... ";
@ -243,7 +252,7 @@ public:
Iterator_t end() const {
return Iterator_t(idxs.end(), orig);
}
void out(uint32_t n = 4, const char* sep = " ") const {
void out(uint32_t n = 1000, const char* sep = " ") const {
n = n > size ? size : n;
std::cout << '(';
for (uint32_t i = 0; i < n; ++i)
@ -438,19 +447,27 @@ struct TableInfo {
}
template <int ...cols>
void print2(const char* __restrict sep = ",", const char* __restrict end = "\n",
const vector_type<uint32_t>* __restrict view = nullptr, FILE* __restrict fp = nullptr) const {
const vector_type<uint32_t>* __restrict view = nullptr,
FILE* __restrict fp = nullptr, uint32_t limit = std::numeric_limits<uint32_t>::max()
) const {
std::string printf_string =
generate_printf_string<typename std::tuple_element<cols, tuple_type>::type ...>(sep, end);
// puts(printf_string.c_str());
std::string header_string = std::string();
constexpr static int a_cols[] = { cols... };
if (fp == nullptr){
header_string = get_header_string(sep, end);
header_string.resize(header_string.size() - strlen(end));
}
else {
for (int i = 0; i < sizeof...(cols); ++i)
header_string += std::string(this->colrefs[a_cols[i]].name) + sep;
const size_t l_sep = strlen(sep);
if (header_string.size() - l_sep >= 0)
header_string.resize(header_string.size() - l_sep);
const auto& prt_loop = [&fp, &view, &printf_string, *this](const auto& f) {
}
const auto& prt_loop = [&fp, &view, &printf_string, *this, &limit](const auto& f) {
#ifdef __AQ__HAS__INT128__
constexpr auto num_hge = count_type<__int128_t, __uint128_t>((tuple_type*)(0));
#else
@ -466,16 +483,21 @@ struct TableInfo {
+ 1 // padding for msvc not allowing empty arrays
];
setgbuf(cbuf);
if (view)
for (uint32_t i = 0; i < view->size; ++i) {
if (view){
uint32_t outsz = limit > view->size ? view->size : limit;
for (uint32_t i = 0; i < outsz; ++i) {
print2_impl<cols...>(f, (*view)[i], printf_string.c_str());
setgbuf();
}
else
for (uint32_t i = 0; i < colrefs[0].size; ++i) {
}
else{
uint32_t outsz = limit > colrefs[0].size ? colrefs[0].size : limit;
for (uint32_t i = 0; i < outsz; ++i) {
print2_impl<cols...>(f, i, printf_string.c_str());
setgbuf();
}
}
};
if (fp)
@ -490,15 +512,17 @@ struct TableInfo {
}
template <int ...vals> struct applier {
inline constexpr static void apply(const TableInfo<Types...>& t, const char* __restrict sep = ",", const char* __restrict end = "\n",
const vector_type<uint32_t>* __restrict view = nullptr, FILE* __restrict fp = nullptr)
const vector_type<uint32_t>* __restrict view = nullptr, FILE* __restrict fp = nullptr, uint32_t limit = std::numeric_limits<uint32_t>::max()
)
{
t.template print2<vals ...>(sep, end, view, fp);
t.template print2<vals ...>(sep, end, view, fp, limit);
}
};
inline void printall(const char* __restrict sep = ",", const char* __restrict end = "\n",
const vector_type<uint32_t>* __restrict view = nullptr, FILE* __restrict fp = nullptr) {
applyIntegerSequence<sizeof...(Types), applier>::apply(*this, sep, end, view, fp);
const vector_type<uint32_t>* __restrict view = nullptr, FILE* __restrict fp = nullptr,
uint32_t limit = std::numeric_limits<uint32_t>::max() ) const {
applyIntegerSequence<sizeof...(Types), applier>::apply(*this, sep, end, view, fp, limit);
}
TableInfo<Types...>* rename(const char* name) {
@ -667,7 +691,9 @@ template <class ...Types>
template <size_t j>
inline typename std::enable_if<j == sizeof...(Types) - 1, void>::type
TableInfo<Types ...>::print_impl(const uint32_t& i, const char* __restrict sep) const {
std::cout << (get<j>(*this))[i];
decltype(auto) t = (get<j>(*this))[i];
// print(t);
std::cout << t;
}
template<class ...Types>
@ -682,6 +708,7 @@ inline typename std::enable_if < j < sizeof...(Types) - 1, void>::type
template<class ...Types>
inline void TableInfo<Types...>::print(const char* __restrict sep, const char* __restrict end) const {
//printall(sep, end);
std::string header_string = get_header_string(sep, end);
std::cout << header_string.c_str();

@ -29,7 +29,15 @@ inline constexpr size_t aq_szof<void> = 0;
template <class T1, class T2>
struct aqis_same_impl {
constexpr static bool value =
std::conditional_t<
std::is_same_v<T1, bool> || std::is_same_v<T2, bool>,
Cond(
(std::is_same_v<T1, bool> && std::is_same_v<T2, bool>),
std::true_type,
std::false_type
),
Cond(
std::is_signed_v<T1> == std::is_signed_v<T2>,
Cond(
std::is_floating_point_v<T1> == std::is_floating_point_v<T2>,
@ -41,15 +49,17 @@ struct aqis_same_impl {
std::false_type
),
std::false_type
)
>::value;
};
// make sure size_t/ptr_t and the corresponding integer types are the same
template <class T1, class T2, class ...Ts>
constexpr bool aqis_same = aqis_same_impl<T1, T2>::value &&
aqis_same<T2, Ts...>;
template <class T1, class T2>
constexpr bool aqis_same<T1, T2> = aqis_same_impl<T1, T2>::value;
namespace types {
enum Type_t {
AINT32, AFLOAT, ASTR, ADOUBLE, ALDOUBLE, AINT64, AINT128, AINT16, ADATE, ATIME, AINT8,

@ -265,7 +265,7 @@ public:
}
size = this->size + dist;
}
inline void out(uint32_t n = 4, const char* sep = " ") const
inline void out(uint32_t n = 4000, const char* sep = " ") const
{
const char* more = "";
if (n < this->size)

@ -41,4 +41,20 @@ void SharedMemory::FreeMemoryMap()
if (this->hFileMap)
CloseHandle(this->hFileMap);
}
#ifndef __USE_STD_SEMAPHORE__
A_Semaphore::A_Semaphore(bool v = false) {
native_handle = CreateSemaphore(NULL, v, 1, NULL);
}
void A_Semaphore::acquire() {
WaitForSingleObject(native_handle, INFINITE);
}
void A_Semaphore::release() {
ReleaseSemaphore(native_handle, 1, NULL);
}
A_Semaphore::~A_Semaphore() {
CloseHandle(native_handle);
}
#endif
#endif

@ -14,5 +14,17 @@ struct SharedMemory
SharedMemory(const char*);
void FreeMemoryMap();
};
#ifndef __USE_STD_SEMAPHORE__
class A_Semaphore {
private:
void* native_handle;
public:
A_Semaphore();
void acquire();
void release();
~A_Semaphore();
};
#endif
#endif

@ -18,7 +18,7 @@
//
///////////////////////////////////////////////////////////////////////////////
#include <stdio.h>
#include "Time.H"
#include "Time.hpp"
Time::Time(char *startTime_)
{

@ -19,7 +19,7 @@ LOAD DATA INFILE "data/test.csv"
INTO TABLE test1
FIELDS TERMINATED BY ","
SELECT pairCorr(c, b) * d, sum(a), b
SELECT pairCorr(c, b) * d, a, sum(b)
FROM test1
group by c,b,d
group by a
order by b ASC

@ -30,10 +30,10 @@ INSERT INTO my_table VALUES(10, 20, "example")
select * from my_table;
INSERT INTO my_table SELECT * FROM my_table
select * from my_table;
SELECT c1, c2 + c2 as twice_c2 FROM my_table;
SELECT c1, c2 as twice_c2 FROM my_table;
CREATE TABLE my_table_derived
AS
SELECT c1, c2 + c2 as twice_c2 FROM my_table;
SELECT c1, c2 as twice_c2 FROM my_table;
SELECT * FROM my_table_derived;

Loading…
Cancel
Save