trigger demo

master
Bill 2 years ago
parent 4333af07f2
commit 05cca378e0

@ -5,7 +5,7 @@ Defines =
CC = $(CXX) -xc
CXXFLAGS = --std=c++2a
ifeq ($(AQ_DEBUG), 1)
OPTFLAGS = -g3 #-fsanitize=address
OPTFLAGS = -g3 #-static-libasan -fsanitize=address
LINKFLAGS =
else
OPTFLAGS = -Ofast -DNDEBUG -fno-stack-protector

@ -0,0 +1,9 @@
all:
$(CXX) -include ../server/pch.hpp putdata.cpp ../libaquery.a -shared -fPIC --std=c++2a -Ofast -DNDEBUG -fno-stack-protector -march=native -DTHREADING -D__AQUERY_ITC_USE_SEMPH__ -I/usr/local/opt/monetdb/include/monetdb -flto -s -fno-semantic-interposition -L/usr/local/opt/monetdb/lib -lmonetdbe -lmonetdbsql -lbat -o ../procedures/demoi0.so
$(CXX) -include ../server/pch.hpp action.cpp ../libaquery.a -shared -fPIC --std=c++2a -Ofast -DNDEBUG -fno-stack-protector -march=native -DTHREADING -D__AQUERY_ITC_USE_SEMPH__ -I/usr/local/opt/monetdb/include/monetdb -flto -s -fno-semantic-interposition -L/usr/local/opt/monetdb/lib -lmonetdbe -lmonetdbsql -lbat -o ../procedures/democa0.so
$(CXX) -include ../server/pch.hpp query.cpp ../libaquery.a -shared -fPIC --std=c++2a -Ofast -DNDEBUG -fno-stack-protector -march=native -DTHREADING -D__AQUERY_ITC_USE_SEMPH__ -I/usr/local/opt/monetdb/include/monetdb -flto -s -fno-semantic-interposition -L/usr/local/opt/monetdb/lib -lmonetdbe -lmonetdbsql -lbat -o ../procedures/democq0.so
dbg:
$(CXX) -include ../server/pch.hpp putdata.cpp -g3 -march=native ../libaquery.a -shared -fPIC --std=c++2a -D_DEBUG -DTHREADING -D__AQUERY_ITC_USE_SEMPH__ -I/usr/local/opt/monetdb/include/monetdb -L/usr/local/opt/monetdb/lib -lmonetdbe -lmonetdbsql -lbat -o ../procedures/demoi0.so
$(CXX) -include ../server/pch.hpp action.cpp -g3 -march=native ../libaquery.a -shared -fPIC --std=c++2a -D_DEBUG -DTHREADING -D__AQUERY_ITC_USE_SEMPH__ -I/usr/local/opt/monetdb/include/monetdb -L/usr/local/opt/monetdb/lib -lmonetdbe -lmonetdbsql -lbat -o ../procedures/democa0.so
$(CXX) -include ../server/pch.hpp query.cpp -g3 -march=native ../libaquery.a -shared -fPIC --std=c++2a -D_DEBUG -DTHREADING -D__AQUERY_ITC_USE_SEMPH__ -I/usr/local/opt/monetdb/include/monetdb -L/usr/local/opt/monetdb/lib -lmonetdbe -lmonetdbsql -lbat -o ../procedures/democq0.so

