From 50740fe6acafd60dd26f745efa132239b5726458 Mon Sep 17 00:00:00 2001 From: Bill Date: Fri, 4 Nov 2022 06:10:36 +0800 Subject: [PATCH] Added var(s), stddev(s). New ITC, stats. bugfix on aggregations. --- Makefile | 23 +++--- README.md | 1 + aquery_config.py | 5 +- data/test.csv | 14 +++- datagen.cpp | 2 +- engine/types.py | 7 +- prompt.py | 49 +++++++++--- reconstruct/ast.py | 59 +++++++++++---- reconstruct/expr.py | 2 +- reconstruct/storage.py | 19 +++++ server/aggregations.h | 143 +++++++++++++++++++++++++++++++----- server/libaquery.h | 33 ++++++++- server/server.cpp | 115 ++++++++++++++++++++++++++--- server/table.h | 67 ++++++++++++----- server/types.h | 24 ++++-- server/vector_type.hpp | 2 +- server/winhelper.cpp | 16 ++++ server/winhelper.h | 12 +++ tests/datagen_jose/Time.cpp | 2 +- tests/funcs.a | 4 +- tests/q4.a | 4 +- 21 files changed, 495 insertions(+), 108 deletions(-) diff --git a/Makefile b/Makefile index 1707240..4564f5e 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ OS_SUPPORT = MonetDB_LIB = MonetDB_INC = -Threading = +Defines = CXXFLAGS = --std=c++1z ifeq ($(AQ_DEBUG), 1) OPTFLAGS = -g3 -fsanitize=address -fsanitize=leak @@ -80,16 +80,19 @@ endif ifeq ($(THREADING),1) LIBAQ_SRC += server/threading.cpp LIBAQ_OBJ += threading.o - Threading += -DTHREADING + Defines += -DTHREADING endif +ifeq ($(AQUERY_ITC_USE_SHMEM), 1) + Defines += -D__AQUERY_ITC_USE_SHMEM__ +endif SHAREDFLAGS += $(FPIC) info: $(info $(OPTFLAGS)) $(info $(OS_SUPPORT)) $(info $(OS)) - $(info $(Threading)) + $(info $(Defines)) $(info "test") $(info $(LIBTOOL)) $(info $(MonetDB_INC)) @@ -97,26 +100,26 @@ info: $(info $(CXX)) $(info $(FPIC)) pch: - $(CXX) -x c++-header server/pch.hpp $(FPIC) $(MonetDB_INC) $(OPTFLAGS) $(CXXFLAGS) $(Threading) + $(CXX) -x c++-header server/pch.hpp $(FPIC) $(MonetDB_INC) $(OPTFLAGS) $(CXXFLAGS) $(Defines) libaquery.a: - $(CXX) -c $(FPIC) $(PCHFLAGS) $(LIBAQ_SRC) $(MonetDB_INC) $(MonetDB_LIB) $(OS_SUPPORT) $(Threading) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) &&\ + $(CXX) -c $(FPIC) $(PCHFLAGS) $(LIBAQ_SRC) $(MonetDB_INC) $(MonetDB_LIB) $(OS_SUPPORT) $(Defines) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) &&\ $(LIBTOOL) libaquery.a $(LIBAQ_OBJ) &&\ $(RANLIB) libaquery.a server.bin: - $(CXX) $(LIBAQ_SRC) $(LINKFLAGS) $(OS_SUPPORT) $(Threading) $(MonetDB_INC) $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o server.bin + $(CXX) $(LIBAQ_SRC) $(LINKFLAGS) $(OS_SUPPORT) $(Defines) $(MonetDB_INC) $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o server.bin launcher: - $(CXX) -D__AQ_BUILD_LAUNCHER__ $(LIBAQ_SRC) $(LINKFLAGS) $(OS_SUPPORT) $(Threading) $(MonetDB_INC) $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o aq + $(CXX) -D__AQ_BUILD_LAUNCHER__ $(LIBAQ_SRC) $(LINKFLAGS) $(OS_SUPPORT) $(Defines) $(MonetDB_INC) $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o aq server.so: # $(CXX) -z muldefs server/server.cpp server/monetdb_conn.cpp -fPIC -shared $(OS_SUPPORT) monetdb/msvc/monetdbe.dll --std=c++1z -O3 -march=native -o server.so -I./monetdb/msvc - $(CXX) $(SHAREDFLAGS) $(PCHFLAGS) $(LIBAQ_SRC) $(OS_SUPPORT) $(Threading) $(MonetDB_INC) $(MonetDB_LIB) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) -o server.so + $(CXX) $(SHAREDFLAGS) $(PCHFLAGS) $(LIBAQ_SRC) $(OS_SUPPORT) $(Defines) $(MonetDB_INC) $(MonetDB_LIB) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) -o server.so server_uselib: $(CXX) $(SHAREDFLAGS) $(USELIB_FLAG),libaquery.a $(MonetDB_LIB) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) -o server.so snippet: - $(CXX) $(SHAREDFLAGS) $(PCHFLAGS) out.cpp $(LIBAQ_SRC) $(MonetDB_INC) $(MonetDB_LIB) $(Threading) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) -o dll.so + $(CXX) $(SHAREDFLAGS) $(PCHFLAGS) out.cpp $(LIBAQ_SRC) $(MonetDB_INC) $(MonetDB_LIB) $(Defines) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) -o dll.so snippet_uselib: - $(CXX) $(SHAREDFLAGS) $(PCHFLAGS) out.cpp libaquery.a $(MonetDB_INC) $(Threading) $(MonetDB_LIB) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) -o dll.so + $(CXX) $(SHAREDFLAGS) $(PCHFLAGS) out.cpp libaquery.a $(MonetDB_INC) $(Defines) $(MonetDB_LIB) $(OPTFLAGS) $(LINKFLAGS) $(CXXFLAGS) -o dll.so docker: docker build -t aquery . diff --git a/README.md b/README.md index fc06b4f..1de828d 100644 --- a/README.md +++ b/README.md @@ -225,6 +225,7 @@ DROP TABLE my_table IF EXISTS ## Built-in functions: - `avg[s]`: average of a column. `avgs(col), avgs(w, col)` is rolling and moving average with window `w` of the column `col`. +- `var[s]`, `stddev[s]`: [moving/rolling] **population** variance, standard deviation. - `sum[s]`, `max[s]`, `min[s]`: similar to `avg[s]` - `ratios(w = 1, col)`: moving ratio of a column, e.g. `ratios(w, col)[i]=col[i-w]/col[i]`. Window `w` has default value of 1. - `next(col), prev(col)`: moving column back and forth by 1, e.g. `next(col)[i] = col[i+1]`. diff --git a/aquery_config.py b/aquery_config.py index 2d5939b..e3600f9 100644 --- a/aquery_config.py +++ b/aquery_config.py @@ -2,7 +2,7 @@ ## GLOBAL CONFIGURATION FLAGS -version_string = '0.5.0a' +version_string = '0.5.1a' add_path_to_ldpath = True rebuild_backend = False run_backend = True @@ -21,7 +21,8 @@ def init_config(): import os from engine.utils import add_dll_dir # os.environ['CXX'] = 'C:/Program Files/LLVM/bin/clang.exe' - # os.environ['THREADING'] = '1' + os.environ['THREADING'] = '1' + os.environ['AQUERY_ITC_USE_SHMEM'] = '1' if ('__config_initialized__' not in globals() or not __config_initialized__): diff --git a/data/test.csv b/data/test.csv index 5eb9e8f..b4fe244 100644 --- a/data/test.csv +++ b/data/test.csv @@ -1,11 +1,21 @@ a, b, c, d 1,1,2,2 +2,1,2,2 +2,4,3,4 1,2,2,2 1,2,3,4 4,2,1,4 -2,1,3,4 +2,1,3,3 +2,1,1,2 1,2,3,4 +3,2,4,2 1,2,3,3 3,2,1,2 -2,1,2,2 +2,1,4,2 +3,3,4,4 +2,2,3,1 +2,3,4,4 +2,4,1,2 +3,4,1,2 +2,3,2,2 1,2,3,1 diff --git a/datagen.cpp b/datagen.cpp index 88f5a48..c96b480 100644 --- a/datagen.cpp +++ b/datagen.cpp @@ -151,5 +151,5 @@ int gen_stock_data(int argc, char* argv[]){ } int main(int argc, char* argv[]){ - gen_stock_data(argc, argv); + return gen_stock_data(argc, argv); } diff --git a/engine/types.py b/engine/types.py index 3e217a3..5baf47f 100644 --- a/engine/types.py +++ b/engine/types.py @@ -324,10 +324,14 @@ fnfirst = OperatorBase('first', 1, as_is, cname = 'frist', sqlname = 'FRIST', ca #fnavg = OperatorBase('avg', 1, fp(ext(auto_extension)), cname = 'avg', sqlname = 'AVG', call = fn_behavior) fnsum = OperatorBase('sum', 1, long_return, cname = 'sum', sqlname = 'SUM', call = fn_behavior) fnavg = OperatorBase('avg', 1, lfp_return, cname = 'avg', sqlname = 'AVG', call = fn_behavior) +fnvar = OperatorBase('var', 1, lfp_return, cname = 'var', sqlname = 'VAR_POP', call = fn_behavior) +fnstd = OperatorBase('stddev', 1, lfp_return, cname = 'stddev', sqlname = 'STDDEV_POP', call = fn_behavior) fnmaxs = OperatorBase('maxs', [1, 2], ty_clamp(as_is, -1), cname = 'maxs', sqlname = 'MAXS', call = windowed_fn_behavor) fnmins = OperatorBase('mins', [1, 2], ty_clamp(as_is, -1), cname = 'mins', sqlname = 'MINS', call = windowed_fn_behavor) fnsums = OperatorBase('sums', [1, 2], ext(ty_clamp(auto_extension, -1)), cname = 'sums', sqlname = 'SUMS', call = windowed_fn_behavor) fnavgs = OperatorBase('avgs', [1, 2], fp(ext(ty_clamp(auto_extension, -1))), cname = 'avgs', sqlname = 'AVGS', call = windowed_fn_behavor) +fnvars = OperatorBase('vars', [1, 2], fp(ext(ty_clamp(auto_extension, -1))), cname = 'vars', sqlname = 'VARS', call = windowed_fn_behavor) +fnstds = OperatorBase('stddevs', [1, 2], fp(ext(ty_clamp(auto_extension, -1))), cname = 'stddevs', sqlname = 'STDDEVS', call = windowed_fn_behavor) fncnt = OperatorBase('count', 1, int_return, cname = 'count', sqlname = 'COUNT', call = count_behavior) fnpack = OperatorBase('pack', -1, pack_return, cname = 'pack', sqlname = 'PACK', call = pack_behavior) # special @@ -361,7 +365,8 @@ builtin_cstdlib = _op_make_dict(fnsqrt, fnlog, fnsin, fncos, fntan, fnpow) builtin_func = _op_make_dict(fnmax, fnmin, fnsum, fnavg, fnmaxs, fnmins, fndeltas, fnratios, fnlast, fnfirst, fnsums, fnavgs, fncnt, - fnpack, fntrunc, fnprev, fnnext) + fnpack, fntrunc, fnprev, fnnext, + fnvar, fnvars, fnstd, fnstds) user_module_func = {} builtin_operators : Dict[str, OperatorBase] = {**builtin_binary_arith, **builtin_binary_logical, **builtin_unary_arith, **builtin_unary_logical, **builtin_unary_special, **builtin_func, **builtin_cstdlib, diff --git a/prompt.py b/prompt.py index cc6cb15..f257b49 100644 --- a/prompt.py +++ b/prompt.py @@ -160,9 +160,11 @@ class QueryStats: class Config: __all_attrs__ = ['running', 'new_query', 'server_mode', 'backend_type', 'has_dll', - 'postproc_time', 'sql_time', - 'n_buffers' + 'n_buffers', ] + __i64_attrs__ = [ + 'monetdb_time', 'postproc_time' + ] __init_attributes__ = False @staticmethod @@ -171,26 +173,42 @@ class Config: from functools import partial for _i, attr in enumerate(Config.__all_attrs__): if not hasattr(Config, attr): - setattr(Config, attr, property(partial(Config.getter, i = _i), partial(Config.setter, i = _i))) + setattr(Config, attr, property( + partial(Config.getter, i = _i), partial(Config.setter, i = _i) + )) + for _i, attr in enumerate(Config.__i64_attrs__): + if not hasattr(Config, attr): + setattr(Config, attr, property( + partial(Config.i64_getter, i = _i), partial(Config.i64_setter, i = _i) + )) Config.__init_attributes__ = True def __init__(self, mode, nq = 0, n_bufs = 0, bf_szs = []) -> None: Config.__init_self__() - self.int_size = 4 self.n_attrib = len(Config.__all_attrs__) - self.buf = bytearray((self.n_attrib + n_bufs) * self.int_size) - self.np_buf = np.ndarray(shape=(self.n_attrib), buffer=self.buf, dtype=np.int32) + self.buf = bytearray((self.n_attrib + n_bufs) * 4 + + len(self.__i64_attrs__) * 8 + ) + self.np_buf = np.ndarray(shape = (self.n_attrib), buffer = self.buf, dtype = np.int32) + self.np_i64buf = np.ndarray(shape = len(self.__i64_attrs__), buffer = self.buf, + dtype = np.int64, offset = 4 * len(self.__all_attrs__)) self.new_query = nq self.server_mode = mode.value self.running = 1 self.backend_type = Backend_Type.BACKEND_AQuery.value self.has_dll = 0 self.n_buffers = n_bufs + self.monetdb_time = 0 + self.postproc_time = 0 def getter (self, *, i): return self.np_buf[i] def setter(self, v, *, i): self.np_buf[i] = v + def i64_getter (self, *, i): + return self.np_i64buf[i] + def i64_setter(self, v, *, i): + self.np_i64buf[i] = v def set_bufszs(self, buf_szs): for i in range(min(len(buf_szs), self.n_buffers)): @@ -209,6 +227,8 @@ class PromptState(): test_parser = True server_mode: RunType = RunType.Threaded server_bin = 'server.bin' if server_mode == RunType.IPC else 'server.so' + wait_engine = lambda: None + wake_engine = lambda: None set_ready = lambda: None get_ready = lambda: None server_status = lambda: False @@ -299,12 +319,14 @@ def init_threaded(state : PromptState): if aquery_config.run_backend: server_so = ctypes.CDLL('./'+state.server_bin) state.send = server_so['receive_args'] + state.wait_engine = server_so['wait_engine'] + state.wake_engine = server_so['wake_engine'] aquery_config.have_hge = server_so['have_hge']() if aquery_config.have_hge != 0: from engine.types import get_int128_support get_int128_support() state.th = threading.Thread(target=server_so['main'], args=(-1, ctypes.POINTER(ctypes.c_char_p)(state.cfg.c)), daemon=True) - state.th.start() + state.th.start() def init_prompt() -> PromptState: aquery_config.init_config() @@ -337,6 +359,8 @@ def init_prompt() -> PromptState: rm = lambda: None def __set_ready(): state.cfg.new_query = 1 + state.wake_engine() + state.set_ready = __set_ready state.get_ready = lambda: aquery_config.run_backend and state.cfg.new_query if aquery_config.run_backend: @@ -380,9 +404,16 @@ def prompt(running = lambda:True, next = lambda:input('> '), state = None): while running(): try: if state.server_status(): - state.init() + state.init(state) + # *** busy waiting *** + # while state.get_ready(): + # time.sleep(.00001) while state.get_ready(): - time.sleep(.00001) + state.wait_engine() + if state.need_print: + print(f'MonetDB Time: {state.cfg.monetdb_time/10**9}, ' + f'PostProc Time: {state.cfg.postproc_time/10**9}') + state.cfg.monetdb_time = state.cfg.postproc_time = 0 state.currstats.print(state.stats, need_print=state.need_print) try: og_q : str = next() diff --git a/reconstruct/ast.py b/reconstruct/ast.py index 66342df..c95223d 100644 --- a/reconstruct/ast.py +++ b/reconstruct/ast.py @@ -54,19 +54,28 @@ class ast_node: self.context.sql_end() from reconstruct.expr import expr, fastscan - - +class SubqType(Enum): + WITH = auto() + FROM = auto() + PROJECTION = auto() + FILTER = auto() + GROUPBY = auto() + ORDERBY = auto() + NONE = auto() class projection(ast_node): name = 'projection' first_order = 'select' - + + def __init__(self, parent : Optional["ast_node"], node, context : Optional[Context] = None, - force_use_spgb : bool = False + force_use_spgb : bool = False, + subq_type: SubqType = SubqType.NONE ): self.force_use_spgb = force_use_spgb + self.subq_type = subq_type super().__init__(parent, node, context) def init(self, _): @@ -83,9 +92,21 @@ class projection(ast_node): p = node['select_distinct'] self.distinct = True if 'with' in node: - self.with_clause = projection(self, node['value']) + with_table = node['with']['name'] + with_table_name = tuple(with_table.keys())[0] + with_table_cols = tuple(with_table.values())[0] + self.with_clause = projection(self, node['with']['value'], subq_type=SubqType.WITH) + self.with_clause.out_table.add_alias(with_table_name) + for new_name, col in zip(with_table_cols, self.with_clause.out_table.columns): + col.rename(new_name) + self.with_clause.out_table.contextname_cpp + # in monetdb, in cxt else: self.with_clause = None + + self.limit = None + if 'limit' in node: + self.limit = node['limit'] self.projections = p if type(p) is list else [p] if self.parent is None: @@ -115,8 +136,9 @@ class projection(ast_node): if type(self.datasource) is join: self.datasource.process_join_conditions() + self.context.special_gb = self.force_use_spgb if 'groupby' in node: # if groupby clause contains special stuff - self.context.special_gb = groupby.check_special(self, node['groupby']) + self.context.special_gb |= groupby.check_special(self, node['groupby']) def consume(self, node): # deal with projections @@ -230,7 +252,7 @@ class projection(ast_node): self.group_node = groupby(self, node['groupby']) if self.group_node.terminate: self.context.abandon_query() - projection(self.parent, node, self.context, True) + projection(self.parent, node, self.context, True, subq_type=self.subq_type) return if self.group_node.use_sp_gb: self.has_postproc = True @@ -370,8 +392,12 @@ class projection(ast_node): # for funcs evaluate f_i(x, ...) self.context.emitc(f'{self.out_table.contextname_cpp}->get_col<{key}>().initfrom({val[1]}, "{cols[i].name}");') # print out col_is - if 'into' not in node: - self.context.emitc(f'print(*{self.out_table.contextname_cpp});') + + if 'into' not in node and self.subq_type == SubqType.NONE: + if self.limit is None: + self.context.emitc(f'print(*{self.out_table.contextname_cpp});') + else: + self.context.emitc(f'{self.out_table.contextname_cpp}->printall(" ","\\n", nullptr, nullptr, {self.limit});') if self.outfile: self.outfile.finalize() @@ -627,7 +653,7 @@ class groupby(ast_node): self.gb_cols = set() # dedicated_glist -> cols populated for special group by self.dedicated_glist : List[Tuple[expr, Set[ColRef]]] = [] - self.use_sp_gb = False + self.use_sp_gb = self.parent.force_use_spgb for g in node: self.datasource.rec = set() g_expr = expr(self, g['value']) @@ -654,12 +680,13 @@ class groupby(ast_node): if (projs[2].is_compound and not ((projs[2].is_ColExpr and projs[2].raw_col in self.gb_cols) or projs[2].sql in self.gb_cols) - ): - if self.parent.force_use_spgb: + ) and (not self.parent.force_use_spgb): self.use_sp_gb = True - else: - self.terminate = True - return + break + + if self.use_sp_gb and not self.parent.force_use_spgb: + self.terminate = True + return if not self.use_sp_gb: self.dedicated_gb = None self.add(', '.join(o_list)) @@ -1144,7 +1171,7 @@ class load(ast_node): self.context.emitc(f'{c.cxt_name}.emplace_back({col_tmp_names[i]});') self.context.emitc('}') - self.context.emitc(f'print(*{self.out_table});') + # self.context.emitc(f'print(*{self.out_table});') self.context.emitc(f'{self.out_table}->monetdb_append_table(cxt->alt_server, "{table.table_name}");') self.context.postproc_end(self.postproc_fname) diff --git a/reconstruct/expr.py b/reconstruct/expr.py index bfd552c..e22a4ed 100644 --- a/reconstruct/expr.py +++ b/reconstruct/expr.py @@ -168,7 +168,7 @@ class expr(ast_node): special_func = [*self.context.udf_map.keys(), *self.context.module_map.keys(), "maxs", "mins", "avgs", "sums", "deltas", "last", "first", - "ratios", "pack", "truncate"] + "stddevs", "vars", "ratios", "pack", "truncate"] if ( self.context.special_gb diff --git a/reconstruct/storage.py b/reconstruct/storage.py index 47eab9a..98ad799 100644 --- a/reconstruct/storage.py +++ b/reconstruct/storage.py @@ -45,6 +45,14 @@ class ColRef: alias = table_name return f'{alias}.{self.get_name()}' + def rename(self, name): + self.alias.discard(self.name) + self.table.columns_byname.pop(self.name, None) + self.name = name + self.table.columns_byname[name] = self + + return self + def __getitem__(self, key): if type(key) is str: return getattr(self, key) @@ -97,6 +105,17 @@ class TableInfo: return self.cxt.tables_byname[alias] = self self.alias.add(alias) + + def rename(self, name): + if name in self.cxt.tables_byname.keys(): + print(f"Error: table name {name} already exists") + return + + self.cxt.tables_byname.pop(self.table_name, None) + self.alias.discard(self.table_name) + self.table_name = name + self.cxt.tables_byname[name] = self + self.alias.add(name) def parse_col_names(self, colExpr) -> ColRef: parsedColExpr = colExpr.split('.') diff --git a/server/aggregations.h b/server/aggregations.h index 5338e23..cb4bcbe 100644 --- a/server/aggregations.h +++ b/server/aggregations.h @@ -202,6 +202,102 @@ decayed_t>> avgw(uint32_t w, const VT return ret; } +template class VT, bool sd = false> +decayed_t>> varw(uint32_t w, const VT& arr) { + using FPType = types::GetFPType>; + const uint32_t& len = arr.size; + decayed_t ret(len); + uint32_t i = 0; + types::GetLongType s{}; + w = w > len ? len : w; + FPType EnX {}, MnX{}; + if (len) { + s = arr[0]; + MnX = 0; + EnX = arr[0]; + ret[i++] = 0; + } + for (; i < len; ++i){ + s += arr[i]; + FPType _EnX = s / (FPType)(i + 1); + MnX += (arr[i] - EnX) * (arr[i] - _EnX); + EnX = _EnX; + ret[i] = MnX / (FPType)(i + 1); + if constexpr(sd) ret[i-1] = sqrt(ret[i-1]); + } + const float rw = 1.f / (float)w; + s *= rw; + for (; i < len; ++i){ + const auto dw = arr[i] - arr[i - w - 1]; + const auto sw = arr[i] + arr[i - w - 1]; + const auto dex = dw * rw; + ret[i] = ret[i-1] - dex*(s + s + dex - sw); + if constexpr(sd) ret[i-1] = sqrt(ret[i-1]); + s += dex; + } + if constexpr(sd) + if(i) + ret[i-1] = sqrt(ret[i-1]); + + return ret; +} + +template class VT> +types::GetFPType>> var(const VT& arr) { + typedef types::GetFPType>> FPType; + const uint32_t& len = arr.size; + uint32_t i = 0; + types::GetLongType s{0}; + types::GetLongType ssq{0}; + if (len) { + s = arr[0]; + ssq = arr[0] * arr[0]; + } + for (; i < len; ++i){ + s += arr[i]; + ssq += arr[i] * arr[i]; + } + return (ssq - s * s / (FPType)(len + 1)) / (FPType)(len + 1); +} + +template class VT, bool sd = false> +decayed_t>> vars(const VT& arr) { + typedef types::GetFPType> FPType; + const uint32_t& len = arr.size; + decayed_t ret(len); + uint32_t i = 0; + types::GetLongType s{}; + FPType MnX{}; + FPType EnX {}; + if (len) { + s = arr[0]; + MnX = 0; + EnX = arr[0]; + ret[i++] = 0; + } + for (; i < len; ++i){ + s += arr[i]; + FPType _EnX = s / (FPType)(i + 1); + MnX += (arr[i] - EnX) * (arr[i] - _EnX); + printf("%d %ld ", arr[i], MnX); + EnX = _EnX; + ret[i] = MnX / (FPType)(i + 1); + if constexpr(sd) ret[i] = sqrt(ret[i]); + } + return ret; +} +template class VT> +types::GetFPType>> stddev(const VT& arr) { + return sqrt(var(arr)); +} +template class VT> +decayed_t>> stddevs(const VT& arr) { + return vars(arr); +} +template class VT> +decayed_t>> stddevw(uint32_t w, const VT& arr) { + return varw(w, arr); +} // use getSignedType template class VT> decayed_t deltas(const VT& arr) { @@ -251,26 +347,33 @@ T first(const VT& arr) { } + #define __DEFAULT_AGGREGATE_FUNCTION__(NAME, RET) \ -template constexpr inline T NAME(const T& v) { return RET; } +template constexpr T NAME(const T& v) { return RET; } // non-aggreation count. E.g. SELECT COUNT(col) from table; -template constexpr inline T count(const T& v) { return 1; } -template constexpr inline T max(const T& v) { return v; } -template constexpr inline T min(const T& v) { return v; } -template constexpr inline T avg(const T& v) { return v; } -template constexpr inline T sum(const T& v) { return v; } -template constexpr inline T maxw(uint32_t, const T& v) { return v; } -template constexpr inline T minw(uint32_t, const T& v) { return v; } -template constexpr inline T avgw(uint32_t, const T& v) { return v; } -template constexpr inline T sumw(uint32_t, const T& v) { return v; } -template constexpr inline T ratiow(uint32_t, const T& v) { return 1; } -template constexpr inline T maxs(const T& v) { return v; } -template constexpr inline T mins(const T& v) { return v; } -template constexpr inline T avgs(const T& v) { return v; } -template constexpr inline T sums(const T& v) { return v; } -template constexpr inline T last(const T& v) { return v; } -template constexpr inline T prev(const T& v) { return v; } -template constexpr inline T aggnext(const T& v) { return v; } -template constexpr inline T daltas(const T& v) { return 0; } -template constexpr inline T ratios(const T& v) { return 1; } +template constexpr T count(const T&) { return 1; } +template constexpr T var(const T&) { return 0; } +template constexpr T vars(const T&) { return 0; } +template constexpr T varw(uint32_t, const T&) { return 0; } +template constexpr T stddev(const T&) { return 0; } +template constexpr T stddevs(const T&) { return 0; } +template constexpr T stddevw(uint32_t, const T&) { return 0; } +template constexpr T max(const T& v) { return v; } +template constexpr T min(const T& v) { return v; } +template constexpr T avg(const T& v) { return v; } +template constexpr T sum(const T& v) { return v; } +template constexpr T maxw(uint32_t, const T& v) { return v; } +template constexpr T minw(uint32_t, const T& v) { return v; } +template constexpr T avgw(uint32_t, const T& v) { return v; } +template constexpr T sumw(uint32_t, const T& v) { return v; } +template constexpr T ratiow(uint32_t, const T&) { return 1; } +template constexpr T maxs(const T& v) { return v; } +template constexpr T mins(const T& v) { return v; } +template constexpr T avgs(const T& v) { return v; } +template constexpr T sums(const T& v) { return v; } +template constexpr T last(const T& v) { return v; } +template constexpr T prev(const T& v) { return v; } +template constexpr T aggnext(const T& v) { return v; } +template constexpr T daltas(const T&) { return 0; } +template constexpr T ratios(const T&) { return 1; } diff --git a/server/libaquery.h b/server/libaquery.h index 551d205..6227af9 100644 --- a/server/libaquery.h +++ b/server/libaquery.h @@ -3,6 +3,7 @@ #include "table.h" #include +#include enum Log_level { LOG_INFO, @@ -15,9 +16,16 @@ enum Backend_Type { BACKEND_MonetDB, BACKEND_MariaDB }; + +struct QueryStats{ + long long monet_time; + long long postproc_time; +}; struct Config{ - int running, new_query, server_mode, - backend_type, has_dll, exec_time, n_buffers; + int running, new_query, server_mode, + backend_type, has_dll, + n_buffers; + QueryStats stats; int buffer_sizes[]; }; @@ -67,6 +75,27 @@ struct Context{ std::unordered_map cols; }; +class aq_timer { +private: + std::chrono::high_resolution_clock::time_point now; +public: + aq_timer(){ + now = std::chrono::high_resolution_clock::now(); + } + void reset(){ + now = std::chrono::high_resolution_clock::now(); + } + long long elapsed(){ + long long ret = (std::chrono::high_resolution_clock::now() - now).count(); + reset(); + return ret; + } + long long lap() const{ + long long ret = (std::chrono::high_resolution_clock::now() - now).count(); + return ret; + } +}; + #ifdef _WIN32 #define __DLLEXPORT__ __declspec(dllexport) __stdcall #else diff --git a/server/server.cpp b/server/server.cpp index 2105545..6a9bc43 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -18,6 +18,7 @@ #include struct SharedMemory { + std::atomic a; int hFileMap; void* pData; SharedMemory(const char* fname) { @@ -31,6 +32,79 @@ struct SharedMemory } }; +#ifndef __USE_STD_SEMAPHORE__ +#ifdef __APPLE__ +#include +class A_Semaphore { +private: + dispatch_semaphore_t native_handle; +public: + A_Semaphore(bool v = false) { + native_handle = dispatch_semaphore_create(v); + } + void acquire() { + puts("acquire"); + dispatch_semaphore_wait(native_handle, DISPATCH_TIME_FOREVER); + } + void release() { + puts("release"); + dispatch_semaphore_signal(native_handle); + } + ~A_Semaphore() { + } +}; +#else +#include +class A_Semaphore { +private: + sem_t native_handle; +public: + A_Semaphore(bool v = false) { + sem_init(&native_handle, v, 1); + } + void acquire() { + sem_wait(&native_handle); + } + void release() { + sem_post(&native_handle); + } + ~A_Semaphore() { + sem_destroy(&native_handle); + } +}; +#endif +#endif + +#endif +#ifdef __USE_STD_SEMAPHORE__ +#include +class A_Semaphore { +private: + std::binary_semaphore native_handle; +public: + A_Semaphore(bool v = false) { + native_handle = std::binary_semaphore(v); + } + void acquire() { + native_handle.acquire(); + } + void release() { + native_handle.release(); + } + ~A_Semaphore() { } +}; +#endif +#ifdef __AQUERY_ITC_USE_SHMEM__ +A_Semaphore prompt{ true }, engine{ false }; +#define PROMPT_ACQUIRE() prompt.acquire() +#define PROMPT_RELEASE() prompt.release() +#define ENGINE_ACQUIRE() engine.acquire() +#define ENGINE_RELEASE() engine.release() +#else +#define PROMPT_ACQUIRE() +#define PROMPT_RELEASE() std::this_thread::sleep_for(std::chrono::nanoseconds(0)) +#define ENGINE_ACQUIRE() +#define ENGINE_RELEASE() #endif #include "aggregations.h" @@ -42,6 +116,13 @@ int test_main(); int n_recv = 0; char** n_recvd = nullptr; +__AQEXPORT__(void) wait_engine(){ + PROMPT_ACQUIRE(); +} +__AQEXPORT__(void) wake_engine(){ + ENGINE_RELEASE(); +} + extern "C" void __DLLEXPORT__ receive_args(int argc, char**argv){ n_recv = argc; n_recvd = argv; @@ -119,15 +200,16 @@ void initialize_module(const char* module_name, void* module_handle, Context* cx } int dll_main(int argc, char** argv, Context* cxt){ + aq_timer timer; Config *cfg = reinterpret_cast(argv[0]); std::unordered_map user_module_map; - if (cxt->module_function_maps == 0) + if (cxt->module_function_maps == nullptr) cxt->module_function_maps = new std::unordered_map(); auto module_fn_map = static_cast*>(cxt->module_function_maps); auto buf_szs = cfg->buffer_sizes; - void** buffers = (void**)malloc(sizeof(void*) * cfg->n_buffers); + void** buffers = (void**) malloc (sizeof(void*) * cfg->n_buffers); for (int i = 0; i < cfg->n_buffers; i++) buffers[i] = static_cast(argv[i + 1]); @@ -136,18 +218,22 @@ int dll_main(int argc, char** argv, Context* cxt){ cxt->n_buffers = cfg->n_buffers; cxt->sz_bufs = buf_szs; cxt->alt_server = NULL; - + while(cfg->running){ + ENGINE_ACQUIRE(); if (cfg->new_query) { - void *handle = 0; - void *user_module_handle = 0; + cfg->stats.postproc_time = 0; + cfg->stats.monet_time = 0; + + void *handle = nullptr; + void *user_module_handle = nullptr; if (cfg->backend_type == BACKEND_MonetDB){ - if (cxt->alt_server == 0) + if (cxt->alt_server == nullptr) cxt->alt_server = new Server(cxt); Server* server = reinterpret_cast(cxt->alt_server); if(n_recv > 0){ if (cfg->backend_type == BACKEND_AQuery || cfg->has_dll) { - handle = dlopen("./dll.so", RTLD_LAZY); + handle = dlopen("./dll.so", RTLD_NOW); } for (const auto& module : user_module_map){ initialize_module(module.first.c_str(), module.second, cxt); @@ -159,14 +245,18 @@ int dll_main(int argc, char** argv, Context* cxt){ switch(n_recvd[i][0]){ case 'Q': // SQL query for monetdbe { + timer.reset(); server->exec(n_recvd[i] + 1); + cfg->stats.monet_time += timer.elapsed(); printf("Exec Q%d: %s", i, n_recvd[i]); } break; case 'P': // Postprocessing procedure if(handle && !server->haserror()) { code_snippet c = reinterpret_cast(dlsym(handle, n_recvd[i]+1)); + timer.reset(); c(cxt); + cfg->stats.postproc_time += timer.elapsed(); } break; case 'M': // Load Module @@ -198,7 +288,7 @@ int dll_main(int argc, char** argv, Context* cxt){ auto mname = n_recvd[i] + 1; auto it = user_module_map.find(mname); if (user_module_handle == it->second) - user_module_handle = 0; + user_module_handle = nullptr; dlclose(it->second); user_module_map.erase(it); } @@ -207,8 +297,9 @@ int dll_main(int argc, char** argv, Context* cxt){ } if(handle) { dlclose(handle); - handle = 0; + handle = nullptr; } + printf("%ld, %ld", cfg->stats.monet_time, cfg->stats.postproc_time); cxt->end_session(); n_recv = 0; } @@ -230,9 +321,11 @@ int dll_main(int argc, char** argv, Context* cxt){ if (handle) dlclose(handle); cfg->new_query = 0; } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + //puts(cfg->running? "true": "false"); + asm(""); + PROMPT_RELEASE(); } - + return 0; } diff --git a/server/table.h b/server/table.h index da75cc1..20e2449 100644 --- a/server/table.h +++ b/server/table.h @@ -9,6 +9,7 @@ #include #include #include +#include #include "io.h" #include "hasher.h" @@ -139,8 +140,16 @@ public: ColView<_Ty> operator [](const vector_type& idxs) const { return ColView<_Ty>(*this, idxs); } - - void out(uint32_t n = 4, const char* sep = " ") const { + vector_type<_Ty> operator [](const std::vector& idxs) const { + vector_type<_Ty> ret (this->size); + uint32_t i = 0; + for(const auto& f : idxs){ + if(f) ret.emplace_back(this->operator[](i)); + ++i; + } + return ret; + } + void out(uint32_t n = 1000, const char* sep = " ") const { const char* more = ""; if (n < this->size) more = " ... "; @@ -243,7 +252,7 @@ public: Iterator_t end() const { return Iterator_t(idxs.end(), orig); } - void out(uint32_t n = 4, const char* sep = " ") const { + void out(uint32_t n = 1000, const char* sep = " ") const { n = n > size ? size : n; std::cout << '('; for (uint32_t i = 0; i < n; ++i) @@ -438,19 +447,27 @@ struct TableInfo { } template void print2(const char* __restrict sep = ",", const char* __restrict end = "\n", - const vector_type* __restrict view = nullptr, FILE* __restrict fp = nullptr) const { + const vector_type* __restrict view = nullptr, + FILE* __restrict fp = nullptr, uint32_t limit = std::numeric_limits::max() + ) const { std::string printf_string = generate_printf_string::type ...>(sep, end); + // puts(printf_string.c_str()); std::string header_string = std::string(); constexpr static int a_cols[] = { cols... }; - for (int i = 0; i < sizeof...(cols); ++i) - header_string += std::string(this->colrefs[a_cols[i]].name) + sep; - const size_t l_sep = strlen(sep); - if (header_string.size() - l_sep >= 0) - header_string.resize(header_string.size() - l_sep); - - const auto& prt_loop = [&fp, &view, &printf_string, *this](const auto& f) { + if (fp == nullptr){ + header_string = get_header_string(sep, end); + header_string.resize(header_string.size() - strlen(end)); + } + else { + for (int i = 0; i < sizeof...(cols); ++i) + header_string += std::string(this->colrefs[a_cols[i]].name) + sep; + const size_t l_sep = strlen(sep); + if (header_string.size() - l_sep >= 0) + header_string.resize(header_string.size() - l_sep); + } + const auto& prt_loop = [&fp, &view, &printf_string, *this, &limit](const auto& f) { #ifdef __AQ__HAS__INT128__ constexpr auto num_hge = count_type<__int128_t, __uint128_t>((tuple_type*)(0)); #else @@ -466,16 +483,21 @@ struct TableInfo { + 1 // padding for msvc not allowing empty arrays ]; setgbuf(cbuf); - if (view) - for (uint32_t i = 0; i < view->size; ++i) { + + if (view){ + uint32_t outsz = limit > view->size ? view->size : limit; + for (uint32_t i = 0; i < outsz; ++i) { print2_impl(f, (*view)[i], printf_string.c_str()); setgbuf(); } - else - for (uint32_t i = 0; i < colrefs[0].size; ++i) { + } + else{ + uint32_t outsz = limit > colrefs[0].size ? colrefs[0].size : limit; + for (uint32_t i = 0; i < outsz; ++i) { print2_impl(f, i, printf_string.c_str()); setgbuf(); } + } }; if (fp) @@ -490,15 +512,17 @@ struct TableInfo { } template struct applier { inline constexpr static void apply(const TableInfo& t, const char* __restrict sep = ",", const char* __restrict end = "\n", - const vector_type* __restrict view = nullptr, FILE* __restrict fp = nullptr) + const vector_type* __restrict view = nullptr, FILE* __restrict fp = nullptr, uint32_t limit = std::numeric_limits::max() + ) { - t.template print2(sep, end, view, fp); + t.template print2(sep, end, view, fp, limit); } }; inline void printall(const char* __restrict sep = ",", const char* __restrict end = "\n", - const vector_type* __restrict view = nullptr, FILE* __restrict fp = nullptr) { - applyIntegerSequence::apply(*this, sep, end, view, fp); + const vector_type* __restrict view = nullptr, FILE* __restrict fp = nullptr, + uint32_t limit = std::numeric_limits::max() ) const { + applyIntegerSequence::apply(*this, sep, end, view, fp, limit); } TableInfo* rename(const char* name) { @@ -667,7 +691,9 @@ template template inline typename std::enable_if::type TableInfo::print_impl(const uint32_t& i, const char* __restrict sep) const { - std::cout << (get(*this))[i]; + decltype(auto) t = (get(*this))[i]; +// print(t); + std::cout << t; } template @@ -682,6 +708,7 @@ inline typename std::enable_if < j < sizeof...(Types) - 1, void>::type template inline void TableInfo::print(const char* __restrict sep, const char* __restrict end) const { + //printall(sep, end); std::string header_string = get_header_string(sep, end); std::cout << header_string.c_str(); diff --git a/server/types.h b/server/types.h index 3ae14b6..20cb0fc 100644 --- a/server/types.h +++ b/server/types.h @@ -29,27 +29,37 @@ inline constexpr size_t aq_szof = 0; template struct aqis_same_impl { constexpr static bool value = + std::conditional_t< - std::is_signed_v == std::is_signed_v, + std::is_same_v || std::is_same_v, + Cond( + (std::is_same_v && std::is_same_v), + std::true_type, + std::false_type + ), Cond( - std::is_floating_point_v == std::is_floating_point_v, + std::is_signed_v == std::is_signed_v, Cond( - aq_szof == aq_szof, // deal with sizeof(void) - std::true_type, + std::is_floating_point_v == std::is_floating_point_v, + Cond( + aq_szof == aq_szof, // deal with sizeof(void) + std::true_type, + std::false_type + ), std::false_type ), std::false_type - ), - std::false_type + ) >::value; }; - +// make sure size_t/ptr_t and the corresponding integer types are the same template constexpr bool aqis_same = aqis_same_impl::value && aqis_same; template constexpr bool aqis_same = aqis_same_impl::value; + namespace types { enum Type_t { AINT32, AFLOAT, ASTR, ADOUBLE, ALDOUBLE, AINT64, AINT128, AINT16, ADATE, ATIME, AINT8, diff --git a/server/vector_type.hpp b/server/vector_type.hpp index 98d79f5..3ad5fcc 100644 --- a/server/vector_type.hpp +++ b/server/vector_type.hpp @@ -265,7 +265,7 @@ public: } size = this->size + dist; } - inline void out(uint32_t n = 4, const char* sep = " ") const + inline void out(uint32_t n = 4000, const char* sep = " ") const { const char* more = ""; if (n < this->size) diff --git a/server/winhelper.cpp b/server/winhelper.cpp index 48bfa86..d59f58b 100644 --- a/server/winhelper.cpp +++ b/server/winhelper.cpp @@ -41,4 +41,20 @@ void SharedMemory::FreeMemoryMap() if (this->hFileMap) CloseHandle(this->hFileMap); } + +#ifndef __USE_STD_SEMAPHORE__ +A_Semaphore::A_Semaphore(bool v = false) { + native_handle = CreateSemaphore(NULL, v, 1, NULL); +} +void A_Semaphore::acquire() { + WaitForSingleObject(native_handle, INFINITE); +} +void A_Semaphore::release() { + ReleaseSemaphore(native_handle, 1, NULL); +} +A_Semaphore::~A_Semaphore() { + CloseHandle(native_handle); +} +#endif + #endif diff --git a/server/winhelper.h b/server/winhelper.h index df9231e..f39c0b9 100644 --- a/server/winhelper.h +++ b/server/winhelper.h @@ -14,5 +14,17 @@ struct SharedMemory SharedMemory(const char*); void FreeMemoryMap(); }; + +#ifndef __USE_STD_SEMAPHORE__ +class A_Semaphore { +private: + void* native_handle; +public: + A_Semaphore(); + void acquire(); + void release(); + ~A_Semaphore(); +}; #endif + #endif diff --git a/tests/datagen_jose/Time.cpp b/tests/datagen_jose/Time.cpp index 5f852cb..d5130d5 100644 --- a/tests/datagen_jose/Time.cpp +++ b/tests/datagen_jose/Time.cpp @@ -18,7 +18,7 @@ // /////////////////////////////////////////////////////////////////////////////// #include -#include "Time.H" +#include "Time.hpp" Time::Time(char *startTime_) { diff --git a/tests/funcs.a b/tests/funcs.a index 65316ce..7f17f0c 100644 --- a/tests/funcs.a +++ b/tests/funcs.a @@ -19,7 +19,7 @@ LOAD DATA INFILE "data/test.csv" INTO TABLE test1 FIELDS TERMINATED BY "," -SELECT pairCorr(c, b) * d, sum(a), b +SELECT pairCorr(c, b) * d, a, sum(b) FROM test1 -group by c,b,d +group by a order by b ASC diff --git a/tests/q4.a b/tests/q4.a index 8e9e626..4a4016b 100644 --- a/tests/q4.a +++ b/tests/q4.a @@ -30,10 +30,10 @@ INSERT INTO my_table VALUES(10, 20, "example") select * from my_table; INSERT INTO my_table SELECT * FROM my_table select * from my_table; -SELECT c1, c2 + c2 as twice_c2 FROM my_table; +SELECT c1, c2 as twice_c2 FROM my_table; CREATE TABLE my_table_derived AS - SELECT c1, c2 + c2 as twice_c2 FROM my_table; + SELECT c1, c2 as twice_c2 FROM my_table; SELECT * FROM my_table_derived;