finialize demo

master
Bill 2 years ago
parent 05cca378e0
commit c944b5dfcf

@ -0,0 +1,17 @@
#!aquery
f demo/prep.a
exec
procedure demoi run
procedure democq run
procedure democa run
create trigger t action demoi interval 15000
exec
create trigger c on source action democa when democq
exec
# f demo/test.a
# exec

@ -501,11 +501,11 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr
else: else:
state.cfg.has_dll = 0 state.cfg.has_dll = 0
state.currstats.compile_time = state.currstats.stop() state.currstats.compile_time = state.currstats.stop()
cxt.post_exec_triggers()
if build_this: if build_this:
state.set_ready() state.set_ready()
while state.get_ready(): # while state.get_ready():
state.wait_engine() # state.wait_engine()
cxt.post_exec_triggers()
state.currstats.need_print = True state.currstats.need_print = True
continue continue

@ -1109,7 +1109,7 @@ class create_trigger(ast_node):
if 'interval' in node: # executed periodically from server if 'interval' in node: # executed periodically from server
self.type = self.Type.Interval self.type = self.Type.Interval
self.interval = node['interval'] self.interval = node['interval']
send_to_server(f'TI{self.trigger_name}\0{self.action_name}\0{self.interval}') self.context.queries.append(f'TI{self.trigger_name}\0{self.action_name}\0{self.interval}')
else: # executed from sql backend else: # executed from sql backend
self.type = self.Type.Callback self.type = self.Type.Callback
self.query_name = node['query'] self.query_name = node['query']
@ -1118,11 +1118,10 @@ class create_trigger(ast_node):
if self.procedure and self.table_name in self.context.tables_byname: if self.procedure and self.table_name in self.context.tables_byname:
self.table = self.context.tables_byname[self.table_name] self.table = self.context.tables_byname[self.table_name]
self.table.triggers.add(self) self.table.triggers.add(self)
send_to_server( self.context.queries.append(
f'TC{self.trigger_name}\0{self.table_name}\0' f'TC{self.trigger_name}\0{self.table_name}\0'
f'{self.query_name}\0{self.action_name}' f'{self.query_name}\0{self.action_name}'
) )
else: else:
return return
self.context.triggers[self.trigger_name] = self self.context.triggers[self.trigger_name] = self
@ -1136,11 +1135,11 @@ class create_trigger(ast_node):
def execute(self): def execute(self):
from engine.utils import send_to_server from engine.utils import send_to_server
send_to_server(f'TA{self.query_name}\0{self.action_name}') self.context.queries.append(f'TA{self.query_name}\0{self.action_name}')
def remove(self): def remove(self):
from engine.utils import send_to_server from engine.utils import send_to_server
send_to_server(f'TR{self.trigger_name}') self.context.queries.append(f'TR{self.trigger_name}')
class drop_trigger(ast_node): class drop_trigger(ast_node):

@ -621,21 +621,15 @@ vector_type<std::string_view>::vector_type(const uint32_t size, void* data) :
//std::cout<<size << container[1]; //std::cout<<size << container[1];
} }
void activate_callback_based_trigger(Context* context, const char* cmd) StoredProcedure
{ get_procedure(Context* cxt, const char* name) {
const char* query_name = cmd + 2; auto res = cxt->stored_proc.find(name);
const char* action_name = query_name; if (res == cxt->stored_proc.end())
while (*action_name++); return { .cnt = 0,
if(auto q = get_procedure(cxt, query_name), .postproc_modules = 0,
a = get_procedure(cxt, action_name); .queries = nullptr,
q.name == nullptr || a.name == nullptr .name = nullptr,
) .__rt_loaded_modules = nullptr
printf("Warning: Invalid query or action name: %s %s", };
query_name, action_name); return res->second;
else {
auto query = AQ_DupObject(&q);
auto action = AQ_DupObject(&a);
cxt->ct_host->execute_trigger(query, action);
}
} }

