From 05cca378e0eb032fbbf1d890d321e4b2cee71aa7 Mon Sep 17 00:00:00 2001 From: Bill Date: Mon, 27 Feb 2023 09:38:02 +0800 Subject: [PATCH] trigger demo --- Makefile | 2 +- demo/Makefile | 9 +++ demo/action.cpp | 30 ++++++++ demo/democa.aqp | Bin 0 -> 18 bytes demo/democq.aqp | Bin 0 -> 17 bytes demo/demoi.aqp | Bin 0 -> 14 bytes demo/prep.a | 13 ++++ demo/putdata.cpp | 47 ++++++++++++ demo/query.cpp | 30 ++++++++ demo/test.a | 3 + proctool.py | 1 + reconstruct/ast.py | 7 +- rfdata_preproc.py | 2 +- sdk/Evaluation.cpp | 2 +- sdk/Makefile | 5 +- sdk/aquery.h | 23 ++++-- sdk/incrementalDecisionTree.cpp | 13 +++- sdk/irf.cpp | 35 +++++---- server/libaquery.cpp | 19 +++++ server/libaquery.h | 1 + server/monetdb_conn.cpp | 84 +++++++++++++++++++++ server/monetdb_ext.c | 2 +- server/server.cpp | 125 ++++---------------------------- server/threading.cpp | 32 ++++++++ server/threading.h | 18 +++++ server/vector_type.hpp | 18 ++++- tests/rf.a | 16 ++++ 27 files changed, 396 insertions(+), 141 deletions(-) create mode 100644 demo/Makefile create mode 100644 demo/action.cpp create mode 100644 demo/democa.aqp create mode 100644 demo/democq.aqp create mode 100644 demo/demoi.aqp create mode 100644 demo/prep.a create mode 100644 demo/putdata.cpp create mode 100644 demo/query.cpp create mode 100644 demo/test.a create mode 100644 tests/rf.a diff --git a/Makefile b/Makefile index 98987fa..051493a 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ Defines = CC = $(CXX) -xc CXXFLAGS = --std=c++2a ifeq ($(AQ_DEBUG), 1) - OPTFLAGS = -g3 #-fsanitize=address + OPTFLAGS = -g3 #-static-libasan -fsanitize=address LINKFLAGS = else OPTFLAGS = -Ofast -DNDEBUG -fno-stack-protector diff --git a/demo/Makefile b/demo/Makefile new file mode 100644 index 0000000..e077b6f --- /dev/null +++ b/demo/Makefile @@ -0,0 +1,9 @@ +all: + $(CXX) -include ../server/pch.hpp putdata.cpp ../libaquery.a -shared -fPIC --std=c++2a -Ofast -DNDEBUG -fno-stack-protector -march=native -DTHREADING -D__AQUERY_ITC_USE_SEMPH__ -I/usr/local/opt/monetdb/include/monetdb -flto -s -fno-semantic-interposition -L/usr/local/opt/monetdb/lib -lmonetdbe -lmonetdbsql -lbat -o ../procedures/demoi0.so + $(CXX) -include ../server/pch.hpp action.cpp ../libaquery.a -shared -fPIC --std=c++2a -Ofast -DNDEBUG -fno-stack-protector -march=native -DTHREADING -D__AQUERY_ITC_USE_SEMPH__ -I/usr/local/opt/monetdb/include/monetdb -flto -s -fno-semantic-interposition -L/usr/local/opt/monetdb/lib -lmonetdbe -lmonetdbsql -lbat -o ../procedures/democa0.so + $(CXX) -include ../server/pch.hpp query.cpp ../libaquery.a -shared -fPIC --std=c++2a -Ofast -DNDEBUG -fno-stack-protector -march=native -DTHREADING -D__AQUERY_ITC_USE_SEMPH__ -I/usr/local/opt/monetdb/include/monetdb -flto -s -fno-semantic-interposition -L/usr/local/opt/monetdb/lib -lmonetdbe -lmonetdbsql -lbat -o ../procedures/democq0.so + +dbg: + $(CXX) -include ../server/pch.hpp putdata.cpp -g3 -march=native ../libaquery.a -shared -fPIC --std=c++2a -D_DEBUG -DTHREADING -D__AQUERY_ITC_USE_SEMPH__ -I/usr/local/opt/monetdb/include/monetdb -L/usr/local/opt/monetdb/lib -lmonetdbe -lmonetdbsql -lbat -o ../procedures/demoi0.so + $(CXX) -include ../server/pch.hpp action.cpp -g3 -march=native ../libaquery.a -shared -fPIC --std=c++2a -D_DEBUG -DTHREADING -D__AQUERY_ITC_USE_SEMPH__ -I/usr/local/opt/monetdb/include/monetdb -L/usr/local/opt/monetdb/lib -lmonetdbe -lmonetdbsql -lbat -o ../procedures/democa0.so + $(CXX) -include ../server/pch.hpp query.cpp -g3 -march=native ../libaquery.a -shared -fPIC --std=c++2a -D_DEBUG -DTHREADING -D__AQUERY_ITC_USE_SEMPH__ -I/usr/local/opt/monetdb/include/monetdb -L/usr/local/opt/monetdb/lib -lmonetdbe -lmonetdbsql -lbat -o ../procedures/democq0.so \ No newline at end of file diff --git a/demo/action.cpp b/demo/action.cpp new file mode 100644 index 0000000..a45db2b --- /dev/null +++ b/demo/action.cpp @@ -0,0 +1,30 @@ +#include "../server/libaquery.h" + +#ifndef __AQ_USE_THREADEDGC__ + +#include "../server/gc.h" +__AQEXPORT__(void) __AQ_Init_GC__(Context* cxt) { + GC::gc_handle = static_cast(cxt->gc); + GC::scratch_space = nullptr; +} + +#else // __AQ_USE_THREADEDGC__ +#define __AQ_Init_GC__(x) +#endif // __AQ_USE_THREADEDGC__ +bool (*fit_inc)(vector_type> X, vector_type y) = nullptr; + +#include "../server/monetdb_conn.h" +__AQEXPORT__(int) action(Context* cxt) { + using namespace std; + using namespace types; + if (fit_inc == nullptr) + fit_inc = (decltype(fit_inc))(cxt->get_module_function("fit_inc")); + + auto server = static_cast(cxt->alt_server); + auto len = uint32_t(monetdbe_get_size(*((void**)server->server), "source")); + auto x_1bN = ColRef>(len, monetdbe_get_col(*((void**)(server->server)), "source", 0)); + auto y_6uX = ColRef(len, monetdbe_get_col(*((void**)(server->server)), "source", 1)); + fit_inc(x_1bN, y_6uX); + puts("action done."); + return 0; +} diff --git a/demo/democa.aqp b/demo/democa.aqp new file mode 100644 index 0000000000000000000000000000000000000000..28c0bb66e8be05ecde060035d27c3283f3788c68 GIT binary patch literal 18 XcmZQ#U|?VbVn2p}#N?99{5%E#5ZVJE literal 0 HcmV?d00001 diff --git a/demo/democq.aqp b/demo/democq.aqp new file mode 100644 index 0000000000000000000000000000000000000000..5ad0955e0e4ffc382f83d91f2384144501f7f2be GIT binary patch literal 17 WcmZQ#U|?VbVn2p}!qU{DN(KN8`2yGg literal 0 HcmV?d00001 diff --git a/demo/demoi.aqp b/demo/demoi.aqp new file mode 100644 index 0000000000000000000000000000000000000000..c36cf185e291d91d9fe99bb8a15ada4d9e6ae48a GIT binary patch literal 14 TcmZQ#U|?VbVn2p}oD>EC1yliY literal 0 HcmV?d00001 diff --git a/demo/prep.a b/demo/prep.a new file mode 100644 index 0000000..ccaf7f6 --- /dev/null +++ b/demo/prep.a @@ -0,0 +1,13 @@ +create table source(x vecdouble, y int64); +LOAD MODULE FROM "./libirf.so" FUNCTIONS ( + newtree(height:int, f:int64, sparse:vecint, forget:double, maxf:int64, noclasses:int64, e:int) -> bool, + fit_inc(X:vecvecdouble, y:vecint64) -> bool, + predict(X:vecvecdouble) -> vecint +); + +create table elec_sparse(v int); + +insert into elec_sparse values (0), (1), (1), (1), (1), (1), (1); + +select newtree(30, 7, elec_sparse.v, 0, 4, 2, 1) from elec_sparse + diff --git a/demo/putdata.cpp b/demo/putdata.cpp new file mode 100644 index 0000000..27403f0 --- /dev/null +++ b/demo/putdata.cpp @@ -0,0 +1,47 @@ +#include "../server/libaquery.h" + +#ifndef __AQ_USE_THREADEDGC__ + +#include "../server/gc.h" +__AQEXPORT__(void) __AQ_Init_GC__(Context* cxt) { + GC::gc_handle = static_cast(cxt->gc); + GC::scratch_space = nullptr; +} + +#else // __AQ_USE_THREADEDGC__ +#define __AQ_Init_GC__(x) +#endif // __AQ_USE_THREADEDGC__ +#include "../server/monetdb_conn.h" +#include "../csv.h" + +__AQEXPORT__(int) ld(Context* cxt) { + using namespace std; + using namespace types; + static int cnt = 0; + if (cnt > 700) + return 1; + else + ++cnt; + char data_name[] = "data/electricity/electricity "; + auto server = static_cast(cxt->alt_server); + const char* names_fZrv[] = {"x", "y"}; + auto tbl_6erF = new TableInfo,int64_t>("source", names_fZrv); + decltype(auto) c_31ju0e = tbl_6erF->get_col<0>(); + decltype(auto) c_4VlzrR = tbl_6erF->get_col<1>(); + c_31ju0e.init("x"); + c_4VlzrR.init("y"); + auto nxt = to_text(data_name + 28, cnt); + memcpy(nxt, ".csv", 5); + puts(data_name); + AQCSVReader<2, ',', ';'> csv_reader_7g0GY7(data_name); + csv_reader_7g0GY7.next_line(); + vector_type tmp_5XMNcBz5; + int64_t tmp_5dAHIJ1d; + while(csv_reader_7g0GY7.read_row(tmp_5XMNcBz5,tmp_5dAHIJ1d)) { + c_31ju0e.emplace_back(tmp_5XMNcBz5); + c_4VlzrR.emplace_back(tmp_5dAHIJ1d); + } + tbl_6erF->monetdb_append_table(cxt->alt_server, "source"); + return 0; +} + diff --git a/demo/query.cpp b/demo/query.cpp new file mode 100644 index 0000000..bc91ba5 --- /dev/null +++ b/demo/query.cpp @@ -0,0 +1,30 @@ +#include "../server/libaquery.h" + +#ifndef __AQ_USE_THREADEDGC__ + +#include "../server/gc.h" +__AQEXPORT__(void) __AQ_Init_GC__(Context* cxt) { + GC::gc_handle = static_cast(cxt->gc); + GC::scratch_space = nullptr; +} + +#else // __AQ_USE_THREADEDGC__ +#define __AQ_Init_GC__(x) +#endif // __AQ_USE_THREADEDGC__ + +#include "../server/monetdb_conn.h" +__AQEXPORT__(int) query(Context* cxt) { + using namespace std; + using namespace types; + auto server = static_cast(cxt->alt_server); + static uint32_t old_sz = 0; + constexpr static uint32_t min_delta = 200; + auto newsz = monetdbe_get_size(*(void**) server->server, "source"); + if (newsz > old_sz + min_delta) { + puts("query true."); + old_sz = uint32_t(newsz); + return 1; + } + puts("query false."); + return 0; +} diff --git a/demo/test.a b/demo/test.a new file mode 100644 index 0000000..7301977 --- /dev/null +++ b/demo/test.a @@ -0,0 +1,3 @@ +create table test(x vecdouble, y int64); +load complex data infile "data/electricity/electricity872.csv" into table test fields terminated by ',' element terminated by ';'; +select predict(x) from test diff --git a/proctool.py b/proctool.py index 4757a85..b9e2f1b 100644 --- a/proctool.py +++ b/proctool.py @@ -24,6 +24,7 @@ def write(): fp.write(q.encode('utf-8')) if q.startswith('Q'): fp.write(b'\n ') + fp.write(b'\x00') diff --git a/reconstruct/ast.py b/reconstruct/ast.py index 97407c2..7f291ef 100644 --- a/reconstruct/ast.py +++ b/reconstruct/ast.py @@ -1118,6 +1118,11 @@ class create_trigger(ast_node): if self.procedure and self.table_name in self.context.tables_byname: self.table = self.context.tables_byname[self.table_name] self.table.triggers.add(self) + send_to_server( + f'TC{self.trigger_name}\0{self.table_name}\0' + f'{self.query_name}\0{self.action_name}' + ) + else: return self.context.triggers[self.trigger_name] = self @@ -1131,7 +1136,7 @@ class create_trigger(ast_node): def execute(self): from engine.utils import send_to_server - send_to_server(f'TC{self.query_name}\0{self.action_name}') + send_to_server(f'TA{self.query_name}\0{self.action_name}') def remove(self): from engine.utils import send_to_server diff --git a/rfdata_preproc.py b/rfdata_preproc.py index 6a7b760..ea30dbb 100644 --- a/rfdata_preproc.py +++ b/rfdata_preproc.py @@ -3,7 +3,7 @@ import os sep = os.sep # toggles -dataset = 'power' # [covtype, electricity, mixed, phishing, power] +dataset = 'electricity' # [covtype, electricity, mixed, phishing, power] use_threadpool = False # True # environments diff --git a/sdk/Evaluation.cpp b/sdk/Evaluation.cpp index cf6f5d8..f4b0d76 100644 --- a/sdk/Evaluation.cpp +++ b/sdk/Evaluation.cpp @@ -5,7 +5,7 @@ struct minEval{ double value; - int* values; + int* values = nullptr; double eval; long left; // how many on its left diff --git a/sdk/Makefile b/sdk/Makefile index c2178cf..652c39b 100644 --- a/sdk/Makefile +++ b/sdk/Makefile @@ -1,11 +1,12 @@ OPT_FLASG = -ifneq ($(DEBUG), 1) +ifneq ($(AQ_DEBUG), 1) OPT_FLAGS = -Ofast -march=native -flto -DNDEBUG else - OPT_FLAGS = -g3 -D_DEBUG -fsanitize=leak -fsanitize=address + OPT_FLAGS = -shared-libasan -g3 -D_DEBUG #-fsanitize=address endif example: $(CXX) -shared -fPIC example.cpp aquery_mem.cpp -fno-semantic-interposition -Ofast -march=native -flto --std=c++1z -L.. -laquery -o ../test.so irf: + rm ../libirf.so ; \ $(CXX) -shared -fPIC RF.cpp irf.cpp incrementalDecisionTree.cpp aquery_mem.cpp Evaluation.cpp -fno-semantic-interposition $(OPT_FLAGS) --std=c++1z -o ../libirf.so all: example diff --git a/sdk/aquery.h b/sdk/aquery.h index fc3f09e..787eb64 100644 --- a/sdk/aquery.h +++ b/sdk/aquery.h @@ -28,20 +28,33 @@ struct Session{ void* memory_map; }; -struct Context{ +struct Trigger; +struct IntervalBasedTriggerHost; +struct CallbackBasedTriggerHost; + +struct Context { typedef int (*printf_type) (const char *format, ...); - void* module_function_maps = 0; + + void* module_function_maps = nullptr; Config* cfg; int n_buffers, *sz_bufs; void **buffers; - void* alt_server; + void* alt_server = nullptr; Log_level log_level = LOG_INFO; Session current; - - + const char* aquery_root_path; +#ifdef THREADING + void* thread_pool; +#endif +#ifndef __AQ_USE_THREADEDGC__ + void* gc; +#endif + printf_type print; + Context(); + virtual ~Context(); template void log(Types... args) { if (log_level == LOG_INFO) diff --git a/sdk/incrementalDecisionTree.cpp b/sdk/incrementalDecisionTree.cpp index d21d408..1818e6b 100644 --- a/sdk/incrementalDecisionTree.cpp +++ b/sdk/incrementalDecisionTree.cpp @@ -14,7 +14,7 @@ std::mt19937 g(rd()); struct minEval{ double value; - int* values; + int* values = nullptr; double eval; long left; // how many on its left @@ -825,7 +825,10 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT* for(i=low;iresult = std::distance(t, std::max_element(t, t+classes)); free(index); free(current->dataRecord); @@ -989,13 +992,17 @@ void DecisionTree::Update(double** data, long* result, long size, DT* current){ } if(c.eval(cxt->gc); GC::scratch_space = nullptr; @@ -16,18 +19,18 @@ DecisionTree *dt = nullptr; RandomForest *rf = nullptr; __AQEXPORT__(bool) -newtree(int height, long f, ColRef X, double forget, long maxf, long noclasses, Evaluation e, long r, long rb) +newtree(int ntree, long f, ColRef sparse, double forget, long maxf, long nclasses, Evaluation e) { - if (X.size != f) + if (sparse.size != f) return false; int *X_cpy = (int *)malloc(f * sizeof(int)); - memcpy(X_cpy, X.container, f); + memcpy(X_cpy, sparse.container, f); if (maxf < 0) maxf = f; - dt = new DecisionTree(f, X_cpy, forget, maxf, noclasses, e); - rf = new RandomForest(height, f, X_cpy, forget, noclasses, e); + // dt = new DecisionTree(f, X_cpy, forget, maxf, noclasses, e); + rf = new RandomForest(ntree, f, X_cpy, forget, nclasses, e, true); return true; } @@ -53,14 +56,20 @@ __AQEXPORT__(bool) fit_inc(vector_type> v, vector_type res) { static uint32_t last_offset = 0; - double **data = (double **)malloc(v.size * sizeof(double *)); - if(last_offset >= v.size) + if(last_offset > v.size) last_offset = 0; - for (int i = last_offset; i < v.size; ++i) - data[i] = v.container[i].container; - rf->fit(data, res.container, v.size); - free(data); - return true; + const auto curr_size = (v.size - last_offset); + if(curr_size > 0) { + double **data = (double **)malloc( curr_size * sizeof(double *)); + for (uint32_t i = last_offset; i < v.size; ++i) + data[i - last_offset] = v.container[i].container; + rf->fit(data, res.container + last_offset, curr_size); + + last_offset = v.size; + free(data); + return true; + } + return false; } diff --git a/server/libaquery.cpp b/server/libaquery.cpp index f2bb47c..7f4596c 100644 --- a/server/libaquery.cpp +++ b/server/libaquery.cpp @@ -620,3 +620,22 @@ vector_type::vector_type(const uint32_t size, void* data) : } //std::cout<ct_host->execute_trigger(query, action); + } +} diff --git a/server/libaquery.h b/server/libaquery.h index 62fd4b1..6f43005 100644 --- a/server/libaquery.h +++ b/server/libaquery.h @@ -291,5 +291,6 @@ inline _This_Type* AQ_DupObject(_This_Type* __val) { #endif //__USE_STD_SEMAPHORE__ void print_monetdb_results(void* _srv, const char* sep, const char* end, uint32_t limit); +void activate_callback_based_trigger(Context* context, const char* cmd); #endif diff --git a/server/monetdb_conn.cpp b/server/monetdb_conn.cpp index b90e88a..eacdaa7 100644 --- a/server/monetdb_conn.cpp +++ b/server/monetdb_conn.cpp @@ -277,6 +277,84 @@ bool Server::havehge() { #endif } +using prt_fn_t = char* (*)(void*, char*); +constexpr prt_fn_t monetdbe_prtfns[] = { + aq_to_chars, aq_to_chars, aq_to_chars, aq_to_chars, + aq_to_chars, +#if __SIZEOF_INT128__ + aq_to_chars<__int128_t>, +#endif + aq_to_chars, aq_to_chars, aq_to_chars, + aq_to_chars, aq_to_chars, + aq_to_chars, aq_to_chars, aq_to_chars, + + // should be last: + aq_to_chars +}; + + +constexpr uint32_t output_buffer_size = 65536; +void print_monetdb_results(void* _srv, const char* sep = " ", const char* end = "\n", + uint32_t limit = std::numeric_limits::max()) { + auto srv = static_cast(_srv); + if (!srv->haserror() && srv->cnt && limit) { + char buffer[output_buffer_size]; + auto _res = static_cast (srv->res); + const auto ncols = _res->ncols; + monetdbe_column** cols = static_cast(malloc(sizeof(monetdbe_column*) * ncols)); + prt_fn_t *prtfns = (prt_fn_t*) alloca(sizeof(prt_fn_t) * ncols); + char** col_data = static_cast (alloca(sizeof(char*) * ncols)); + uint8_t* szs = static_cast(alloca(ncols)); + std::string header_string = ""; + const char* err_msg = nullptr; + const size_t l_sep = strlen(sep); + const size_t l_end = strlen(end); + char* _buffer = buffer; + const auto cnt = srv->cnt < limit? srv->cnt : limit; + + for(uint32_t i = 0; i < ncols; ++i){ + err_msg = monetdbe_result_fetch(_res, &cols[i], i); + if(err_msg) { goto cleanup; } + col_data[i] = static_cast(cols[i]->data); + prtfns[i] = monetdbe_prtfns[cols[i]->type]; + szs [i] = monetdbe_type_szs[cols[i]->type]; + header_string = header_string + cols[i]->name + sep + '|' + sep; + } + + if(l_sep > 512 || l_end > 512) { + puts("Error: separator or end string too long"); + goto cleanup; + } + if (header_string.size() >= l_sep + 1) + header_string.resize(header_string.size() - l_sep - 1); + header_string += end + std::string(header_string.size(), '=') + end; + fputs(header_string.c_str(), stdout); + for(uint64_t i = 0; i < cnt; ++i){ + for(uint32_t j = 0; j < ncols; ++j){ + //copy the field to buf + _buffer = prtfns[j](col_data[j], _buffer); + if (j != ncols - 1){ + memcpy(_buffer, sep, l_sep); + _buffer += l_sep; + } + col_data[j] += szs[j]; + } + memcpy(_buffer, end, l_end); + _buffer += l_end; + if(output_buffer_size - (_buffer - buffer) <= 1024){ + fwrite(buffer, 1, _buffer - buffer, stdout); + _buffer = buffer; + } + } + memcpy(_buffer, end, l_end); + _buffer += l_end; + if (_buffer != buffer) + fwrite(buffer, 1, _buffer - buffer, stdout); +cleanup: + free(cols); + } +} + int ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){ auto server = static_cast(cxt->alt_server); @@ -302,6 +380,12 @@ int ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){ handle = p->__rt_loaded_modules[procedure_module_cursor++]; } break; + case 'T' : { + if (p->queries[i][1] == 'N') { + cxt->ct_host->execute_trigger(p->queries[i] + 2, cxt); + } + } + break; case 'O': { uint32_t limit; memcpy(&limit, p->queries[i] + 1, sizeof(uint32_t)); diff --git a/server/monetdb_ext.c b/server/monetdb_ext.c index 06960c1..9f6ce9b 100644 --- a/server/monetdb_ext.c +++ b/server/monetdb_ext.c @@ -86,7 +86,7 @@ monetdbe_get_col(monetdbe_database dbhdl, const char *table_name, uint32_t col_i mvc *m = be->mvc; //mvc_trans(m); sql_table *t = find_table_or_view_on_scope(m, NULL, "sys", table_name, "CATALOG", false); - if (!t) return 0; + if (!t) return NULL; sql_column *col = ol_fetch(t->columns, col_id); sqlstore* store = m->store; BAT *b = store->storage_api.bind_col(m->session->tr, col, QUICK); diff --git a/server/server.cpp b/server/server.cpp index 62da014..2439b21 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -94,7 +94,9 @@ have_hge() { return false; #endif } + Context* _g_cxt; + StoredProcedure get_procedure(Context* cxt, const char* name) { auto res = cxt->stored_proc.find(name); @@ -107,11 +109,11 @@ get_procedure(Context* cxt, const char* name) { }; return res->second; } + __AQEXPORT__(StoredProcedure) get_procedure_ex(const char* name){ return get_procedure(_g_cxt, name); } -using prt_fn_t = char* (*)(void*, char*); // This function contains heap allocations, free after use template @@ -128,19 +130,6 @@ char* copy_lpstr(const char* str){ return ret; } -constexpr prt_fn_t monetdbe_prtfns[] = { - aq_to_chars, aq_to_chars, aq_to_chars, aq_to_chars, - aq_to_chars, -#if __SIZEOF_INT128__ - aq_to_chars<__int128_t>, -#endif - aq_to_chars, aq_to_chars, aq_to_chars, - aq_to_chars, aq_to_chars, - aq_to_chars, aq_to_chars, aq_to_chars, - - // should be last: - aq_to_chars -}; #ifndef __AQ_USE_THREADEDGC__ void aq_init_gc(void *handle, Context* cxt) @@ -157,87 +146,6 @@ void aq_init_gc(void *handle, Context* cxt) #define aq_init_gc(h, c) #endif //__AQ_USE_THREADEDGC__ -#include "monetdbe.h" -#undef max -#undef min -inline constexpr static unsigned char monetdbe_type_szs[] = { - sizeof(monetdbe_column_bool::null_value), sizeof(monetdbe_column_int8_t::null_value), - sizeof(monetdbe_column_int16_t::null_value), sizeof(monetdbe_column_int32_t::null_value), - sizeof(monetdbe_column_int64_t::null_value), -#ifdef __SIZEOF_INT128__ - sizeof(monetdbe_column_int128_t::null_value), -#endif - sizeof(monetdbe_column_size_t::null_value), sizeof(monetdbe_column_float::null_value), - sizeof(monetdbe_column_double::null_value), - sizeof(monetdbe_column_str::null_value), sizeof(monetdbe_column_blob::null_value), - sizeof(monetdbe_data_date), sizeof(monetdbe_data_time), sizeof(monetdbe_data_timestamp), - - // should be last: - 1 -}; -constexpr uint32_t output_buffer_size = 65536; -void print_monetdb_results(void* _srv, const char* sep = " ", const char* end = "\n", - uint32_t limit = std::numeric_limits::max()) { - auto srv = static_cast(_srv); - if (!srv->haserror() && srv->cnt && limit) { - char buffer[output_buffer_size]; - auto _res = static_cast (srv->res); - const auto ncols = _res->ncols; - monetdbe_column** cols = static_cast(malloc(sizeof(monetdbe_column*) * ncols)); - prt_fn_t *prtfns = (prt_fn_t*) alloca(sizeof(prt_fn_t) * ncols); - char** col_data = static_cast (alloca(sizeof(char*) * ncols)); - uint8_t* szs = static_cast(alloca(ncols)); - std::string header_string = ""; - const char* err_msg = nullptr; - const size_t l_sep = strlen(sep); - const size_t l_end = strlen(end); - char* _buffer = buffer; - const auto cnt = srv->cnt < limit? srv->cnt : limit; - - for(uint32_t i = 0; i < ncols; ++i){ - err_msg = monetdbe_result_fetch(_res, &cols[i], i); - if(err_msg) { goto cleanup; } - col_data[i] = static_cast(cols[i]->data); - prtfns[i] = monetdbe_prtfns[cols[i]->type]; - szs [i] = monetdbe_type_szs[cols[i]->type]; - header_string = header_string + cols[i]->name + sep + '|' + sep; - } - - if(l_sep > 512 || l_end > 512) { - puts("Error: separator or end string too long"); - goto cleanup; - } - if (header_string.size() >= l_sep + 1) - header_string.resize(header_string.size() - l_sep - 1); - header_string += end + std::string(header_string.size(), '=') + end; - fputs(header_string.c_str(), stdout); - for(uint64_t i = 0; i < cnt; ++i){ - for(uint32_t j = 0; j < ncols; ++j){ - //copy the field to buf - _buffer = prtfns[j](col_data[j], _buffer); - if (j != ncols - 1){ - memcpy(_buffer, sep, l_sep); - _buffer += l_sep; - } - col_data[j] += szs[j]; - } - memcpy(_buffer, end, l_end); - _buffer += l_end; - if(output_buffer_size - (_buffer - buffer) <= 1024){ - fwrite(buffer, 1, _buffer - buffer, stdout); - _buffer = buffer; - } - } - memcpy(_buffer, end, l_end); - _buffer += l_end; - if (_buffer != buffer) - fwrite(buffer, 1, _buffer - buffer, stdout); -cleanup: - free(cols); - } -} - - void initialize_module(const char* module_name, void* module_handle, Context* cxt){ auto _init_module = reinterpret_cast(dlsym(module_handle, "init_session")); if (_init_module) { @@ -581,25 +489,24 @@ start: } } break; - case 'C': // activate callback based trigger + case 'C' : //register callback based trigger { - const char* query_name = n_recvd[i] + 2; + const char* trigger_name = n_recvd[i] + 2; + const char* table_name = trigger_name; + while(*table_name++); + const char* query_name = table_name; + while(*query_name++); const char* action_name = query_name; while(*action_name++); - if(auto q = get_procedure(cxt, query_name), - a = get_procedure(cxt, action_name); - q.name == nullptr || a.name == nullptr - ) - printf("Warning: Invalid query or action name: %s %s", - query_name, action_name); - else{ - auto query = AQ_DupObject(&q); - auto action = AQ_DupObject(&a); - - cxt->ct_host->execute_trigger(query, action); - } + cxt->ct_host->add_trigger(trigger_name, table_name, query_name, action_name); } break; + case 'A': // activate callback based trigger + activate_callback_based_trigger(cxt, n_recvd[i]); + break; + case 'N': + cxt->ct_host->execute_trigger(n_recvd[i] + 2); + break; case 'R': // remove trigger { cxt->it_host->remove_trigger(n_recvd[i] + 2); diff --git a/server/threading.cpp b/server/threading.cpp index 121b530..81f1436 100644 --- a/server/threading.cpp +++ b/server/threading.cpp @@ -222,6 +222,7 @@ bool IntervalBasedTrigger::tick(uint32_t delta_t) { CallbackBasedTriggerHost::CallbackBasedTriggerHost(ThreadPool *tp, Context *cxt) { this->tp = tp; this->cxt = cxt; + this->triggers = new aq_map; } void CallbackBasedTriggerHost::execute_trigger(StoredProcedure* query, StoredProcedure* action) { @@ -234,4 +235,35 @@ void CallbackBasedTriggerHost::execute_trigger(StoredProcedure* query, StoredPro this->tp->enqueue_task(payload); } +void execute_trigger(const char* trigger_name) { + auto vt_triggers = static_cast *>(this->triggers); + auto ptr = vt_triggers->find(trigger_name); + if (ptr != vt_triggers->end()) { + auto& tr = ptr->second; + if (!tr.materialized) { + tr.query = new CallbackBasedTrigger(get_procedure(cxt, tr.query_name)); + tr.action = new CallbackBasedTrigger(get_procedure(cxt, tr.action_name)); + tr.materialized = true; + } + this->execute_trigger(AQ_DupObject(tr.query), AQ_DupObject(tr.action)); + } +} + +void CallbackBasedTriggerHost::add_trigger( + const char* trigger_name, + const char* table_name, + const char* query_name, + const char* action_name +) { + auto vt_triggers = static_cast *>(this->triggers); + auto tr = CallbackBasedTrigger { + .trigger_name = trigger_name, + .table_name = table_name, + .query_name = query_name, + .action_name = action_name, + .materialized = false + }; + vt_triggers->emplace(trigger_name, tr); +} + void CallbackBasedTriggerHost::tick() {} diff --git a/server/threading.h b/server/threading.h index 01d02d7..5211a5b 100644 --- a/server/threading.h +++ b/server/threading.h @@ -80,10 +80,28 @@ private: void tick() override; }; +struct CallbackBasedTrigger : Trigger { + const char* trigger_name; + const char* table_name; + union { + StoredProcedure* query; + const char* query_name; + }; + union { + StoredProcedure* action; + const char* action_name; + }; + bool materialized; +}; + class CallbackBasedTriggerHost : public TriggerHost { public: explicit CallbackBasedTriggerHost(ThreadPool *tp, Context *cxt); void execute_trigger(StoredProcedure* query, StoredProcedure* action); + void execute_trigger(const char* trigger_name); + void add_trigger(const char* trigger_name, const char* table_name, + const char* query_name, const char* action_name); + private: void tick() override; }; diff --git a/server/vector_type.hpp b/server/vector_type.hpp index 7722a64..9775b2a 100644 --- a/server/vector_type.hpp +++ b/server/vector_type.hpp @@ -172,15 +172,25 @@ public: inline void grow(uint32_t sz = 0) { if constexpr (_grow) sz = this->size; - if (sz >= capacity) { // geometric growth + if (sz >= capacity) { // geometric growth + bool reallocate = true; + if (capacity == 0 && sz > 0) + [[unlikely]] + reallocate = false; uint32_t new_capacity; if constexpr (_grow) new_capacity = size + 1 + (size >> 1); else new_capacity = sz; - - _Ty* n_container = (_Ty*)realloc(container, new_capacity * sizeof(_Ty)); - // memcpy(n_container, container, sizeof(_Ty) * size); + + _Ty* n_container; + if (reallocate) { + n_container = (_Ty*)realloc(container, new_capacity * sizeof(_Ty)); + } + else { + n_container = (_Ty*)malloc(new_capacity * sizeof(_Ty)); + memcpy(n_container, container, sizeof(_Ty) * size); + } memset(n_container + size, 0, sizeof(_Ty) * (new_capacity - size)); // if (capacity) // free(container); diff --git a/tests/rf.a b/tests/rf.a new file mode 100644 index 0000000..00608ef --- /dev/null +++ b/tests/rf.a @@ -0,0 +1,16 @@ +LOAD MODULE FROM "./libirf.so" FUNCTIONS ( newtree(height:int, f:int64, sparse:vecint, forget:double, maxf:int64, noclasses:int64, e:int) -> bool, fit(X:vecvecdouble, y:vecint64) -> bool, predict(X:vecvecdouble) -> vecint ); + +create table source(x vecdouble, y int64); +-- Create trigger 1 ~~ to predict whenever sz(source > ?) +-- Create trigger 2 ~~ to auto feed ~ +load complex data infile "data/electricity/electricity1.csv" into table source fields terminated by ',' element terminated by ';'; + +create table elec_sparse(v int); +insert into elec_sparse values (0), (1), (1), (1), (1), (1), (1); + +select newtree(30, 7, elec_sparse.v, 0, 4, 2, 1) from elec_sparse + +select fit(x, y) from source + +-- select pack(x1, x2, x3, x4) from source +select predict(x) from source \ No newline at end of file