trigger type 2

master
Bill 2 years ago
parent 64d4e3dd9a
commit 84105347fc

@ -12,7 +12,7 @@ AQuery++ Database is a cross-platform, In-Memory Column-Store Database that inco
## Execution Engines
- AQuery++ supports different execution engines thanks to the decoupled compiler structure.
- Hybrid Execution Engine: decouples the query into two parts. The sql-compliant part is executed by an Embedded version of Monetdb and everything else is executed by a post-process module which is generated by AQuery++ Compiler in C++ and then compiled and executed.
- AQuery Library: A set of header based libraries that provide column arithmetic and operations inspired by array programming languages like kdb. This library is used by C++ post-processor code which can significantly reduce the complexity of generated code, reducing compile time while maintaining the best performance. The set of libraries can also be used by UDFs as well as User modules which makes it easier for users to write simple, efficient yet powerful extensions.
- AQuery Library: Consists of a pre-compiled static library and a set of headers with templated methods that provide column arithmetic, operations and relational algebra inspired by array programming languages like kdb. This library is used by C++ post-processor code which can significantly reduce the complexity of generated code, reducing compile time while maintaining the best performance. The set of libraries can also be used by UDFs as well as User modules which makes it easier for users to write simple, efficient yet powerful extensions.
# Installation:
## Requirements
@ -171,7 +171,11 @@ save: query INTO OUTFILE string FIELDS TERMINATED BY string
udf: FUNCTION ID '(' arg-list ')' '{' fun-body '}'
arg_list: ID (, ID)*
fun_body: [stmts] expr
/********* See more udf grammar later. **********/
/********* Triggers **********/
create: CREATE TRIGGER ID [ ACTION ID INTERVAL num | ON ID ACTION ID WHEN ID ]
drop: DROP TRIGGER ID
stmts: stmt+
stmt: assignment; | if-stmt | for-stmt | ;
@ -300,11 +304,11 @@ SELECT * FROM my_table WHERE c1 > 10
- [x] UDFs (Hybrid Engine only)
- [x] SDK and User Module
- [x] Stored Procedures
- [ ] Triggers
- [x] Triggers
# Known Issues:
- [ ] Interval based triggers
- [x] Interval based triggers
- [ ] Hot reloading server binary
- [x] Bug fixes: type deduction misaligned in Hybrid Engine
- [ ] Investigation: Using postproc only for q1 in Hybrid Engine (make is_special always on)

@ -2,7 +2,7 @@
## GLOBAL CONFIGURATION FLAGS
version_string = '0.7.0a'
version_string = '0.7.1a'
add_path_to_ldpath = True
rebuild_backend = False
run_backend = True
@ -62,7 +62,10 @@ def init_config():
build_driver = 'Makefile'
# print("adding path")
else:
import readline
try:
import readline
except ImportError:
print("Warning: Readline module not present")
if build_driver == 'Auto':
build_driver = 'Makefile'
if os_platform == 'cygwin':