@ -291,6 +291,5 @@ inline _This_Type* AQ_DupObject(_This_Type* __val) {
#endif //__USE_STD_SEMAPHORE__ #endif //__USE_STD_SEMAPHORE__
void print_monetdb_results(void* _srv, const char* sep, const char* end, uint32_t limit); void print_monetdb_results(void* _srv, const char* sep, const char* end, uint32_t limit);
void activate_callback_based_trigger(Context* context, const char* cmd); StoredProcedure get_procedure(Context* cxt, const char* name);
#endif #endif

@ -382,7 +382,7 @@ int ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){
break; break;
case 'T' : { case 'T' : {
if (p->queries[i][1] == 'N') { if (p->queries[i][1] == 'N') {
cxt->ct_host->execute_trigger(p->queries[i] + 2, cxt); cxt->ct_host->execute_trigger(p->queries[i] + 2);
} }
} }
break; break;

@ -97,24 +97,31 @@ have_hge() {
Context* _g_cxt; Context* _g_cxt;
StoredProcedure
get_procedure(Context* cxt, const char* name) {
auto res = cxt->stored_proc.find(name);
if (res == cxt->stored_proc.end())
return { .cnt = 0,
.postproc_modules = 0,
.queries = nullptr,
.name = nullptr,
.__rt_loaded_modules = nullptr
};
return res->second;
}
__AQEXPORT__(StoredProcedure) __AQEXPORT__(StoredProcedure)
get_procedure_ex(const char* name){ get_procedure_ex(const char* name){
return get_procedure(_g_cxt, name); return get_procedure(_g_cxt, name);
} }
void activate_callback_based_trigger(Context* context, const char* cmd)
{
const char* query_name = cmd + 2;
const char* action_name = query_name;
while (*action_name++);
if(auto q = get_procedure(context, query_name),
a = get_procedure(context, action_name);
q.name == nullptr || a.name == nullptr
)
printf("Warning: Invalid query or action name: %s %s",
query_name, action_name);
else {
auto query = AQ_DupObject(&q);
auto action = AQ_DupObject(&a);
context->ct_host->execute_trigger(query, action);
}
}
// This function contains heap allocations, free after use // This function contains heap allocations, free after use
template<class String_T> template<class String_T>
char* to_lpstr(const String_T& str){ char* to_lpstr(const String_T& str){

@ -235,14 +235,14 @@ void CallbackBasedTriggerHost::execute_trigger(StoredProcedure* query, StoredPro
this->tp->enqueue_task(payload); this->tp->enqueue_task(payload);
} }
void execute_trigger(const char* trigger_name) { void CallbackBasedTriggerHost::execute_trigger(const char* trigger_name) {
auto vt_triggers = static_cast<aq_map<std::string, CallbackBasedTrigger> *>(this->triggers); auto vt_triggers = static_cast<aq_map<std::string, CallbackBasedTrigger> *>(this->triggers);
auto ptr = vt_triggers->find(trigger_name); auto ptr = vt_triggers->find(trigger_name);
if (ptr != vt_triggers->end()) { if (ptr != vt_triggers->end()) {
auto& tr = ptr->second; auto& tr = ptr->second;
if (!tr.materialized) { if (!tr.materialized) {
tr.query = new CallbackBasedTrigger(get_procedure(cxt, tr.query_name)); tr.query = new StoredProcedure(get_procedure(cxt, tr.query_name));
tr.action = new CallbackBasedTrigger(get_procedure(cxt, tr.action_name)); tr.action = new StoredProcedure(get_procedure(cxt, tr.action_name));
tr.materialized = true; tr.materialized = true;
} }
this->execute_trigger(AQ_DupObject(tr.query), AQ_DupObject(tr.action)); this->execute_trigger(AQ_DupObject(tr.query), AQ_DupObject(tr.action));

Loading…
Cancel
Save