diff --git a/demo/demo.aquery b/demo/demo.aquery new file mode 100644 index 0000000..c0db643 --- /dev/null +++ b/demo/demo.aquery @@ -0,0 +1,17 @@ +#!aquery + +f demo/prep.a +exec + +procedure demoi run +procedure democq run +procedure democa run + +create trigger t action demoi interval 15000 +exec + +create trigger c on source action democa when democq +exec + +# f demo/test.a +# exec diff --git a/prompt.py b/prompt.py index 52adfbe..93deb9c 100644 --- a/prompt.py +++ b/prompt.py @@ -501,11 +501,11 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr else: state.cfg.has_dll = 0 state.currstats.compile_time = state.currstats.stop() + cxt.post_exec_triggers() if build_this: state.set_ready() - while state.get_ready(): - state.wait_engine() - cxt.post_exec_triggers() + # while state.get_ready(): + # state.wait_engine() state.currstats.need_print = True continue diff --git a/reconstruct/ast.py b/reconstruct/ast.py index 7f291ef..30dfeb5 100644 --- a/reconstruct/ast.py +++ b/reconstruct/ast.py @@ -1109,7 +1109,7 @@ class create_trigger(ast_node): if 'interval' in node: # executed periodically from server self.type = self.Type.Interval self.interval = node['interval'] - send_to_server(f'TI{self.trigger_name}\0{self.action_name}\0{self.interval}') + self.context.queries.append(f'TI{self.trigger_name}\0{self.action_name}\0{self.interval}') else: # executed from sql backend self.type = self.Type.Callback self.query_name = node['query'] @@ -1118,11 +1118,10 @@ class create_trigger(ast_node): if self.procedure and self.table_name in self.context.tables_byname: self.table = self.context.tables_byname[self.table_name] self.table.triggers.add(self) - send_to_server( + self.context.queries.append( f'TC{self.trigger_name}\0{self.table_name}\0' f'{self.query_name}\0{self.action_name}' - ) - + ) else: return self.context.triggers[self.trigger_name] = self @@ -1136,11 +1135,11 @@ class create_trigger(ast_node): def execute(self): from engine.utils import send_to_server - send_to_server(f'TA{self.query_name}\0{self.action_name}') + self.context.queries.append(f'TA{self.query_name}\0{self.action_name}') def remove(self): from engine.utils import send_to_server - send_to_server(f'TR{self.trigger_name}') + self.context.queries.append(f'TR{self.trigger_name}') class drop_trigger(ast_node): diff --git a/server/libaquery.cpp b/server/libaquery.cpp index 7f4596c..041b7d6 100644 --- a/server/libaquery.cpp +++ b/server/libaquery.cpp @@ -621,21 +621,15 @@ vector_type::vector_type(const uint32_t size, void* data) : //std::cout<ct_host->execute_trigger(query, action); - } +StoredProcedure +get_procedure(Context* cxt, const char* name) { + auto res = cxt->stored_proc.find(name); + if (res == cxt->stored_proc.end()) + return { .cnt = 0, + .postproc_modules = 0, + .queries = nullptr, + .name = nullptr, + .__rt_loaded_modules = nullptr + }; + return res->second; } diff --git a/server/libaquery.h b/server/libaquery.h index 6f43005..5adced8 100644 --- a/server/libaquery.h +++ b/server/libaquery.h @@ -291,6 +291,5 @@ inline _This_Type* AQ_DupObject(_This_Type* __val) { #endif //__USE_STD_SEMAPHORE__ void print_monetdb_results(void* _srv, const char* sep, const char* end, uint32_t limit); -void activate_callback_based_trigger(Context* context, const char* cmd); - +StoredProcedure get_procedure(Context* cxt, const char* name); #endif diff --git a/server/monetdb_conn.cpp b/server/monetdb_conn.cpp index eacdaa7..502c787 100644 --- a/server/monetdb_conn.cpp +++ b/server/monetdb_conn.cpp @@ -382,7 +382,7 @@ int ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){ break; case 'T' : { if (p->queries[i][1] == 'N') { - cxt->ct_host->execute_trigger(p->queries[i] + 2, cxt); + cxt->ct_host->execute_trigger(p->queries[i] + 2); } } break; diff --git a/server/server.cpp b/server/server.cpp index 2439b21..d34f1df 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -97,24 +97,31 @@ have_hge() { Context* _g_cxt; -StoredProcedure -get_procedure(Context* cxt, const char* name) { - auto res = cxt->stored_proc.find(name); - if (res == cxt->stored_proc.end()) - return { .cnt = 0, - .postproc_modules = 0, - .queries = nullptr, - .name = nullptr, - .__rt_loaded_modules = nullptr - }; - return res->second; -} - __AQEXPORT__(StoredProcedure) get_procedure_ex(const char* name){ return get_procedure(_g_cxt, name); } +void activate_callback_based_trigger(Context* context, const char* cmd) +{ + const char* query_name = cmd + 2; + const char* action_name = query_name; + while (*action_name++); + if(auto q = get_procedure(context, query_name), + a = get_procedure(context, action_name); + q.name == nullptr || a.name == nullptr + ) + printf("Warning: Invalid query or action name: %s %s", + query_name, action_name); + else { + auto query = AQ_DupObject(&q); + auto action = AQ_DupObject(&a); + + context->ct_host->execute_trigger(query, action); + } +} + + // This function contains heap allocations, free after use template char* to_lpstr(const String_T& str){ diff --git a/server/threading.cpp b/server/threading.cpp index 81f1436..769e4dd 100644 --- a/server/threading.cpp +++ b/server/threading.cpp @@ -235,14 +235,14 @@ void CallbackBasedTriggerHost::execute_trigger(StoredProcedure* query, StoredPro this->tp->enqueue_task(payload); } -void execute_trigger(const char* trigger_name) { +void CallbackBasedTriggerHost::execute_trigger(const char* trigger_name) { auto vt_triggers = static_cast *>(this->triggers); auto ptr = vt_triggers->find(trigger_name); if (ptr != vt_triggers->end()) { auto& tr = ptr->second; if (!tr.materialized) { - tr.query = new CallbackBasedTrigger(get_procedure(cxt, tr.query_name)); - tr.action = new CallbackBasedTrigger(get_procedure(cxt, tr.action_name)); + tr.query = new StoredProcedure(get_procedure(cxt, tr.query_name)); + tr.action = new StoredProcedure(get_procedure(cxt, tr.action_name)); tr.materialized = true; } this->execute_trigger(AQ_DupObject(tr.query), AQ_DupObject(tr.action));