@ -590,6 +590,8 @@ def parser(literal_string, ident):
))
)("create_trigger")
drop_trigger = (keyword("drop trigger") + var_name("name")) ("drop_trigger")
cache_options = Optional((
keyword("options").suppress()
+ LB
@ -713,7 +715,7 @@ def parser(literal_string, ident):
query
| (insert | update | delete | load)
| (create_table | create_view | create_cache | create_index | create_trigger)
| (drop_table | drop_view | drop_index)
| (drop_table | drop_view | drop_index | drop_trigger)
)("stmts"), ";")
other_stmt = (

@ -180,8 +180,8 @@ def get_storedproc(name : str):
else:
ret : StoredProcedure = cxt.get_storedproc(bytes(name, 'utf-8'))
if (
ret.name.value and
ret.name.value.decode('utf-8') != name
ret.name and
ret.name.decode('utf-8') != name
):
print(f'Procedure {name} mismatch in server {ret.name.value}')
return None

@ -1 +1 @@
Subproject commit fa4e3f5a0606b2dda75faaacfb66cdaf42153260
Subproject commit a61c3122c43293ff6f8bd01b4f65d7d03c5c4c54

@ -43,16 +43,24 @@ def read(cmd : str):
clip += q + '\n'
if rc and not input('copy to clipboard?').lower().startswith('n'):
import pyperclip
pyperclip.copy(clip)
pyperclip.copy(clip)
if __name__ == '__main__':
import os
files = os.listdir('./procedures/')
while True:
cmd = input("r for read, rc to read c_str, w for write: ")
if cmd.lower().startswith('r'):
if cmd.lower().startswith('ra'):
for f in files:
if f.endswith('.aqp'):
name = f[:-4]
read('r')
input('press any key to continue')
elif cmd.lower().startswith('r'):
read(cmd.lower())
break
elif cmd.lower().startswith('w'):
write()
write()
break
elif cmd.lower().startswith('q'):
break

@ -332,7 +332,7 @@ def init_threaded(state : PromptState):
state.send = server_so['receive_args']
state.wait_engine = server_so['wait_engine']
state.wake_engine = server_so['wake_engine']
state.get_storedproc = server_so['get_procedure']
state.get_storedproc = server_so['get_procedure_ex']
state.get_storedproc.restype = StoredProcedure
aquery_config.have_hge = server_so['have_hge']()
if aquery_config.have_hge != 0:
@ -342,11 +342,10 @@ def init_threaded(state : PromptState):
state.th.start()
def init_prompt() -> PromptState:
from engine.utils import session_context
aquery_config.init_config()
state = PromptState()
session_context = state
engine.utils.session_context = state
# if aquery_config.rebuild_backend:
# try:
# os.remove(state.server_bin)
@ -504,6 +503,9 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr
state.currstats.compile_time = state.currstats.stop()
if build_this:
state.set_ready()
while state.get_ready():
state.wait_engine()
cxt.post_exec_triggers()
state.currstats.need_print = True
continue

@ -1088,7 +1088,13 @@ class create_trigger(ast_node):
class Type (Enum):
Interval = auto()
Callback = auto()
def init(self, _):
# overload init to prevent automatic sql generation
pass
def consume(self, _):
# overload consume to prevent automatic sqlend action
pass
def produce(self, node):
from engine.utils import send_to_server, get_storedproc
node = node['create_trigger']
@ -1097,7 +1103,7 @@ class create_trigger(ast_node):
self.action = get_storedproc(self.action_name)
if self.trigger_name in self.context.triggers:
raise ValueError(f'trigger {self.trigger_name} exists')
elif self.action:
elif not self.action:
raise ValueError(f'Stored Procedure {self.action_name} do not exist')
if 'interval' in node: # executed periodically from server

@ -148,7 +148,14 @@ struct StoredProcedurePayload {
Context* cxt;
};
struct StoredProcedurePayloadCond {
StoredProcedure *condition;
StoredProcedure *action;
Context* cxt;
};
int execTriggerPayload(void*);
int execTriggerPayloadCond(void*);
#ifdef _WIN32
#define __DLLEXPORT__ __declspec(dllexport) __stdcall

@ -136,6 +136,7 @@ void Server::exec(const char* q){
bool Server::haserror(){
if (last_error){
puts(last_error);
last_error = nullptr;
return true;
}
@ -218,6 +219,50 @@ void* Server::getCol(int col_idx){
return nullptr;
}
#define AQ_MONETDB_FETCH(X) case monetdbe_##X: \
return (long long)((X *)(_ret_col->data))[0];
long long Server::getFirstElement() {
if(!this->haserror() && res) {
auto _res = static_cast<monetdbe_result*>(this->res);
auto err_msg = monetdbe_result_fetch(_res,
reinterpret_cast<monetdbe_column**>(&ret_col), 0);
if(err_msg == nullptr)
{
auto _ret_col = static_cast<monetdbe_column*>(this->ret_col);
cnt = _ret_col->count;
if(cnt > 0) {
switch(_ret_col->type) {
AQ_MONETDB_FETCH(bool)
AQ_MONETDB_FETCH(int8_t)
AQ_MONETDB_FETCH(int16_t)
AQ_MONETDB_FETCH(int32_t)
AQ_MONETDB_FETCH(int64_t)
#ifdef HAVE_HGE
case monetdbe_int128_t:
return (long long)((__int128_t *)(_ret_col->data))[0];
#endif
AQ_MONETDB_FETCH(size_t)
AQ_MONETDB_FETCH(float)
AQ_MONETDB_FETCH(double)
case monetdbe_str:
return ((const char **)(_ret_col->data))[0][0] == '\0';
default:
printf("Error, non-primitive result: Getting col %s, type: %s\n",
_ret_col->name, monetdbe_type_str[_ret_col->type]);
return 0;
}
}
}
else {
printf("Error fetching result: %s\n", err_msg);
}
}
else {
puts("Error: No result.");
}
return 0;
}
Server::~Server(){
close();
}
@ -233,19 +278,23 @@ bool Server::havehge() {
}
void ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){
int ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){
auto server = static_cast<Server*>(cxt->alt_server);
int ret = 0;
bool return_from_procedure = false;
void* handle = nullptr;
uint32_t procedure_module_cursor = 0;
for(uint32_t i = 0; i < p->cnt; ++i) {
switch(p->queries[i][0]){
puts(p->queries[i]);
case 'Q': {
server->exec(p->queries[i]);
server->exec(p->queries[i] + 1);
ret = int(server->getFirstElement());
}
break;
case 'P': {
auto c = code_snippet(dlsym(handle, p->queries[i]+1));
c(cxt);
auto c = code_snippet(dlsym(handle, p->queries[i] + 1));
ret = c(cxt);
}
break;
case 'N': {
@ -266,11 +315,25 @@ void ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){
i, p->queries[i][0]);
}
}
return ret;
}
int execTriggerPayload(void* args) {
auto spp = (StoredProcedurePayload*)(args);
ExecuteStoredProcedureEx(spp->p, spp->cxt);
puts("exec trigger");
auto ret = ExecuteStoredProcedureEx(spp->p, spp->cxt);
delete spp;
return 0;
return ret;
}
int execTriggerPayloadCond(void* args) {
int ret = 0;
auto spp = (StoredProcedurePayloadCond*)(args);
if(ExecuteStoredProcedureEx(spp->condition, spp->cxt) != 0)
ret = ExecuteStoredProcedureEx(spp->action, spp->cxt);
free(spp->condition);
free(spp->action);
delete spp;
return ret;
}

@ -19,6 +19,7 @@ struct Server{
void connect(Context* cxt);
void exec(const char* q);
void *getCol(int col_idx);
long long getFirstElement();
void close();
bool haserror();
static bool havehge();

@ -94,8 +94,8 @@ have_hge() {
return false;
#endif
}
__AQEXPORT__(StoredProcedure)
Context* _g_cxt;
StoredProcedure
get_procedure(Context* cxt, const char* name) {
auto res = cxt->stored_proc.find(name);
if (res == cxt->stored_proc.end())
@ -107,7 +107,10 @@ get_procedure(Context* cxt, const char* name) {
};
return res->second;
}
__AQEXPORT__(StoredProcedure)
get_procedure_ex(const char* name){
return get_procedure(_g_cxt, name);
}
using prt_fn_t = char* (*)(void*, char*);
// This function contains heap allocations, free after use
@ -490,7 +493,7 @@ start:
puts(p.queries[j-1]);
}
fclose(fp);
p.__rt_loaded_modules = 0;
p.__rt_loaded_modules = nullptr;
return load_modules(p);
};
switch(n_recvd[i][1]){
@ -561,6 +564,7 @@ start:
break;
case 'T': // triggers
{
puts(n_recvd[i]);
switch(n_recvd[i][1]){
case 'I': // register interval based trigger
{
@ -578,6 +582,23 @@ start:
}
break;
case 'C': // activate callback based trigger
{
const char* query_name = n_recvd[i] + 2;
const char* action_name = query_name;
while(*action_name++);
if(auto q = get_procedure(cxt, query_name),
a = get_procedure(cxt, action_name);
q.name == nullptr || a.name == nullptr
)
printf("Warning: Invalid query or action name: %s %s",
query_name, action_name);
else{
auto query = AQ_DupObject(&q);
auto action = AQ_DupObject(&a);
cxt->ct_host->execute_trigger(query, action);
}
}
break;
case 'R': // remove trigger
{
@ -656,14 +677,15 @@ extern "C" int __DLLEXPORT__ main(int argc, char** argv) {
#endif
// puts("running");
Context* cxt = new Context();
_g_cxt = cxt;
cxt->aquery_root_path = to_lpstr(std::filesystem::current_path().string());
// cxt->log("%d %s\n", argc, argv[1]);
#ifdef THREADING
auto tp = new ThreadPool();
cxt->thread_pool = tp;
cxt->it_host = new IntervalBasedTriggerHost(tp);
cxt->ct_host = new CallbackBasedTriggerHost(tp);
cxt->it_host = new IntervalBasedTriggerHost(tp, cxt);
cxt->ct_host = new CallbackBasedTriggerHost(tp, cxt);
#endif
const char* shmname;

@ -154,11 +154,14 @@ bool ThreadPool::busy(){
return true;
}
IntervalBasedTriggerHost::IntervalBasedTriggerHost(ThreadPool* tp){
IntervalBasedTriggerHost::IntervalBasedTriggerHost(ThreadPool* tp, Context* cxt){
this->cxt = cxt;
this->tp = tp;
this->triggers = new aq_map<std::string, IntervalBasedTrigger>;
trigger_queue_lock = new mutex();
this->now = std::chrono::high_resolution_clock::now().time_since_epoch().count();
this->running = true;
this->handle = new thread(&IntervalBasedTriggerHost::tick, this);
}
void IntervalBasedTriggerHost::add_trigger(const char* name, StoredProcedure *p, uint32_t interval) {
@ -171,21 +174,28 @@ void IntervalBasedTriggerHost::add_trigger(const char* name, StoredProcedure *p,
}
void IntervalBasedTriggerHost::tick() {
const auto current_time = std::chrono::high_resolution_clock::now().time_since_epoch().count();
const auto delta_t = static_cast<uint32_t>((current_time - now) / 1000000); // miliseconds precision
auto current_time = std::chrono::high_resolution_clock::now().time_since_epoch().count();
auto delta_t = static_cast<uint32_t>((current_time - now) / 1000000); // miliseconds precision
now = current_time;
auto vt_triggers = static_cast<aq_map<std::string, IntervalBasedTrigger> *>(this->triggers);
trigger_queue_lock->lock();
for(auto& [_, t] : vt_triggers->values()) {
if(t.tick(delta_t)) {
payload_t payload;
payload.f = execTriggerPayload;
payload.args = static_cast<void*>(new StoredProcedurePayload {t.sp, cxt});
tp->enqueue_task(payload);
fflush(stdout);
while(this->running) {
std::this_thread::sleep_for(50ms);
current_time = std::chrono::high_resolution_clock::now().time_since_epoch().count();
delta_t = static_cast<uint32_t>((current_time - now) / 1000000); // miliseconds precision
now = current_time;
trigger_queue_lock->lock();
for(auto& [_, t] : vt_triggers->values()) {
if(t.tick(delta_t)) {
payload_t payload;
payload.f = execTriggerPayload;
payload.args = static_cast<void*>(new StoredProcedurePayload {t.sp, cxt});
tp->enqueue_task(payload);
}
}
trigger_queue_lock->unlock();
}
trigger_queue_lock->unlock();
}
void IntervalBasedTriggerHost::remove_trigger(const char* name) {
@ -199,6 +209,7 @@ void IntervalBasedTrigger::reset() {
bool IntervalBasedTrigger::tick(uint32_t delta_t) {
bool ret = false;
// printf("%d %d\n",delta_t, time_remaining);
if (time_remaining <= delta_t)
ret = true;
if (auto curr_dt = delta_t % interval; time_remaining <= curr_dt)
@ -208,8 +219,19 @@ bool IntervalBasedTrigger::tick(uint32_t delta_t) {
return ret;
}
CallbackBasedTriggerHost::CallbackBasedTriggerHost(ThreadPool *tp) {
CallbackBasedTriggerHost::CallbackBasedTriggerHost(ThreadPool *tp, Context *cxt) {
this->tp = tp;
this->cxt = cxt;
}
void CallbackBasedTriggerHost::execute_trigger(StoredProcedure* query, StoredProcedure* action) {
payload_t payload;
payload.f = execTriggerPayloadCond;
payload.args = static_cast<void*>(
new StoredProcedurePayloadCond {query, action, cxt}
);
this->tp->enqueue_task(payload);
}
void CallbackBasedTriggerHost::tick() {}
void CallbackBasedTriggerHost::tick() {}

@ -71,18 +71,19 @@ struct IntervalBasedTrigger : Trigger {
class IntervalBasedTriggerHost : public TriggerHost {
public:
explicit IntervalBasedTriggerHost(ThreadPool *tp);
explicit IntervalBasedTriggerHost(ThreadPool *tp, Context* cxt);
void add_trigger(const char* name, StoredProcedure* stored_procedure, uint32_t interval);
void remove_trigger(const char* name);
private:
unsigned long long now;
bool running;
void tick() override;
};
class CallbackBasedTriggerHost : public TriggerHost {
public:
explicit CallbackBasedTriggerHost(ThreadPool *tp);
void add_trigger();
explicit CallbackBasedTriggerHost(ThreadPool *tp, Context *cxt);
void execute_trigger(StoredProcedure* query, StoredProcedure* action);
private:
void tick() override;
};

@ -11,7 +11,7 @@ constexpr static bool cpp_17 = false;
#endif
template <class T>
inline const char* str(const T& v) {
inline const char* str([[maybe_unused]] const T& v) {
return "";
}

@ -0,0 +1,8 @@
procedure one run
procedure two run
create table t(a int);
exec
create trigger a on t action one when two;
exec
insert into t(a) values (1);
Loading…
Cancel
Save