@ -0,0 +1,30 @@
#include "../server/libaquery.h"
#ifndef __AQ_USE_THREADEDGC__
#include "../server/gc.h"
__AQEXPORT__(void) __AQ_Init_GC__(Context* cxt) {
GC::gc_handle = static_cast<GC*>(cxt->gc);
GC::scratch_space = nullptr;
}
#else // __AQ_USE_THREADEDGC__
#define __AQ_Init_GC__(x)
#endif // __AQ_USE_THREADEDGC__
bool (*fit_inc)(vector_type<vector_type<double>> X, vector_type<int64_t> y) = nullptr;
#include "../server/monetdb_conn.h"
__AQEXPORT__(int) action(Context* cxt) {
using namespace std;
using namespace types;
if (fit_inc == nullptr)
fit_inc = (decltype(fit_inc))(cxt->get_module_function("fit_inc"));
auto server = static_cast<Server*>(cxt->alt_server);
auto len = uint32_t(monetdbe_get_size(*((void**)server->server), "source"));
auto x_1bN = ColRef<vector_type<double>>(len, monetdbe_get_col(*((void**)(server->server)), "source", 0));
auto y_6uX = ColRef<int64_t>(len, monetdbe_get_col(*((void**)(server->server)), "source", 1));
fit_inc(x_1bN, y_6uX);
puts("action done.");
return 0;
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

@ -0,0 +1,13 @@
create table source(x vecdouble, y int64);
LOAD MODULE FROM "./libirf.so" FUNCTIONS (
newtree(height:int, f:int64, sparse:vecint, forget:double, maxf:int64, noclasses:int64, e:int) -> bool,
fit_inc(X:vecvecdouble, y:vecint64) -> bool,
predict(X:vecvecdouble) -> vecint
);
create table elec_sparse(v int);
insert into elec_sparse values (0), (1), (1), (1), (1), (1), (1);
select newtree(30, 7, elec_sparse.v, 0, 4, 2, 1) from elec_sparse

@ -0,0 +1,47 @@
#include "../server/libaquery.h"
#ifndef __AQ_USE_THREADEDGC__
#include "../server/gc.h"
__AQEXPORT__(void) __AQ_Init_GC__(Context* cxt) {
GC::gc_handle = static_cast<GC*>(cxt->gc);
GC::scratch_space = nullptr;
}
#else // __AQ_USE_THREADEDGC__
#define __AQ_Init_GC__(x)
#endif // __AQ_USE_THREADEDGC__
#include "../server/monetdb_conn.h"
#include "../csv.h"
__AQEXPORT__(int) ld(Context* cxt) {
using namespace std;
using namespace types;
static int cnt = 0;
if (cnt > 700)
return 1;
else
++cnt;
char data_name[] = "data/electricity/electricity ";
auto server = static_cast<Server*>(cxt->alt_server);
const char* names_fZrv[] = {"x", "y"};
auto tbl_6erF = new TableInfo<vector_type<double>,int64_t>("source", names_fZrv);
decltype(auto) c_31ju0e = tbl_6erF->get_col<0>();
decltype(auto) c_4VlzrR = tbl_6erF->get_col<1>();
c_31ju0e.init("x");
c_4VlzrR.init("y");
auto nxt = to_text(data_name + 28, cnt);
memcpy(nxt, ".csv", 5);
puts(data_name);
AQCSVReader<2, ',', ';'> csv_reader_7g0GY7(data_name);
csv_reader_7g0GY7.next_line();
vector_type<double> tmp_5XMNcBz5;
int64_t tmp_5dAHIJ1d;
while(csv_reader_7g0GY7.read_row(tmp_5XMNcBz5,tmp_5dAHIJ1d)) {
c_31ju0e.emplace_back(tmp_5XMNcBz5);
c_4VlzrR.emplace_back(tmp_5dAHIJ1d);
}
tbl_6erF->monetdb_append_table(cxt->alt_server, "source");
return 0;
}

@ -0,0 +1,30 @@
#include "../server/libaquery.h"
#ifndef __AQ_USE_THREADEDGC__
#include "../server/gc.h"
__AQEXPORT__(void) __AQ_Init_GC__(Context* cxt) {
GC::gc_handle = static_cast<GC*>(cxt->gc);
GC::scratch_space = nullptr;
}
#else // __AQ_USE_THREADEDGC__
#define __AQ_Init_GC__(x)
#endif // __AQ_USE_THREADEDGC__
#include "../server/monetdb_conn.h"
__AQEXPORT__(int) query(Context* cxt) {
using namespace std;
using namespace types;
auto server = static_cast<Server*>(cxt->alt_server);
static uint32_t old_sz = 0;
constexpr static uint32_t min_delta = 200;
auto newsz = monetdbe_get_size(*(void**) server->server, "source");
if (newsz > old_sz + min_delta) {
puts("query true.");
old_sz = uint32_t(newsz);
return 1;
}
puts("query false.");
return 0;
}

@ -0,0 +1,3 @@
create table test(x vecdouble, y int64);
load complex data infile "data/electricity/electricity872.csv" into table test fields terminated by ',' element terminated by ';';
select predict(x) from test

@ -24,6 +24,7 @@ def write():
fp.write(q.encode('utf-8'))
if q.startswith('Q'):
fp.write(b'\n ')
fp.write(b'\x00')

@ -1118,6 +1118,11 @@ class create_trigger(ast_node):
if self.procedure and self.table_name in self.context.tables_byname:
self.table = self.context.tables_byname[self.table_name]
self.table.triggers.add(self)
send_to_server(
f'TC{self.trigger_name}\0{self.table_name}\0'
f'{self.query_name}\0{self.action_name}'
)
else:
return
self.context.triggers[self.trigger_name] = self
@ -1131,7 +1136,7 @@ class create_trigger(ast_node):
def execute(self):
from engine.utils import send_to_server
send_to_server(f'TC{self.query_name}\0{self.action_name}')
send_to_server(f'TA{self.query_name}\0{self.action_name}')
def remove(self):
from engine.utils import send_to_server

@ -3,7 +3,7 @@ import os
sep = os.sep
# toggles
dataset = 'power' # [covtype, electricity, mixed, phishing, power]
dataset = 'electricity' # [covtype, electricity, mixed, phishing, power]
use_threadpool = False # True
# environments

@ -5,7 +5,7 @@
struct minEval{
double value;
int* values;
int* values = nullptr;
double eval;
long left; // how many on its left

@ -1,11 +1,12 @@
OPT_FLASG =
ifneq ($(DEBUG), 1)
ifneq ($(AQ_DEBUG), 1)
OPT_FLAGS = -Ofast -march=native -flto -DNDEBUG
else
OPT_FLAGS = -g3 -D_DEBUG -fsanitize=leak -fsanitize=address
OPT_FLAGS = -shared-libasan -g3 -D_DEBUG #-fsanitize=address
endif
example:
$(CXX) -shared -fPIC example.cpp aquery_mem.cpp -fno-semantic-interposition -Ofast -march=native -flto --std=c++1z -L.. -laquery -o ../test.so
irf:
rm ../libirf.so ; \
$(CXX) -shared -fPIC RF.cpp irf.cpp incrementalDecisionTree.cpp aquery_mem.cpp Evaluation.cpp -fno-semantic-interposition $(OPT_FLAGS) --std=c++1z -o ../libirf.so
all: example

@ -28,20 +28,33 @@ struct Session{
void* memory_map;
};
struct Context{
struct Trigger;
struct IntervalBasedTriggerHost;
struct CallbackBasedTriggerHost;
struct Context {
typedef int (*printf_type) (const char *format, ...);
void* module_function_maps = 0;
void* module_function_maps = nullptr;
Config* cfg;
int n_buffers, *sz_bufs;
void **buffers;
void* alt_server;
void* alt_server = nullptr;
Log_level log_level = LOG_INFO;
Session current;
const char* aquery_root_path;
#ifdef THREADING
void* thread_pool;
#endif
#ifndef __AQ_USE_THREADEDGC__
void* gc;
#endif
printf_type print;
Context();
virtual ~Context();
template <class ...Types>
void log(Types... args) {
if (log_level == LOG_INFO)

@ -14,7 +14,7 @@ std::mt19937 g(rd());
struct minEval{
double value;
int* values;
int* values = nullptr;
double eval;
long left; // how many on its left
@ -825,7 +825,10 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT*
for(i=low;i<low+size;i++){
t[resultNew[i]]++;
}
if(cMin.values!=nullptr)free(cMin.values);
if(cMin.values!=nullptr){
free(cMin.values);
cMin.values = nullptr;
}
current->result = std::distance(t, std::max_element(t, t+classes));
free(index);
free(current->dataRecord);
@ -989,13 +992,17 @@ void DecisionTree::Update(double** data, long* result, long size, DT* current){
}
if(c.eval<cMin.eval){
cMin.eval = c.eval;
if(cMin.values!=nullptr)free(cMin.values);
if(cMin.values!=nullptr){
free(cMin.values);
cMin.values = nullptr;
}
cMin.values = c.values;
cMin.value = c.value;
cFeature = col;
left = c.left;
}else if(c.values!=nullptr){
free(c.values);
c.values = nullptr;
}
}

@ -6,7 +6,10 @@
#include "aquery.h"
#include "./server/gc.h"
#include "../server/gc.h"
inline GC* GC::gc_handle = nullptr;
inline ScratchSpace* GC::scratch_space = nullptr;
__AQEXPORT__(void) __AQ_Init_GC__(Context* cxt) {
GC::gc_handle = static_cast<GC*>(cxt->gc);
GC::scratch_space = nullptr;
@ -16,18 +19,18 @@ DecisionTree *dt = nullptr;
RandomForest *rf = nullptr;
__AQEXPORT__(bool)
newtree(int height, long f, ColRef<int> X, double forget, long maxf, long noclasses, Evaluation e, long r, long rb)
newtree(int ntree, long f, ColRef<int> sparse, double forget, long maxf, long nclasses, Evaluation e)
{
if (X.size != f)
if (sparse.size != f)
return false;
int *X_cpy = (int *)malloc(f * sizeof(int));
memcpy(X_cpy, X.container, f);
memcpy(X_cpy, sparse.container, f);
if (maxf < 0)
maxf = f;
dt = new DecisionTree(f, X_cpy, forget, maxf, noclasses, e);
rf = new RandomForest(height, f, X_cpy, forget, noclasses, e);
// dt = new DecisionTree(f, X_cpy, forget, maxf, noclasses, e);
rf = new RandomForest(ntree, f, X_cpy, forget, nclasses, e, true);
return true;
}
@ -53,14 +56,20 @@ __AQEXPORT__(bool)
fit_inc(vector_type<vector_type<double>> v, vector_type<long> res)
{
static uint32_t last_offset = 0;
double **data = (double **)malloc(v.size * sizeof(double *));
if(last_offset >= v.size)
if(last_offset > v.size)
last_offset = 0;
for (int i = last_offset; i < v.size; ++i)
data[i] = v.container[i].container;
rf->fit(data, res.container, v.size);
free(data);
return true;
const auto curr_size = (v.size - last_offset);
if(curr_size > 0) {
double **data = (double **)malloc( curr_size * sizeof(double *));
for (uint32_t i = last_offset; i < v.size; ++i)
data[i - last_offset] = v.container[i].container;
rf->fit(data, res.container + last_offset, curr_size);
last_offset = v.size;
free(data);
return true;
}
return false;
}

@ -620,3 +620,22 @@ vector_type<std::string_view>::vector_type(const uint32_t size, void* data) :
}
//std::cout<<size << container[1];
}
void activate_callback_based_trigger(Context* context, const char* cmd)
{
const char* query_name = cmd + 2;
const char* action_name = query_name;
while (*action_name++);
if(auto q = get_procedure(cxt, query_name),
a = get_procedure(cxt, action_name);
q.name == nullptr || a.name == nullptr
)
printf("Warning: Invalid query or action name: %s %s",
query_name, action_name);
else {
auto query = AQ_DupObject(&q);
auto action = AQ_DupObject(&a);
cxt->ct_host->execute_trigger(query, action);
}
}

@ -291,5 +291,6 @@ inline _This_Type* AQ_DupObject(_This_Type* __val) {
#endif //__USE_STD_SEMAPHORE__
void print_monetdb_results(void* _srv, const char* sep, const char* end, uint32_t limit);
void activate_callback_based_trigger(Context* context, const char* cmd);
#endif

@ -277,6 +277,84 @@ bool Server::havehge() {
#endif
}
using prt_fn_t = char* (*)(void*, char*);
constexpr prt_fn_t monetdbe_prtfns[] = {
aq_to_chars<bool>, aq_to_chars<int8_t>, aq_to_chars<int16_t>, aq_to_chars<int32_t>,
aq_to_chars<int64_t>,
#if __SIZEOF_INT128__
aq_to_chars<__int128_t>,
#endif
aq_to_chars<size_t>, aq_to_chars<float>, aq_to_chars<double>,
aq_to_chars<char*>, aq_to_chars<std::nullptr_t>,
aq_to_chars<types::date_t>, aq_to_chars<types::time_t>, aq_to_chars<types::timestamp_t>,
// should be last:
aq_to_chars<std::nullptr_t>
};
constexpr uint32_t output_buffer_size = 65536;
void print_monetdb_results(void* _srv, const char* sep = " ", const char* end = "\n",
uint32_t limit = std::numeric_limits<uint32_t>::max()) {
auto srv = static_cast<Server *>(_srv);
if (!srv->haserror() && srv->cnt && limit) {
char buffer[output_buffer_size];
auto _res = static_cast<monetdbe_result*> (srv->res);
const auto ncols = _res->ncols;
monetdbe_column** cols = static_cast<monetdbe_column**>(malloc(sizeof(monetdbe_column*) * ncols));
prt_fn_t *prtfns = (prt_fn_t*) alloca(sizeof(prt_fn_t) * ncols);
char** col_data = static_cast<char**> (alloca(sizeof(char*) * ncols));
uint8_t* szs = static_cast<uint8_t*>(alloca(ncols));
std::string header_string = "";
const char* err_msg = nullptr;
const size_t l_sep = strlen(sep);
const size_t l_end = strlen(end);
char* _buffer = buffer;
const auto cnt = srv->cnt < limit? srv->cnt : limit;
for(uint32_t i = 0; i < ncols; ++i){
err_msg = monetdbe_result_fetch(_res, &cols[i], i);
if(err_msg) { goto cleanup; }
col_data[i] = static_cast<char *>(cols[i]->data);
prtfns[i] = monetdbe_prtfns[cols[i]->type];
szs [i] = monetdbe_type_szs[cols[i]->type];
header_string = header_string + cols[i]->name + sep + '|' + sep;
}
if(l_sep > 512 || l_end > 512) {
puts("Error: separator or end string too long");
goto cleanup;
}
if (header_string.size() >= l_sep + 1)
header_string.resize(header_string.size() - l_sep - 1);
header_string += end + std::string(header_string.size(), '=') + end;
fputs(header_string.c_str(), stdout);
for(uint64_t i = 0; i < cnt; ++i){
for(uint32_t j = 0; j < ncols; ++j){
//copy the field to buf
_buffer = prtfns[j](col_data[j], _buffer);
if (j != ncols - 1){
memcpy(_buffer, sep, l_sep);
_buffer += l_sep;
}
col_data[j] += szs[j];
}
memcpy(_buffer, end, l_end);
_buffer += l_end;
if(output_buffer_size - (_buffer - buffer) <= 1024){
fwrite(buffer, 1, _buffer - buffer, stdout);
_buffer = buffer;
}
}
memcpy(_buffer, end, l_end);
_buffer += l_end;
if (_buffer != buffer)
fwrite(buffer, 1, _buffer - buffer, stdout);
cleanup:
free(cols);
}
}
int ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){
auto server = static_cast<Server*>(cxt->alt_server);
@ -302,6 +380,12 @@ int ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){
handle = p->__rt_loaded_modules[procedure_module_cursor++];
}
break;
case 'T' : {
if (p->queries[i][1] == 'N') {
cxt->ct_host->execute_trigger(p->queries[i] + 2, cxt);
}
}
break;
case 'O': {
uint32_t limit;
memcpy(&limit, p->queries[i] + 1, sizeof(uint32_t));

@ -86,7 +86,7 @@ monetdbe_get_col(monetdbe_database dbhdl, const char *table_name, uint32_t col_i
mvc *m = be->mvc;
//mvc_trans(m);
sql_table *t = find_table_or_view_on_scope(m, NULL, "sys", table_name, "CATALOG", false);
if (!t) return 0;
if (!t) return NULL;
sql_column *col = ol_fetch(t->columns, col_id);
sqlstore* store = m->store;
BAT *b = store->storage_api.bind_col(m->session->tr, col, QUICK);

@ -94,7 +94,9 @@ have_hge() {
return false;
#endif
}
Context* _g_cxt;
StoredProcedure
get_procedure(Context* cxt, const char* name) {
auto res = cxt->stored_proc.find(name);
@ -107,11 +109,11 @@ get_procedure(Context* cxt, const char* name) {
};
return res->second;
}
__AQEXPORT__(StoredProcedure)
get_procedure_ex(const char* name){
return get_procedure(_g_cxt, name);
}
using prt_fn_t = char* (*)(void*, char*);
// This function contains heap allocations, free after use
template<class String_T>
@ -128,19 +130,6 @@ char* copy_lpstr(const char* str){
return ret;
}
constexpr prt_fn_t monetdbe_prtfns[] = {
aq_to_chars<bool>, aq_to_chars<int8_t>, aq_to_chars<int16_t>, aq_to_chars<int32_t>,
aq_to_chars<int64_t>,
#if __SIZEOF_INT128__
aq_to_chars<__int128_t>,
#endif
aq_to_chars<size_t>, aq_to_chars<float>, aq_to_chars<double>,
aq_to_chars<char*>, aq_to_chars<std::nullptr_t>,
aq_to_chars<types::date_t>, aq_to_chars<types::time_t>, aq_to_chars<types::timestamp_t>,
// should be last:
aq_to_chars<std::nullptr_t>
};
#ifndef __AQ_USE_THREADEDGC__
void aq_init_gc(void *handle, Context* cxt)
@ -157,87 +146,6 @@ void aq_init_gc(void *handle, Context* cxt)
#define aq_init_gc(h, c)
#endif //__AQ_USE_THREADEDGC__
#include "monetdbe.h"
#undef max
#undef min
inline constexpr static unsigned char monetdbe_type_szs[] = {
sizeof(monetdbe_column_bool::null_value), sizeof(monetdbe_column_int8_t::null_value),
sizeof(monetdbe_column_int16_t::null_value), sizeof(monetdbe_column_int32_t::null_value),
sizeof(monetdbe_column_int64_t::null_value),
#ifdef __SIZEOF_INT128__
sizeof(monetdbe_column_int128_t::null_value),
#endif
sizeof(monetdbe_column_size_t::null_value), sizeof(monetdbe_column_float::null_value),
sizeof(monetdbe_column_double::null_value),
sizeof(monetdbe_column_str::null_value), sizeof(monetdbe_column_blob::null_value),
sizeof(monetdbe_data_date), sizeof(monetdbe_data_time), sizeof(monetdbe_data_timestamp),
// should be last:
1
};
constexpr uint32_t output_buffer_size = 65536;
void print_monetdb_results(void* _srv, const char* sep = " ", const char* end = "\n",
uint32_t limit = std::numeric_limits<uint32_t>::max()) {
auto srv = static_cast<Server *>(_srv);
if (!srv->haserror() && srv->cnt && limit) {
char buffer[output_buffer_size];
auto _res = static_cast<monetdbe_result*> (srv->res);
const auto ncols = _res->ncols;
monetdbe_column** cols = static_cast<monetdbe_column**>(malloc(sizeof(monetdbe_column*) * ncols));
prt_fn_t *prtfns = (prt_fn_t*) alloca(sizeof(prt_fn_t) * ncols);
char** col_data = static_cast<char**> (alloca(sizeof(char*) * ncols));
uint8_t* szs = static_cast<uint8_t*>(alloca(ncols));
std::string header_string = "";
const char* err_msg = nullptr;
const size_t l_sep = strlen(sep);
const size_t l_end = strlen(end);
char* _buffer = buffer;
const auto cnt = srv->cnt < limit? srv->cnt : limit;
for(uint32_t i = 0; i < ncols; ++i){
err_msg = monetdbe_result_fetch(_res, &cols[i], i);
if(err_msg) { goto cleanup; }
col_data[i] = static_cast<char *>(cols[i]->data);
prtfns[i] = monetdbe_prtfns[cols[i]->type];
szs [i] = monetdbe_type_szs[cols[i]->type];
header_string = header_string + cols[i]->name + sep + '|' + sep;
}
if(l_sep > 512 || l_end > 512) {
puts("Error: separator or end string too long");
goto cleanup;
}
if (header_string.size() >= l_sep + 1)
header_string.resize(header_string.size() - l_sep - 1);
header_string += end + std::string(header_string.size(), '=') + end;
fputs(header_string.c_str(), stdout);
for(uint64_t i = 0; i < cnt; ++i){
for(uint32_t j = 0; j < ncols; ++j){
//copy the field to buf
_buffer = prtfns[j](col_data[j], _buffer);
if (j != ncols - 1){
memcpy(_buffer, sep, l_sep);
_buffer += l_sep;
}
col_data[j] += szs[j];
}
memcpy(_buffer, end, l_end);
_buffer += l_end;
if(output_buffer_size - (_buffer - buffer) <= 1024){
fwrite(buffer, 1, _buffer - buffer, stdout);
_buffer = buffer;
}
}
memcpy(_buffer, end, l_end);
_buffer += l_end;
if (_buffer != buffer)
fwrite(buffer, 1, _buffer - buffer, stdout);
cleanup:
free(cols);
}
}
void initialize_module(const char* module_name, void* module_handle, Context* cxt){
auto _init_module = reinterpret_cast<module_init_fn>(dlsym(module_handle, "init_session"));
if (_init_module) {
@ -581,25 +489,24 @@ start:
}
}
break;
case 'C': // activate callback based trigger
case 'C' : //register callback based trigger
{
const char* query_name = n_recvd[i] + 2;
const char* trigger_name = n_recvd[i] + 2;
const char* table_name = trigger_name;
while(*table_name++);
const char* query_name = table_name;
while(*query_name++);
const char* action_name = query_name;
while(*action_name++);
if(auto q = get_procedure(cxt, query_name),
a = get_procedure(cxt, action_name);
q.name == nullptr || a.name == nullptr
)
printf("Warning: Invalid query or action name: %s %s",
query_name, action_name);
else{
auto query = AQ_DupObject(&q);
auto action = AQ_DupObject(&a);
cxt->ct_host->execute_trigger(query, action);
}
cxt->ct_host->add_trigger(trigger_name, table_name, query_name, action_name);
}
break;
case 'A': // activate callback based trigger
activate_callback_based_trigger(cxt, n_recvd[i]);
break;
case 'N':
cxt->ct_host->execute_trigger(n_recvd[i] + 2);
break;
case 'R': // remove trigger
{
cxt->it_host->remove_trigger(n_recvd[i] + 2);

@ -222,6 +222,7 @@ bool IntervalBasedTrigger::tick(uint32_t delta_t) {
CallbackBasedTriggerHost::CallbackBasedTriggerHost(ThreadPool *tp, Context *cxt) {
this->tp = tp;
this->cxt = cxt;
this->triggers = new aq_map<std::string, CallbackBasedTrigger>;
}
void CallbackBasedTriggerHost::execute_trigger(StoredProcedure* query, StoredProcedure* action) {
@ -234,4 +235,35 @@ void CallbackBasedTriggerHost::execute_trigger(StoredProcedure* query, StoredPro
this->tp->enqueue_task(payload);
}
void execute_trigger(const char* trigger_name) {
auto vt_triggers = static_cast<aq_map<std::string, CallbackBasedTrigger> *>(this->triggers);
auto ptr = vt_triggers->find(trigger_name);
if (ptr != vt_triggers->end()) {
auto& tr = ptr->second;
if (!tr.materialized) {
tr.query = new CallbackBasedTrigger(get_procedure(cxt, tr.query_name));
tr.action = new CallbackBasedTrigger(get_procedure(cxt, tr.action_name));
tr.materialized = true;
}
this->execute_trigger(AQ_DupObject(tr.query), AQ_DupObject(tr.action));
}
}
void CallbackBasedTriggerHost::add_trigger(
const char* trigger_name,
const char* table_name,
const char* query_name,
const char* action_name
) {
auto vt_triggers = static_cast<aq_map<std::string, CallbackBasedTrigger> *>(this->triggers);
auto tr = CallbackBasedTrigger {
.trigger_name = trigger_name,
.table_name = table_name,
.query_name = query_name,
.action_name = action_name,
.materialized = false
};
vt_triggers->emplace(trigger_name, tr);
}
void CallbackBasedTriggerHost::tick() {}

@ -80,10 +80,28 @@ private:
void tick() override;
};
struct CallbackBasedTrigger : Trigger {
const char* trigger_name;
const char* table_name;
union {
StoredProcedure* query;
const char* query_name;
};
union {
StoredProcedure* action;
const char* action_name;
};
bool materialized;
};
class CallbackBasedTriggerHost : public TriggerHost {
public:
explicit CallbackBasedTriggerHost(ThreadPool *tp, Context *cxt);
void execute_trigger(StoredProcedure* query, StoredProcedure* action);
void execute_trigger(const char* trigger_name);
void add_trigger(const char* trigger_name, const char* table_name,
const char* query_name, const char* action_name);
private:
void tick() override;
};

@ -173,14 +173,24 @@ public:
if constexpr (_grow)
sz = this->size;
if (sz >= capacity) { // geometric growth
bool reallocate = true;
if (capacity == 0 && sz > 0)
[[unlikely]]
reallocate = false;
uint32_t new_capacity;
if constexpr (_grow)
new_capacity = size + 1 + (size >> 1);
else
new_capacity = sz;
_Ty* n_container = (_Ty*)realloc(container, new_capacity * sizeof(_Ty));
// memcpy(n_container, container, sizeof(_Ty) * size);
_Ty* n_container;
if (reallocate) {
n_container = (_Ty*)realloc(container, new_capacity * sizeof(_Ty));
}
else {
n_container = (_Ty*)malloc(new_capacity * sizeof(_Ty));
memcpy(n_container, container, sizeof(_Ty) * size);
}
memset(n_container + size, 0, sizeof(_Ty) * (new_capacity - size));
// if (capacity)
// free(container);

@ -0,0 +1,16 @@
LOAD MODULE FROM "./libirf.so" FUNCTIONS ( newtree(height:int, f:int64, sparse:vecint, forget:double, maxf:int64, noclasses:int64, e:int) -> bool, fit(X:vecvecdouble, y:vecint64) -> bool, predict(X:vecvecdouble) -> vecint );
create table source(x vecdouble, y int64);
-- Create trigger 1 ~~ to predict whenever sz(source > ?)
-- Create trigger 2 ~~ to auto feed ~
load complex data infile "data/electricity/electricity1.csv" into table source fields terminated by ',' element terminated by ';';
create table elec_sparse(v int);
insert into elec_sparse values (0), (1), (1), (1), (1), (1), (1);
select newtree(30, 7, elec_sparse.v, 0, 4, 2, 1) from elec_sparse
select fit(x, y) from source
-- select pack(x1, x2, x3, x4) from source
select predict(x) from source
Loading…
Cancel
Save