From 1732835692325aaec2d378af82ce99ab5628d1b3 Mon Sep 17 00:00:00 2001 From: Bill Date: Thu, 25 Aug 2022 18:24:01 +0800 Subject: [PATCH 1/2] bug fixes --- aquery_config.py | 2 +- prompt.py | 113 ++++++++++++++++++++++++---------------- reconstruct/__init__.py | 3 +- 3 files changed, 71 insertions(+), 47 deletions(-) diff --git a/aquery_config.py b/aquery_config.py index 7a29c59..6b5f320 100644 --- a/aquery_config.py +++ b/aquery_config.py @@ -1,6 +1,7 @@ # put environment specific configuration here ## GLOBAL CONFIGURATION FLAGS +version_string = '0.3.3a' add_path_to_ldpath = True rebuild_backend = True run_backend = True @@ -47,7 +48,6 @@ def init_config(): os_platform = 'cygwin' # deal with msys dependencies: if os_platform == 'win': - add_dll_dir(cygroot) add_dll_dir(os.path.abspath('./msc-plugin')) # print("adding path") diff --git a/prompt.py b/prompt.py index bf47811..c3b715a 100644 --- a/prompt.py +++ b/prompt.py @@ -1,10 +1,58 @@ -import os +import aquery_config +help_message = '''\ +====================================================== + AQUERY COMMANDLINE HELP +====================================================== + +Run prompt.py without supplying with any arguments to run in interactive mode. + +--help, -h + print out this help message + +--version, -v + returns current version of AQuery + +--mode, -m Optional[threaded|IPC] + execution engine run mode: + threaded or 1 (default): run the execution engine and compiler/prompt in separate threads + IPC, standalong or 2: run the execution engine in a new process which uses shared memory to communicate with the compiler process + +--script, -s [SCRIPT_FILE] + script mode: run the aquery script file + +--parse-only, -p + parse only: parse the file and print out the AST +''' -from engine.ast import Context if __name__ == '__main__': import mimetypes mimetypes._winreg = None + state = None + nextcmd = '' + def check_param(param, args = False) -> bool: + global nextcmd + import sys + for p in param: + if p in sys.argv: + if args: + return True + pos = sys.argv.index(p) + if len(sys.argv) > pos + 1: + nextcmd = sys.argv[pos + 1] + return True + return False + + if check_param(['-v', '--version'], True): + print(aquery_config.version_string) + exit() + + if check_param(['-h', '--help'], True): + print(help_message) + exit() + + +import os from dataclasses import dataclass import enum from tabnanny import check @@ -25,30 +73,11 @@ from engine.utils import base62uuid import atexit import threading import ctypes -import aquery_config import numpy as np from engine.utils import ws from engine.utils import add_dll_dir -## GLOBALS BEGIN nullstream = open(os.devnull, 'w') -help_message = '''\ -Run prompt.py without supplying with any arguments to run in interactive mode. - ---help, -h - print out this help message ---version, -v - returns current version of AQuery ---mode, -m Optional[threaded|IPC] - execution engine run mode: - threaded or 1 (default): run the execution engine and compiler/prompt in separate threads - IPC, standalong or 2: run the execution engine in a new process which uses shared memory to communicate with the compiler process ---script, -s [SCRIPT_FILE] - script mode: run the aquery script file ---parse-only, -p - parse only: parse the file and print out the AST -''' -## GLOBALS END ## CLASSES BEGIN class RunType(enum.Enum): @@ -351,9 +380,19 @@ def main(running = lambda:True, next = input, state = None): elif q == 'rr': # run state.set_ready() continue - elif q == 'script': - # TODO: script mode - pass + elif q.startswith('script'): + qs = re.split(r'[ \t]', q) + if len(qs) > 1: + qs = qs[1] + with open(qs) as file: + qs = file.readline() + from engine.utils import _Counter + while(qs): + while(not ws.sub('', qs) or qs.strip().startswith('#')): + qs = file.readline() + cnt = _Counter(1) + main(lambda : cnt.inc(-1) > 0, lambda:qs.strip(), state) + qs = file.readline() elif q.startswith('save2'): filename = re.split(r'[ \t]', q) if (len(filename) > 1): @@ -396,29 +435,13 @@ def main(running = lambda:True, next = input, state = None): ## MAIN if __name__ == '__main__': - state = None - nextcmd = '' - def check_param(param:List[str], args = False) -> bool: - global nextcmd - for p in param: - if p in sys.argv: - if args: - return True - pos = sys.argv.index(p) - if len(sys.argv) > pos + 1: - nextcmd = sys.argv[pos + 1] - return True - return False - if check_param(['-h', '--help'], True): - print(help_message) - exit() - + if len(sys.argv) == 2: nextcmd = sys.argv[1] if nextcmd.startswith('-'): nextcmd = '' - nextcmd = 'test.aquery' + #nextcmd = 'test.aquery' if nextcmd or check_param(['-s', '--script']): with open(nextcmd) as file: nextcmd = file.readline() @@ -444,7 +467,7 @@ if __name__ == '__main__': if any([s in nextcmd for s in ipc_string]): server_mode = RunType.IPC elif any([s in nextcmd for s in thread_string]): - server_mode = RunType.Threaded - + server_mode = RunType.Threaded + main(state=state) - \ No newline at end of file + diff --git a/reconstruct/__init__.py b/reconstruct/__init__.py index 475e503..c27a9da 100644 --- a/reconstruct/__init__.py +++ b/reconstruct/__init__.py @@ -25,7 +25,8 @@ def exec(stmts, cxt = None, keep = False): generate(s, cxt) else: generate(stmts_stmts, cxt) - cxt.print(cxt.queries) + for q in cxt.queries: + cxt.print(q.strip()) return cxt __all__ = ["initialize", "generate", "exec", "saved_cxt"] From 42c334af84ae50515aa13a941af5fe57407a278e Mon Sep 17 00:00:00 2001 From: Bill Date: Fri, 26 Aug 2022 08:35:09 +0800 Subject: [PATCH 2/2] bug fixes, restructure, user module parsing --- Makefile | 1 + README.md | 34 +++++++++++++---- aquery_parser/sql_parser.py | 42 +++++++++++++++++++-- moving_avg.csv => data/moving_avg.csv | 0 nyctx100.csv => data/nyctx100.csv | 0 test.csv => data/test.csv | 0 test2.csv => data/test2.csv | 0 engine/types.py | 7 ++++ prompt.py | 40 +++++++++++++++++++- reconstruct/storage.py | 14 +++++-- server/server.cpp | 4 +- server/threading.h | 53 ++++++++++++++++++++------- funcs.a => tests/funcs.a | 0 joins.a => tests/joins.a | 0 modules.a => tests/modules.a | 3 +- moving_avg.a => tests/moving_avg.a | 0 q.sql => tests/q.sql | 0 q1.sql => tests/q1.sql | 0 stock.a => tests/stock.a | 0 tests/strings.a | 7 ++++ udf2.a => tests/udf2.a | 0 udf3.a => tests/udf3.a | 0 udf4.a => tests/udf4.a | 0 udftest.cpp | 14 ------- udftest_compile.cpp | 5 --- 25 files changed, 173 insertions(+), 51 deletions(-) rename moving_avg.csv => data/moving_avg.csv (100%) rename nyctx100.csv => data/nyctx100.csv (100%) rename test.csv => data/test.csv (100%) rename test2.csv => data/test2.csv (100%) rename funcs.a => tests/funcs.a (100%) rename joins.a => tests/joins.a (100%) rename modules.a => tests/modules.a (57%) rename moving_avg.a => tests/moving_avg.a (100%) rename q.sql => tests/q.sql (100%) rename q1.sql => tests/q1.sql (100%) rename stock.a => tests/stock.a (100%) create mode 100644 tests/strings.a rename udf2.a => tests/udf2.a (100%) rename udf3.a => tests/udf3.a (100%) rename udf4.a => tests/udf4.a (100%) delete mode 100644 udftest.cpp delete mode 100644 udftest_compile.cpp diff --git a/Makefile b/Makefile index fcea7cc..ff7b6e3 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,7 @@ info: $(info $(OS)) $(info $(Threading)) $(info "test") + $(info $(CXX)) server.bin: $(CXX) server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) $(Threading) -flto --std=c++1z -O3 -march=native -o server.bin server.so: diff --git a/README.md b/README.md index 13077ce..47ecdaa 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,18 @@ ## Introduction AQuery++ Database is a cross-platform, In-Memory Column-Store Database that incorporates compiled query execution. -Compiler frontend built on top of [mo-sql-parsing](https://github.com/klahnakoski/mo-sql-parsing). +## Architecture +### AQuery Compiler +- The query is first processed by the AQuery Compiler which is composed of a frontend that parses the query into AST and a backend that generates target code that delivers the query. +- Front end of AQuery++ Compiler is built on top of [mo-sql-parsing](https://github.com/klahnakoski/mo-sql-parsing) with modifications to handle AQuery dialect and extension. +- Backend of AQuery++ Compiler generates target code dependent on the Execution Engine. It can either be the C++ code for AQuery Execution Engine or sql and C++ post-processor for Hybrid Engine or k9 for the k9 Engine. +### Execution Engines +- AQuery++ supports different execution engines thanks to the decoupled compiler structure. +- AQuery Execution Engine: executes query by compiling the query plan to C++ code. Doesn't support joins and udf functions. +- Hybrid Execution Engine: decouples the query into two parts. The sql-compliant part is executed by an Embedded version of Monetdb and everything else is executed by a post-process module which is generated by AQuery++ Compiler in C++ and then compiled and executed. +- K9 Execution Engine (discontinued). + ## Roadmap - [x] SQL Parser -> AQuery Parser (Front End) - [ ] AQuery-C++ Compiler (Back End) @@ -16,29 +26,39 @@ Compiler frontend built on top of [mo-sql-parsing](https://github.com/klahnakosk - [x] Order by - [x] Assumption - [x] Flatten - - [ ] Multi-table - - [ ] Join + - [x] Multi-table + - [x] Join - [ ] Subqueries - [ ] -> Optimizing Compiler ## TODO: -- [ ] C++ Meta-Programming: Elimilate template recursions as much as possible. + +- [ ] User Module load syntax parsing (fn definition/registration) +- [ ] User Module test +- [ ] Interval based triggers +- [ ] C++ Meta-Programming: Eliminate template recursions as much as possible. - [ ] IPC: Better ways to communicate between Interpreter (Python) and Executer (C++). - [ ] Sockets? stdin/stdout capture? ## Requirements -Recent version of Linux, Windows or MacOS, with recent C++ compiler that has C++17 (1z) support (e.g. gcc 6.0, MSVC 2017, clang 6.0), and python 3.6 or above. +Recent version of Linux, Windows or MacOS, with recent C++ compiler that has C++17 (1z) support. +- GCC: 9.0 or above (g++ 7.x, 8.x fail to handle variadic template expansion due to compiler bug) +- Clang: 6.0 or above (Recommended) +- MSVC: 2019 or later ## Usage `python3 prompt.py` will launch the interactive command prompt. The server binary will be autometically rebuilt and started. #### Commands: - ``: parse sql statement - `f `: parse all sql statements in file +- `dbg` start debugging session - `print`: printout parsed sql statements -- `exec`: execute last parsed statement(s) +- `exec`: execute last parsed statement(s) with AQuery Execution Engine. AQuery Execution Engine executes query by compiling it to C++ code and then executing it. + +- `xexec`: execute last parsed statement(s) with Hybrid Execution Engine. Hybrid Execution Engine decouples the query into two parts. The sql-compliant part is executed by an Embedded version of Monetdb and everything else is executed by a post-process module which is generated by AQuery++ Compiler in C++ and then compiled and executed. - `r`: run the last generated code snippet - `save `: save current code snippet. will use random filename if not specified. - `exit`: quit the prompt #### Example: `f moving_avg.a`
- `exec` + `xexec` diff --git a/aquery_parser/sql_parser.py b/aquery_parser/sql_parser.py index 9a6e88a..814305f 100644 --- a/aquery_parser/sql_parser.py +++ b/aquery_parser/sql_parser.py @@ -7,6 +7,7 @@ # Contact: Kyle Lahnakoski (kyle@lahnakoski.com) # +from sre_parse import WHITESPACE from mo_parsing.helpers import restOfLine from mo_parsing.infix import delimited_list from mo_parsing.whitespaces import NO_WHITESPACE, Whitespace @@ -648,9 +649,8 @@ def parser(literal_string, ident, sqlserver=False): + Optional(assign("where", expr)) ) / to_json_call - load = ( - keyword("load")("op") - + keyword("data").suppress() + load_data = ( + keyword("data").suppress() + keyword("infile")("loc") + literal_string ("file") + INTO @@ -662,6 +662,42 @@ def parser(literal_string, ident, sqlserver=False): + keyword("by").suppress() + literal_string ("term") ) + ) + + module_func_def = ( + var_name("fname") + + LB + + delimited_list( + ( + var_name("arg") + + COLON + + var_name("type") + )("vars") + ) + + RB + + LAMBDA + + var_name("ret_type") + ) + + load_module = ( + keyword("module").suppress() + + FROM + + literal_string ("file") + + Optional( + keyword("FUNCTIONS").suppress() + + LB + + module_func_def("funcs") + + ZeroOrMore(Suppress(',') + + module_func_def("funcs"), + Whitespace() + ) + + RB + ) + ) + + load = ( + keyword("load")("op") + + (load_data | load_module) ) ("load") diff --git a/moving_avg.csv b/data/moving_avg.csv similarity index 100% rename from moving_avg.csv rename to data/moving_avg.csv diff --git a/nyctx100.csv b/data/nyctx100.csv similarity index 100% rename from nyctx100.csv rename to data/nyctx100.csv diff --git a/test.csv b/data/test.csv similarity index 100% rename from test.csv rename to data/test.csv diff --git a/test2.csv b/data/test2.csv similarity index 100% rename from test2.csv rename to data/test2.csv diff --git a/engine/types.py b/engine/types.py index 699bbeb..921f966 100644 --- a/engine/types.py +++ b/engine/types.py @@ -1,3 +1,4 @@ +from copy import deepcopy from engine.utils import defval from aquery_config import have_hge from typing import Dict, List @@ -50,6 +51,12 @@ class Types: return self.cast_from_dict[ty.name](ty) else: raise Exception(f'Illeagal cast: from {ty.name} to {self.name}.') + + def __call__(self, args): + arg_str = ', '.join([a.__str__() for a in args]) + ret = deepcopy(self) + ret.sqlname = self.sqlname + f'({arg_str})' + return ret def __repr__(self) -> str: return self.sqlname diff --git a/prompt.py b/prompt.py index c3b715a..d383c9c 100644 --- a/prompt.py +++ b/prompt.py @@ -24,6 +24,36 @@ Run prompt.py without supplying with any arguments to run in interactive mode. parse only: parse the file and print out the AST ''' +prompt_help = '''\ + +******** AQuery Prompt Help ********* + +help: + print out this message +help commandline: + print help message for AQuery Commandline +: + parse sql statement +f : + parse all AQuery statements in file +script : + run AQuery Script in file +dbg: + start debugging session with current context +print: + printout parsed sql statements +exec: + execute last parsed statement(s) with AQuery Execution Engine +xexec: + execute last parsed statement(s) with Hybrid Execution Engine +r: + run the last generated code snippet +save : + save current code snippet. will use timestamp as filename if not specified. +exit or Ctrl+C: + exit prompt mode +''' + if __name__ == '__main__': import mimetypes mimetypes._winreg = None @@ -308,7 +338,13 @@ def main(running = lambda:True, next = input, state = None): if subprocess.call(['make', 'snippet'], stdout = nullstream) == 0: state.set_ready() continue - + if q.startswith('help'): + qs = re.split(r'[ \t]', q) + if len(qs) > 1 and qs[1].startswith('c'): + print(help_message) + else: + print(prompt_help) + continue elif q == 'xexec': # generate build and run (MonetDB Engine) state.cfg.backend_type = Backend_Type.BACKEND_MonetDB.value cxt = xengine.exec(state.stmts, cxt, keep) @@ -429,7 +465,7 @@ def main(running = lambda:True, next = input, state = None): sh.interact(banner = traceback.format_exc(), exitmsg = 'debugging session ended.') save('', cxt) rm(state) - raise + raise rm(state) ## FUNCTIONS END diff --git a/reconstruct/storage.py b/reconstruct/storage.py index 2c93f9e..4820475 100644 --- a/reconstruct/storage.py +++ b/reconstruct/storage.py @@ -1,10 +1,12 @@ from engine.types import * - +from engine.utils import enlist class ColRef: - def __init__(self, _ty, cobj, table:'TableInfo', name, id, compound = False): + def __init__(self, _ty, cobj, table:'TableInfo', name, id, compound = False, _ty_args = None): self.type : Types = AnyT if type(_ty) is str: self.type = builtin_types[_ty.lower()] + if _ty_args: + self.type = self.type(enlist(_ty_args)) elif type(_ty) is Types: self.type = _ty self.cobj = cobj @@ -47,9 +49,13 @@ class TableInfo: def add_col(self, c, new = True, i = 0): _ty = c['type'] + _ty_args = None + if type(_ty) is dict: + _ty_val = list(_ty.keys())[0] + _ty_args = _ty[_ty_val] + _ty = _ty_val if new: - _ty = _ty if type(c) is ColRef else list(_ty.keys())[0] - col_object = ColRef(_ty, c, self, c['name'], len(self.columns)) + col_object = ColRef(_ty, c, self, c['name'], len(self.columns), _ty_args = _ty_args) else: col_object = c c.table = self diff --git a/server/server.cpp b/server/server.cpp index 3c4003f..7bd7916 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -153,9 +153,9 @@ extern "C" int __DLLEXPORT__ main(int argc, char** argv) { #ifdef THREADING auto tp = new ThreadPool(); cxt->thread_pool = tp; + #endif - - + const char* shmname; if (argc < 0) return dll_main(argc, argv, cxt); diff --git a/server/threading.h b/server/threading.h index 861e941..c78283d 100644 --- a/server/threading.h +++ b/server/threading.h @@ -3,22 +3,22 @@ #include +typedef int(*payload_fn_t)(void*); +struct payload_t{ + payload_fn_t f; + void* args; + constexpr payload_t(payload_fn_t f, void* args) noexcept + : f(f), args(args) {} + constexpr payload_t() noexcept + : f(nullptr), args(nullptr) {}; + bool is_empty() const { return f && args; } + void empty() { f = nullptr; args = nullptr; } + void operator()() { f(args); } +}; + class ThreadPool{ public: - typedef void(*payload_fn_t)(void*); - - struct payload_t{ - payload_fn_t f; - void* args; - constexpr payload_t(payload_fn_t f, void* args) noexcept - : f(f), args(args) {} - constexpr payload_t() noexcept - : f(nullptr), args(nullptr) {}; - bool is_empty() const { return f && args; } - void empty() { f = nullptr; args = nullptr; } - void operator()() { f(args); } - }; ThreadPool(uint32_t n_threads = 0); void enqueue_task(const payload_t& payload); bool busy(); @@ -39,4 +39,31 @@ private: }; +class Trigger{ +private: + void* triggers; //min-heap by t-rem + virtual void tick() = 0; + +public: + Trigger(ThreadPool* tp); +}; + +class IntervalBasedTrigger : public Trigger{ +public: + struct timer{ + uint32_t interval; // in milliseconds + uint32_t time_remaining; + }; + void add_trigger(); +private: + void tick() override; +}; + +class CallbackBasedTrigger : public Trigger{ +public: + void add_trigger(); +private: + void tick() override; +}; + #endif diff --git a/funcs.a b/tests/funcs.a similarity index 100% rename from funcs.a rename to tests/funcs.a diff --git a/joins.a b/tests/joins.a similarity index 100% rename from joins.a rename to tests/joins.a diff --git a/modules.a b/tests/modules.a similarity index 57% rename from modules.a rename to tests/modules.a index 904c559..7e275f9 100644 --- a/modules.a +++ b/tests/modules.a @@ -1,4 +1,5 @@ LOAD MODULE FROM "test.so" -FUNCTIONS (div(a:int, b:int) -> double, +FUNCTIONS ( + div(a:int, b:int) -> double, mulvec(a:int, b:vecfloat) -> vecfloat ); \ No newline at end of file diff --git a/moving_avg.a b/tests/moving_avg.a similarity index 100% rename from moving_avg.a rename to tests/moving_avg.a diff --git a/q.sql b/tests/q.sql similarity index 100% rename from q.sql rename to tests/q.sql diff --git a/q1.sql b/tests/q1.sql similarity index 100% rename from q1.sql rename to tests/q1.sql diff --git a/stock.a b/tests/stock.a similarity index 100% rename from stock.a rename to tests/stock.a diff --git a/tests/strings.a b/tests/strings.a new file mode 100644 index 0000000..80d0ead --- /dev/null +++ b/tests/strings.a @@ -0,0 +1,7 @@ +CREATE TABLE types_test(names varchar(10), val real, id int) + +LOAD DATA INFILE "datatypes.csv" +INTO TABLE types_test +FIELDS TERMINATED BY "," + +select names, val * 10000 + id from types_test \ No newline at end of file diff --git a/udf2.a b/tests/udf2.a similarity index 100% rename from udf2.a rename to tests/udf2.a diff --git a/udf3.a b/tests/udf3.a similarity index 100% rename from udf3.a rename to tests/udf3.a diff --git a/udf4.a b/tests/udf4.a similarity index 100% rename from udf4.a rename to tests/udf4.a diff --git a/udftest.cpp b/udftest.cpp deleted file mode 100644 index c080820..0000000 --- a/udftest.cpp +++ /dev/null @@ -1,14 +0,0 @@ -#include "udf.hpp" - -int main(){ - vector_type _a{1,2,3,4}; - vector_type _b{2,3,3,5}; - ColRef a("a"); - ColRef b("b"); - a.initfrom(_a, "a"); - b.initfrom(_b, "b"); - ColRef ret{4}; - covariances2(a,b,2,4,ret); - - print(ret); -} diff --git a/udftest_compile.cpp b/udftest_compile.cpp deleted file mode 100644 index 278dd01..0000000 --- a/udftest_compile.cpp +++ /dev/null @@ -1,5 +0,0 @@ -#include "udf.hpp" - -int main(){ - -}