diff --git a/README.md b/README.md index 3624a73..1ee0e28 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ AQuery++ Database is a cross-platform, In-Memory Column-Store Database that inco ## Execution Engines - AQuery++ supports different execution engines thanks to the decoupled compiler structure. - Hybrid Execution Engine: decouples the query into two parts. The sql-compliant part is executed by an Embedded version of Monetdb and everything else is executed by a post-process module which is generated by AQuery++ Compiler in C++ and then compiled and executed. -- AQuery Library: A set of header based libraries that provide column arithmetic and operations inspired by array programming languages like kdb. This library is used by C++ post-processor code which can significantly reduce the complexity of generated code, reducing compile time while maintaining the best performance. The set of libraries can also be used by UDFs as well as User modules which makes it easier for users to write simple, efficient yet powerful extensions. +- AQuery Library: Consists of a pre-compiled static library and a set of headers with templated methods that provide column arithmetic, operations and relational algebra inspired by array programming languages like kdb. This library is used by C++ post-processor code which can significantly reduce the complexity of generated code, reducing compile time while maintaining the best performance. The set of libraries can also be used by UDFs as well as User modules which makes it easier for users to write simple, efficient yet powerful extensions. # Installation: ## Requirements @@ -171,7 +171,11 @@ save: query INTO OUTFILE string FIELDS TERMINATED BY string udf: FUNCTION ID '(' arg-list ')' '{' fun-body '}' arg_list: ID (, ID)* fun_body: [stmts] expr -/********* See more udf grammar later. **********/ + +/********* Triggers **********/ +create: CREATE TRIGGER ID [ ACTION ID INTERVAL num | ON ID ACTION ID WHEN ID ] +drop: DROP TRIGGER ID + stmts: stmt+ stmt: assignment; | if-stmt | for-stmt | ; @@ -300,11 +304,11 @@ SELECT * FROM my_table WHERE c1 > 10 - [x] UDFs (Hybrid Engine only) - [x] SDK and User Module - [x] Stored Procedures - - [ ] Triggers + - [x] Triggers # Known Issues: -- [ ] Interval based triggers +- [x] Interval based triggers - [ ] Hot reloading server binary - [x] Bug fixes: type deduction misaligned in Hybrid Engine - [ ] Investigation: Using postproc only for q1 in Hybrid Engine (make is_special always on) diff --git a/aquery_config.py b/aquery_config.py index 41f69c0..d297398 100644 --- a/aquery_config.py +++ b/aquery_config.py @@ -2,7 +2,7 @@ ## GLOBAL CONFIGURATION FLAGS -version_string = '0.7.0a' +version_string = '0.7.1a' add_path_to_ldpath = True rebuild_backend = False run_backend = True @@ -62,7 +62,10 @@ def init_config(): build_driver = 'Makefile' # print("adding path") else: - import readline + try: + import readline + except ImportError: + print("Warning: Readline module not present") if build_driver == 'Auto': build_driver = 'Makefile' if os_platform == 'cygwin': diff --git a/aquery_parser/parser.py b/aquery_parser/parser.py index 711344c..a4237b6 100644 --- a/aquery_parser/parser.py +++ b/aquery_parser/parser.py @@ -590,6 +590,8 @@ def parser(literal_string, ident): )) )("create_trigger") + drop_trigger = (keyword("drop trigger") + var_name("name")) ("drop_trigger") + cache_options = Optional(( keyword("options").suppress() + LB @@ -713,7 +715,7 @@ def parser(literal_string, ident): query | (insert | update | delete | load) | (create_table | create_view | create_cache | create_index | create_trigger) - | (drop_table | drop_view | drop_index) + | (drop_table | drop_view | drop_index | drop_trigger) )("stmts"), ";") other_stmt = ( diff --git a/engine/utils.py b/engine/utils.py index 00597f9..3a5b662 100644 --- a/engine/utils.py +++ b/engine/utils.py @@ -180,8 +180,8 @@ def get_storedproc(name : str): else: ret : StoredProcedure = cxt.get_storedproc(bytes(name, 'utf-8')) if ( - ret.name.value and - ret.name.value.decode('utf-8') != name + ret.name and + ret.name.decode('utf-8') != name ): print(f'Procedure {name} mismatch in server {ret.name.value}') return None diff --git a/paper b/paper index fa4e3f5..a61c312 160000 --- a/paper +++ b/paper @@ -1 +1 @@ -Subproject commit fa4e3f5a0606b2dda75faaacfb66cdaf42153260 +Subproject commit a61c3122c43293ff6f8bd01b4f65d7d03c5c4c54 diff --git a/proctool.py b/proctool.py index c92723e..4757a85 100644 --- a/proctool.py +++ b/proctool.py @@ -43,16 +43,24 @@ def read(cmd : str): clip += q + '\n' if rc and not input('copy to clipboard?').lower().startswith('n'): import pyperclip - pyperclip.copy(clip) - + pyperclip.copy(clip) + if __name__ == '__main__': + import os + files = os.listdir('./procedures/') while True: cmd = input("r for read, rc to read c_str, w for write: ") - if cmd.lower().startswith('r'): + if cmd.lower().startswith('ra'): + for f in files: + if f.endswith('.aqp'): + name = f[:-4] + read('r') + input('press any key to continue') + elif cmd.lower().startswith('r'): read(cmd.lower()) break elif cmd.lower().startswith('w'): - write() + write() break elif cmd.lower().startswith('q'): break diff --git a/prompt.py b/prompt.py index e7fefa9..52adfbe 100644 --- a/prompt.py +++ b/prompt.py @@ -332,7 +332,7 @@ def init_threaded(state : PromptState): state.send = server_so['receive_args'] state.wait_engine = server_so['wait_engine'] state.wake_engine = server_so['wake_engine'] - state.get_storedproc = server_so['get_procedure'] + state.get_storedproc = server_so['get_procedure_ex'] state.get_storedproc.restype = StoredProcedure aquery_config.have_hge = server_so['have_hge']() if aquery_config.have_hge != 0: @@ -342,11 +342,10 @@ def init_threaded(state : PromptState): state.th.start() def init_prompt() -> PromptState: - from engine.utils import session_context aquery_config.init_config() state = PromptState() - session_context = state + engine.utils.session_context = state # if aquery_config.rebuild_backend: # try: # os.remove(state.server_bin) @@ -504,6 +503,9 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr state.currstats.compile_time = state.currstats.stop() if build_this: state.set_ready() + while state.get_ready(): + state.wait_engine() + cxt.post_exec_triggers() state.currstats.need_print = True continue diff --git a/reconstruct/ast.py b/reconstruct/ast.py index 06f22bc..97407c2 100644 --- a/reconstruct/ast.py +++ b/reconstruct/ast.py @@ -1088,7 +1088,13 @@ class create_trigger(ast_node): class Type (Enum): Interval = auto() Callback = auto() - + def init(self, _): + # overload init to prevent automatic sql generation + pass + def consume(self, _): + # overload consume to prevent automatic sqlend action + pass + def produce(self, node): from engine.utils import send_to_server, get_storedproc node = node['create_trigger'] @@ -1097,7 +1103,7 @@ class create_trigger(ast_node): self.action = get_storedproc(self.action_name) if self.trigger_name in self.context.triggers: raise ValueError(f'trigger {self.trigger_name} exists') - elif self.action: + elif not self.action: raise ValueError(f'Stored Procedure {self.action_name} do not exist') if 'interval' in node: # executed periodically from server diff --git a/server/libaquery.h b/server/libaquery.h index 93109d5..62fd4b1 100644 --- a/server/libaquery.h +++ b/server/libaquery.h @@ -148,7 +148,14 @@ struct StoredProcedurePayload { Context* cxt; }; +struct StoredProcedurePayloadCond { + StoredProcedure *condition; + StoredProcedure *action; + Context* cxt; +}; + int execTriggerPayload(void*); +int execTriggerPayloadCond(void*); #ifdef _WIN32 #define __DLLEXPORT__ __declspec(dllexport) __stdcall diff --git a/server/monetdb_conn.cpp b/server/monetdb_conn.cpp index 0ef14c2..b90e88a 100644 --- a/server/monetdb_conn.cpp +++ b/server/monetdb_conn.cpp @@ -136,6 +136,7 @@ void Server::exec(const char* q){ bool Server::haserror(){ if (last_error){ + puts(last_error); last_error = nullptr; return true; } @@ -218,6 +219,50 @@ void* Server::getCol(int col_idx){ return nullptr; } +#define AQ_MONETDB_FETCH(X) case monetdbe_##X: \ + return (long long)((X *)(_ret_col->data))[0]; +long long Server::getFirstElement() { + if(!this->haserror() && res) { + auto _res = static_cast(this->res); + auto err_msg = monetdbe_result_fetch(_res, + reinterpret_cast(&ret_col), 0); + if(err_msg == nullptr) + { + auto _ret_col = static_cast(this->ret_col); + cnt = _ret_col->count; + if(cnt > 0) { + switch(_ret_col->type) { + AQ_MONETDB_FETCH(bool) + AQ_MONETDB_FETCH(int8_t) + AQ_MONETDB_FETCH(int16_t) + AQ_MONETDB_FETCH(int32_t) + AQ_MONETDB_FETCH(int64_t) +#ifdef HAVE_HGE + case monetdbe_int128_t: + return (long long)((__int128_t *)(_ret_col->data))[0]; +#endif + AQ_MONETDB_FETCH(size_t) + AQ_MONETDB_FETCH(float) + AQ_MONETDB_FETCH(double) + case monetdbe_str: + return ((const char **)(_ret_col->data))[0][0] == '\0'; + default: + printf("Error, non-primitive result: Getting col %s, type: %s\n", + _ret_col->name, monetdbe_type_str[_ret_col->type]); + return 0; + } + } + } + else { + printf("Error fetching result: %s\n", err_msg); + } + } + else { + puts("Error: No result."); + } + return 0; +} + Server::~Server(){ close(); } @@ -233,19 +278,23 @@ bool Server::havehge() { } -void ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){ +int ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){ auto server = static_cast(cxt->alt_server); + int ret = 0; + bool return_from_procedure = false; void* handle = nullptr; uint32_t procedure_module_cursor = 0; for(uint32_t i = 0; i < p->cnt; ++i) { switch(p->queries[i][0]){ + puts(p->queries[i]); case 'Q': { - server->exec(p->queries[i]); + server->exec(p->queries[i] + 1); + ret = int(server->getFirstElement()); } break; case 'P': { - auto c = code_snippet(dlsym(handle, p->queries[i]+1)); - c(cxt); + auto c = code_snippet(dlsym(handle, p->queries[i] + 1)); + ret = c(cxt); } break; case 'N': { @@ -266,11 +315,25 @@ void ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){ i, p->queries[i][0]); } } + + return ret; } int execTriggerPayload(void* args) { auto spp = (StoredProcedurePayload*)(args); - ExecuteStoredProcedureEx(spp->p, spp->cxt); + puts("exec trigger"); + auto ret = ExecuteStoredProcedureEx(spp->p, spp->cxt); delete spp; - return 0; + return ret; +} + +int execTriggerPayloadCond(void* args) { + int ret = 0; + auto spp = (StoredProcedurePayloadCond*)(args); + if(ExecuteStoredProcedureEx(spp->condition, spp->cxt) != 0) + ret = ExecuteStoredProcedureEx(spp->action, spp->cxt); + free(spp->condition); + free(spp->action); + delete spp; + return ret; } diff --git a/server/monetdb_conn.h b/server/monetdb_conn.h index bd5f2d0..614e3d4 100644 --- a/server/monetdb_conn.h +++ b/server/monetdb_conn.h @@ -19,6 +19,7 @@ struct Server{ void connect(Context* cxt); void exec(const char* q); void *getCol(int col_idx); + long long getFirstElement(); void close(); bool haserror(); static bool havehge(); diff --git a/server/server.cpp b/server/server.cpp index f917b72..a015422 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -94,8 +94,8 @@ have_hge() { return false; #endif } - -__AQEXPORT__(StoredProcedure) +Context* _g_cxt; +StoredProcedure get_procedure(Context* cxt, const char* name) { auto res = cxt->stored_proc.find(name); if (res == cxt->stored_proc.end()) @@ -107,7 +107,10 @@ get_procedure(Context* cxt, const char* name) { }; return res->second; } - +__AQEXPORT__(StoredProcedure) +get_procedure_ex(const char* name){ + return get_procedure(_g_cxt, name); +} using prt_fn_t = char* (*)(void*, char*); // This function contains heap allocations, free after use @@ -490,7 +493,7 @@ start: puts(p.queries[j-1]); } fclose(fp); - p.__rt_loaded_modules = 0; + p.__rt_loaded_modules = nullptr; return load_modules(p); }; switch(n_recvd[i][1]){ @@ -561,6 +564,7 @@ start: break; case 'T': // triggers { + puts(n_recvd[i]); switch(n_recvd[i][1]){ case 'I': // register interval based trigger { @@ -578,6 +582,23 @@ start: } break; case 'C': // activate callback based trigger + { + const char* query_name = n_recvd[i] + 2; + const char* action_name = query_name; + while(*action_name++); + if(auto q = get_procedure(cxt, query_name), + a = get_procedure(cxt, action_name); + q.name == nullptr || a.name == nullptr + ) + printf("Warning: Invalid query or action name: %s %s", + query_name, action_name); + else{ + auto query = AQ_DupObject(&q); + auto action = AQ_DupObject(&a); + + cxt->ct_host->execute_trigger(query, action); + } + } break; case 'R': // remove trigger { @@ -656,14 +677,15 @@ extern "C" int __DLLEXPORT__ main(int argc, char** argv) { #endif // puts("running"); Context* cxt = new Context(); + _g_cxt = cxt; cxt->aquery_root_path = to_lpstr(std::filesystem::current_path().string()); // cxt->log("%d %s\n", argc, argv[1]); #ifdef THREADING auto tp = new ThreadPool(); cxt->thread_pool = tp; - cxt->it_host = new IntervalBasedTriggerHost(tp); - cxt->ct_host = new CallbackBasedTriggerHost(tp); + cxt->it_host = new IntervalBasedTriggerHost(tp, cxt); + cxt->ct_host = new CallbackBasedTriggerHost(tp, cxt); #endif const char* shmname; diff --git a/server/threading.cpp b/server/threading.cpp index b905f62..121b530 100644 --- a/server/threading.cpp +++ b/server/threading.cpp @@ -154,11 +154,14 @@ bool ThreadPool::busy(){ return true; } -IntervalBasedTriggerHost::IntervalBasedTriggerHost(ThreadPool* tp){ +IntervalBasedTriggerHost::IntervalBasedTriggerHost(ThreadPool* tp, Context* cxt){ + this->cxt = cxt; this->tp = tp; this->triggers = new aq_map; trigger_queue_lock = new mutex(); this->now = std::chrono::high_resolution_clock::now().time_since_epoch().count(); + this->running = true; + this->handle = new thread(&IntervalBasedTriggerHost::tick, this); } void IntervalBasedTriggerHost::add_trigger(const char* name, StoredProcedure *p, uint32_t interval) { @@ -171,21 +174,28 @@ void IntervalBasedTriggerHost::add_trigger(const char* name, StoredProcedure *p, } void IntervalBasedTriggerHost::tick() { - const auto current_time = std::chrono::high_resolution_clock::now().time_since_epoch().count(); - const auto delta_t = static_cast((current_time - now) / 1000000); // miliseconds precision + auto current_time = std::chrono::high_resolution_clock::now().time_since_epoch().count(); + auto delta_t = static_cast((current_time - now) / 1000000); // miliseconds precision now = current_time; auto vt_triggers = static_cast *>(this->triggers); - trigger_queue_lock->lock(); - for(auto& [_, t] : vt_triggers->values()) { - if(t.tick(delta_t)) { - payload_t payload; - payload.f = execTriggerPayload; - payload.args = static_cast(new StoredProcedurePayload {t.sp, cxt}); - - tp->enqueue_task(payload); + fflush(stdout); + while(this->running) { + std::this_thread::sleep_for(50ms); + current_time = std::chrono::high_resolution_clock::now().time_since_epoch().count(); + delta_t = static_cast((current_time - now) / 1000000); // miliseconds precision + now = current_time; + trigger_queue_lock->lock(); + for(auto& [_, t] : vt_triggers->values()) { + if(t.tick(delta_t)) { + payload_t payload; + payload.f = execTriggerPayload; + payload.args = static_cast(new StoredProcedurePayload {t.sp, cxt}); + + tp->enqueue_task(payload); + } } + trigger_queue_lock->unlock(); } - trigger_queue_lock->unlock(); } void IntervalBasedTriggerHost::remove_trigger(const char* name) { @@ -199,6 +209,7 @@ void IntervalBasedTrigger::reset() { bool IntervalBasedTrigger::tick(uint32_t delta_t) { bool ret = false; + // printf("%d %d\n",delta_t, time_remaining); if (time_remaining <= delta_t) ret = true; if (auto curr_dt = delta_t % interval; time_remaining <= curr_dt) @@ -208,8 +219,19 @@ bool IntervalBasedTrigger::tick(uint32_t delta_t) { return ret; } -CallbackBasedTriggerHost::CallbackBasedTriggerHost(ThreadPool *tp) { +CallbackBasedTriggerHost::CallbackBasedTriggerHost(ThreadPool *tp, Context *cxt) { this->tp = tp; + this->cxt = cxt; +} + +void CallbackBasedTriggerHost::execute_trigger(StoredProcedure* query, StoredProcedure* action) { + payload_t payload; + payload.f = execTriggerPayloadCond; + payload.args = static_cast( + new StoredProcedurePayloadCond {query, action, cxt} + ); + + this->tp->enqueue_task(payload); } -void CallbackBasedTriggerHost::tick() {} \ No newline at end of file +void CallbackBasedTriggerHost::tick() {} diff --git a/server/threading.h b/server/threading.h index 6e07936..01d02d7 100644 --- a/server/threading.h +++ b/server/threading.h @@ -71,18 +71,19 @@ struct IntervalBasedTrigger : Trigger { class IntervalBasedTriggerHost : public TriggerHost { public: - explicit IntervalBasedTriggerHost(ThreadPool *tp); + explicit IntervalBasedTriggerHost(ThreadPool *tp, Context* cxt); void add_trigger(const char* name, StoredProcedure* stored_procedure, uint32_t interval); void remove_trigger(const char* name); private: unsigned long long now; + bool running; void tick() override; }; class CallbackBasedTriggerHost : public TriggerHost { public: - explicit CallbackBasedTriggerHost(ThreadPool *tp); - void add_trigger(); + explicit CallbackBasedTriggerHost(ThreadPool *tp, Context *cxt); + void execute_trigger(StoredProcedure* query, StoredProcedure* action); private: void tick() override; }; diff --git a/server/utils.h b/server/utils.h index 6a7eb07..b4029ab 100644 --- a/server/utils.h +++ b/server/utils.h @@ -11,7 +11,7 @@ constexpr static bool cpp_17 = false; #endif template -inline const char* str(const T& v) { +inline const char* str([[maybe_unused]] const T& v) { return ""; } diff --git a/tests/triggers.aquery b/tests/triggers.aquery new file mode 100644 index 0000000..eb54269 --- /dev/null +++ b/tests/triggers.aquery @@ -0,0 +1,8 @@ +procedure one run +procedure two run + +create table t(a int); +exec +create trigger a on t action one when two; +exec +insert into t(a) values (1); \ No newline at end of file