Compare commits

...

3 Commits

@ -33,6 +33,7 @@ def common_parser():
return parser(ansi_string | aquery_doublequote_string, combined_ident)
def parser(literal_string, ident):
with Whitespace() as engine:
engine.add_ignore(Literal("--") + restOfLine)
@ -569,7 +570,25 @@ def parser(literal_string, ident):
+ index_type
+ index_column_names
+ index_options
)("create index")
)("create_index")
create_trigger = (
keyword("create trigger")
+ var_name("name")
+ ((
ON
+ var_name("table")
+ keyword("action")
+ var_name("action")
+ WHEN
+ var_name("query") )
| (
keyword("action")
+ var_name("action")
+ INTERVAL
+ int_num("interval")
))
)("create_trigger")
cache_options = Optional((
keyword("options").suppress()
@ -693,7 +712,7 @@ def parser(literal_string, ident):
sql_stmts = delimited_list( (
query
| (insert | update | delete | load)
| (create_table | create_view | create_cache | create_index)
| (create_table | create_view | create_cache | create_index | create_trigger)
| (drop_table | drop_view | drop_index)
)("stmts"), ";")
@ -707,6 +726,5 @@ def parser(literal_string, ident):
|other_stmt
| keyword(";").suppress() # empty stmt
)
return stmts.finalize()

File diff suppressed because one or more lines are too long

@ -43,7 +43,7 @@ class orderby(ast_node):
def merge(self, node):
self.produce(node)
def finialize(self, references):
def finalize(self, references):
self.order = [ o for o in self.order if o.name in references ]
def result(self, sep:str = ','):

@ -8,7 +8,7 @@ nums = '0123456789'
base62alp = nums + lower_alp + upper_alp
reserved_monet = ['month']
session_context = None
class CaseInsensitiveDict(MutableMapping):
def __init__(self, data=None, **kwargs):
@ -158,3 +158,35 @@ def get_innermost(sl):
return get_innermost(sl[0])
else:
return sl
def send_to_server(payload : str):
from prompt import PromptState
cxt : PromptState = session_context
if cxt is None:
raise RuntimeError("Error! no session specified.")
else:
from ctypes import c_char_p
cxt.payload = (c_char_p*1)(c_char_p(bytes(payload, 'utf-8')))
cxt.cfg.has_dll = 0
cxt.send(1, cxt.payload)
cxt.set_ready()
def get_storedproc(name : str):
from prompt import PromptState, StoredProcedure
cxt : PromptState = session_context
if cxt is None:
raise RuntimeError("Error! no session specified.")
else:
ret : StoredProcedure = cxt.get_storedproc(bytes(name, 'utf-8'))
if (
ret.name.value and
ret.name.value.decode('utf-8') != name
):
print(f'Procedure {name} mismatch in server {ret.name.value}')
return None
else:
return ret
def execute_procedure(proc):
pass

@ -119,6 +119,15 @@ class Backend_Type(enum.Enum):
BACKEND_MonetDB = 1
BACKEND_MariaDB = 2
class StoredProcedure(ctypes.Structure):
_fields_ = [
('cnt', ctypes.c_uint32),
('postproc_modules', ctypes.c_uint32),
('queries', ctypes.POINTER(ctypes.c_char_p)),
('name', ctypes.c_char_p),
('__rt_loaded_modules', ctypes.POINTER(ctypes.c_void_p)),
]
@dataclass
class QueryStats:
last_time : int = time.time()
@ -229,6 +238,7 @@ class PromptState():
server_bin = 'server.bin' if server_mode == RunType.IPC else 'server.so'
wait_engine = lambda: None
wake_engine = lambda: None
get_storedproc = lambda : StoredProcedure()
set_ready = lambda: None
get_ready = lambda: None
server_status = lambda: False
@ -322,6 +332,8 @@ def init_threaded(state : PromptState):
state.send = server_so['receive_args']
state.wait_engine = server_so['wait_engine']
state.wake_engine = server_so['wake_engine']
state.get_storedproc = server_so['get_procedure']
state.get_storedproc.restype = StoredProcedure
aquery_config.have_hge = server_so['have_hge']()
if aquery_config.have_hge != 0:
from engine.types import get_int128_support
@ -330,9 +342,11 @@ def init_threaded(state : PromptState):
state.th.start()
def init_prompt() -> PromptState:
from engine.utils import session_context
aquery_config.init_config()
state = PromptState()
session_context = state
# if aquery_config.rebuild_backend:
# try:
# os.remove(state.server_bin)
@ -454,7 +468,7 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr
continue
elif q.startswith('xexec') or q.startswith('exec'): # generate build and run (MonetDB Engine)
state.cfg.backend_type = Backend_Type.BACKEND_MonetDB.value
cxt = xengine.exec(state.stmts, cxt, keep)
cxt = xengine.exec(state.stmts, cxt, keep, parser.parse)
this_udf = cxt.finalize_udf()
if this_udf:
@ -613,11 +627,7 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr
elif q.startswith('procedure'):
qs = re.split(r'[ \t\r\n]', q)
procedure_help = '''Usage: procedure <procedure_name> [record|stop|run|remove|save|load]'''
def send_to_server(payload : str):
state.payload = (ctypes.c_char_p*1)(ctypes.c_char_p(bytes(payload, 'utf-8')))
state.cfg.has_dll = 0
state.send(1, state.payload)
state.set_ready()
from engine.utils import send_to_server
if len(qs) > 2:
if qs[2].lower() =='record':
if state.current_procedure is not None and state.current_procedure != qs[1]:

@ -18,10 +18,11 @@ def generate(ast, cxt):
if k in ast_node.types.keys():
ast_node.types[k](None, ast, cxt)
def exec(stmts, cxt = None, keep = False):
def exec(stmts, cxt = None, keep = False, parser = None):
if 'stmts' not in stmts:
return
cxt = initialize(cxt, keep)
cxt.parser = parser
stmts_stmts = stmts['stmts']
if type(stmts_stmts) is list:
for s in stmts_stmts:

@ -281,11 +281,11 @@ class projection(ast_node):
# self.into_stub = f'{{INTOSTUB{base62uuid(20)}}}'
# self.add(self.into_stub, '')
def finialize(astnode:ast_node):
def finalize(astnode:ast_node):
if(astnode is not None):
self.add(astnode.sql)
finialize(self.datasource)
finialize(self.where)
finalize(self.datasource)
finalize(self.where)
if self.group_node and not self.group_node.use_sp_gb:
self.add(self.group_node.sql)
@ -1082,6 +1082,62 @@ class create_table(ast_node):
if self.context.use_columnstore:
self.sql += ' engine=ColumnStore'
class create_trigger(ast_node):
name = 'create_trigger'
first_order = name
class Type (Enum):
Interval = auto()
Callback = auto()
def produce(self, node):
from engine.utils import send_to_server, get_storedproc
node = node['create_trigger']
self.trigger_name = node['name']
self.action_name = node['action']
self.action = get_storedproc(self.action_name)
if self.trigger_name in self.context.triggers:
raise ValueError(f'trigger {self.trigger_name} exists')
elif self.action:
raise ValueError(f'Stored Procedure {self.action_name} do not exist')
if 'interval' in node: # executed periodically from server
self.type = self.Type.Interval
self.interval = node['interval']
send_to_server(f'TI{self.trigger_name}\0{self.action_name}\0{self.interval}')
else: # executed from sql backend
self.type = self.Type.Callback
self.query_name = node['query']
self.table_name = node['table']
self.procedure = get_storedproc(self.query_name)
if self.procedure and self.table_name in self.context.tables_byname:
self.table = self.context.tables_byname[self.table_name]
self.table.triggers.add(self)
else:
return
self.context.triggers[self.trigger_name] = self
# manually execute trigger
def register(self):
if self.type != self.Type.Callback:
self.context.triggers.pop(self.trigger_name)
raise ValueError(f'Trigger {self.trigger_name} is not a callback based trigger')
self.context.triggers_active.add(self)
def execute(self):
from engine.utils import send_to_server
send_to_server(f'TC{self.query_name}\0{self.action_name}')
def remove(self):
from engine.utils import send_to_server
send_to_server(f'TR{self.trigger_name}')
class drop_trigger(ast_node):
name = 'drop_trigger'
first_order = name
def produce(self, node):
...
class drop(ast_node):
name = 'drop'
first_order = name
@ -1111,9 +1167,11 @@ class insert(ast_node):
complex_query_kw = ['from', 'where', 'groupby', 'having', 'orderby', 'limit']
if any([kw in values for kw in complex_query_kw]):
values['into'] = node['insert']
proj_cls = (select_distinct
proj_cls = (
select_distinct
if 'select_distinct' in values
else projection)
else projection
)
proj_cls(None, values, self.context)
self.produce = lambda*_:None
self.spawn = lambda*_:None
@ -1147,6 +1205,11 @@ class insert(ast_node):
keys = f'({", ".join(keys)})' if keys else ''
tbl = node['insert']
if tbl not in self.context.tables_byname:
print('Warning: {tbl} not registered in aquery compiler.')
tbl_obj = self.context.tables_byname[tbl]
for t in tbl_obj.triggers:
t.register()
self.sql = f'INSERT INTO {tbl}{keys} VALUES'
# if len(values) != table.n_cols:
# raise ValueError("Column Mismatch")
@ -1624,6 +1687,11 @@ class passthru_sql(ast_node):
seprator = re.compile(r'''((?:[^;"']|"[^"]*"|'[^']*')+)''')
def __init__(self, _, node, context:Context):
sqls = passthru_sql.seprator.split(node['sql'])
try:
if callable(context.parser):
parsed = context.parser(node['sql'])
except BaseException:
parsed = None
for sql in sqls:
sq = sql.strip(' \t\n\r;')
if sq:

@ -64,12 +64,14 @@ class ColRef:
class TableInfo:
def __init__(self, table_name, cols, cxt:'Context'):
from reconstruct.ast import create_trigger
# statics
self.table_name : str = table_name
self.contextname_cpp : str = ''
self.alias : Set[str] = set([table_name])
self.columns_byname : CaseInsensitiveDict[str, ColRef] = CaseInsensitiveDict() # column_name, type
self.columns : List[ColRef] = []
self.triggers : Set[create_trigger] = set()
self.cxt = cxt
# keep track of temp vars
self.rec = None
@ -156,9 +158,11 @@ class Context:
self.module_init_loc = 0
self.special_gb = False
self.has_dll = False
self.triggers_active.clear()
def __init__(self):
self.tables_byname = dict()
from .ast import create_trigger
self.tables_byname : Dict[str, TableInfo] = dict()
self.col_byname = dict()
self.tables : Set[TableInfo] = set()
self.cols = []
@ -174,6 +178,9 @@ class Context:
self.have_hge = False
self.Error = lambda *args: print(*args)
self.Info = lambda *_: None
self.triggers : Dict[str, create_trigger] = dict()
self.triggers_active = set()
self.stored_proceudres = dict()
# self.new() called everytime new query batch is started
def get_scan_var(self):
@ -257,6 +264,18 @@ class Context:
self.queries.append(
'O' + limit + sep + end)
def remove_trigger(self, name : str):
from reconstruct.ast import create_trigger
val = self.triggers.pop(name, None)
if val.type == create_trigger.Type.Callback:
val.table.triggers.remove(val)
val.remove()
def post_exec_triggers(self):
for t in self.triggers_active:
t.execute()
self.triggers_active.clear()
def abandon_postproc(self):
self.ccode = ''
self.finalize_query()

@ -5,7 +5,7 @@ else
OPT_FLAGS = -g3 -D_DEBUG -fsanitize=leak -fsanitize=address
endif
example:
$(CXX) -shared -fPIC example.cpp aquery_mem.cpp -fno-semantic-interposition -Ofast -march=native -flto --std=c++1z -o ../test.so
$(CXX) -shared -fPIC example.cpp aquery_mem.cpp -fno-semantic-interposition -Ofast -march=native -flto --std=c++1z -L.. -laquery -o ../test.so
irf:
$(CXX) -shared -fPIC RF.cpp irf.cpp incrementalDecisionTree.cpp aquery_mem.cpp Evaluation.cpp -fno-semantic-interposition $(OPT_FLAGS) --std=c++1z -o ../libirf.so
all: example

@ -84,7 +84,9 @@ __AQEXPORT__(void) init_session(Context* cxt);
#ifdef _WIN32
#include <cstring>
#else
void* memcpy(void*, const void*, unsigned long long);
namespace std {
void* memcpy(void*, const void*, unsigned long long);
}
#endif
struct vectortype_storage{
@ -95,7 +97,7 @@ struct vectortype_storage{
vectortype_storage() = default;
template <class Ty, template <typename> class VT>
vectortype_storage(const VT<Ty>& vt) {
memcpy(this, &vt, sizeof(vectortype_storage));
std::memcpy(this, &vt, sizeof(vectortype_storage));
}
};
struct ColRef_storage {
@ -108,7 +110,7 @@ struct ColRef_storage {
ColRef_storage() = default;
template <class Ty, template <typename> class VT>
ColRef_storage(const VT<Ty>& vt) {
memcpy(this, &vt, sizeof(ColRef_storage));
std::memcpy(this, &vt, sizeof(ColRef_storage));
}
};
#endif

@ -1,6 +1,6 @@
#include "aquery.h"
// __AQ_NO_SESSION__
#include "../server/table.h"
#include "aquery.h"
__AQEXPORT__(ColRef_storage) mulvec(int a, ColRef<float> b){
return a * b;

@ -3,8 +3,15 @@
// __AQ_NO_SESSION__
#include "../server/table.h"
#include "aquery.h"
#include "./server/gc.h"
__AQEXPORT__(void) __AQ_Init_GC__(Context* cxt) {
GC::gc_handle = static_cast<GC*>(cxt->gc);
GC::scratch_space = nullptr;
}
DecisionTree *dt = nullptr;
RandomForest *rf = nullptr;
@ -20,7 +27,7 @@ newtree(int height, long f, ColRef<int> X, double forget, long maxf, long noclas
if (maxf < 0)
maxf = f;
dt = new DecisionTree(f, X_cpy, forget, maxf, noclasses, e);
rf = new RandomForest(height, f, X_cpy, forget, noclasses, e)
rf = new RandomForest(height, f, X_cpy, forget, noclasses, e);
return true;
}
@ -42,6 +49,21 @@ newtree(int height, long f, ColRef<int> X, double forget, long maxf, long noclas
// return 1;
// }
__AQEXPORT__(bool)
fit_inc(vector_type<vector_type<double>> v, vector_type<long> res)
{
static uint32_t last_offset = 0;
double **data = (double **)malloc(v.size * sizeof(double *));
if(last_offset >= v.size)
last_offset = 0;
for (int i = last_offset; i < v.size; ++i)
data[i] = v.container[i].container;
rf->fit(data, res.container, v.size);
free(data);
return true;
}
__AQEXPORT__(bool)
fit(vector_type<vector_type<double>> v, vector_type<long> res)
{
@ -53,14 +75,15 @@ fit(vector_type<vector_type<double>> v, vector_type<long> res)
return true;
}
__AQEXPORT__(vectortype_cstorage)
predict(vector_type<vector_type<double>> v)
{
int *result = (int *)malloc(v.size * sizeof(int));
for (long i = 0; i < v.size; i++)
for (uint32_t i = 0; i < v.size; i++)
//result[i] = dt->Test(v.container[i].container, dt->DTree);
result[i] = rf->Test(v.container, rf->DTrees);
result[i] = int(rf->Test(v[i].container));
auto container = (vector_type<int> *)malloc(sizeof(vector_type<int>));
container->size = v.size;
container->capacity = 0;

@ -7,6 +7,12 @@
#include "types.h"
// #include "robin_hood.h"
#include "unordered_dense.h"
template<typename Key, typename Val>
using aq_map = ankerl::unordered_dense::map<Key, Val>;
template<typename Key>
using aq_set = ankerl::unordered_dense::set<Key>;
// only works for 64 bit systems
namespace hasher_consts{
constexpr size_t _FNV_offset_basis = 14695981039346656037ULL;
@ -19,7 +25,6 @@ inline size_t append_bytes(const unsigned char* _First) noexcept {
_Val ^= static_cast<size_t>(*_First);
_Val *= hasher_consts::_FNV_prime;
}
return _Val;
}

@ -58,27 +58,6 @@ void print<bool>(const bool&v, const char* delimiter){
}
template<class T>
T getInt(const char*& buf){
T ret = 0;
while(*buf >= '0' and *buf <= '9'){
ret = ret*10 + *buf - '0';
buf++;
}
return ret;
}
template<class T>
char* intToString(T val, char* buf){
while (val > 0){
*--buf = val%10 + '0';
val /= 10;
}
return buf;
}
void skip(const char*& buf){
while(*buf && (*buf >'9' || *buf < '0')) buf++;
}

@ -9,6 +9,7 @@
#include <unordered_map>
#include <chrono>
#include <filesystem>
#include <cstring>
class aq_timer {
private:
std::chrono::high_resolution_clock::time_point now;
@ -32,6 +33,27 @@ public:
#include "table.h"
template<class T = int>
T getInt(const char*& buf){
T ret = 0;
while(*buf >= '0' and *buf <= '9'){
ret = ret*10 + *buf - '0';
buf++;
}
return ret;
}
template<class T>
char* intToString(T val, char* buf){
while (val > 0){
*--buf = val%10 + '0';
val /= 10;
}
return buf;
}
enum Log_level {
LOG_INFO,
@ -72,7 +94,9 @@ struct StoredProcedure {
const char* name;
void **__rt_loaded_modules;
};
struct Trigger;
struct IntervalBasedTriggerHost;
struct CallbackBasedTriggerHost;
struct Context {
typedef int (*printf_type) (const char *format, ...);
@ -113,6 +137,9 @@ struct Context {
std::unordered_map<std::string, void*> tables;
std::unordered_map<std::string, uColRef *> cols;
std::unordered_map<std::string, StoredProcedure> stored_proc;
std::unordered_map<std::string, Trigger> triggers;
IntervalBasedTriggerHost *it_host;
CallbackBasedTriggerHost *ct_host;
};
@ -176,6 +203,13 @@ inline void AQ_ZeroMemory(_This_Struct& __val) {
memset(&__val, 0, sizeof(_This_Struct));
}
template <typename _This_Type>
inline _This_Type* AQ_DupObject(_This_Type* __val) {
auto ret = (_This_Type*)(malloc(sizeof(_This_Type)));
memcpy(ret, __val, sizeof(_This_Type));
return ret;
}
#ifdef __USE_STD_SEMAPHORE__
#include <semaphore>
class A_Semaphore {

@ -86,7 +86,8 @@ extern "C" int __DLLEXPORT__ binary_info() {
#endif
}
__AQEXPORT__(bool) have_hge(){
__AQEXPORT__(bool)
have_hge() {
#if defined(__MONETDB_CONN_H__)
return Server::havehge();
#else
@ -94,6 +95,19 @@ __AQEXPORT__(bool) have_hge(){
#endif
}
__AQEXPORT__(StoredProcedure)
get_procedure(Context* cxt, const char* name) {
auto res = cxt->stored_proc.find(name);
if (res == cxt->stored_proc.end())
return { .cnt = 0,
.postproc_modules = 0,
.queries = nullptr,
.name = nullptr,
.__rt_loaded_modules = nullptr
};
return res->second;
}
using prt_fn_t = char* (*)(void*, char*);
// This function contains heap allocations, free after use
@ -545,6 +559,37 @@ start:
}
}
break;
case 'T': // triggers
{
switch(n_recvd[i][1]){
case 'I': // register interval based trigger
{
const char* action_name = n_recvd[i] + 2;
while(*action_name++);
if(auto p = get_procedure(cxt, action_name); p.name == nullptr)
printf("Invalid action name: %s\n", action_name);
else {
auto action = AQ_DupObject(&p);
const char* interval = action_name;
while(*interval++);
const auto i_interval = getInt<uint32_t>(interval);
cxt->it_host->add_trigger(n_recvd[i] + 2, action, i_interval);
}
}
break;
case 'C': // activate callback based trigger
break;
case 'R': // remove trigger
{
cxt->it_host->remove_trigger(n_recvd[i] + 2);
}
break;
default:
printf("Corrupted message from prompt: %s\n", n_recvd[i]);
break;
}
}
break;
}
}
@ -604,7 +649,7 @@ int launcher(int argc, char** argv){
str = std::string("cd ") + pwd + std::string("&& python3 ./prompt.py ") + str;
return system(str.c_str());
}
#if !defined(TESTMAIN) && !( defined(_MSC_VER) && defined(_DEBUG) )
#if true || !defined(TESTMAIN) && !( defined(_MSC_VER) && defined(_DEBUG) )
extern "C" int __DLLEXPORT__ main(int argc, char** argv) {
#ifdef __AQ_BUILD_LAUNCHER__
return launcher(argc, argv);
@ -617,6 +662,8 @@ extern "C" int __DLLEXPORT__ main(int argc, char** argv) {
#ifdef THREADING
auto tp = new ThreadPool();
cxt->thread_pool = tp;
cxt->it_host = new IntervalBasedTriggerHost(tp);
cxt->ct_host = new CallbackBasedTriggerHost(tp);
#endif
const char* shmname;

@ -156,16 +156,17 @@ bool ThreadPool::busy(){
IntervalBasedTriggerHost::IntervalBasedTriggerHost(ThreadPool* tp){
this->tp = tp;
this->triggers = new vector_type<IntervalBasedTrigger>;
this->triggers = new aq_map<std::string, IntervalBasedTrigger>;
trigger_queue_lock = new mutex();
this->now = std::chrono::high_resolution_clock::now().time_since_epoch().count();
}
void IntervalBasedTriggerHost::add_trigger(StoredProcedure *p, uint32_t interval) {
void IntervalBasedTriggerHost::add_trigger(const char* name, StoredProcedure *p, uint32_t interval) {
auto tr = IntervalBasedTrigger{.interval = interval, .time_remaining = 0, .sp = p};
auto vt_triggers = static_cast<vector_type<IntervalBasedTrigger> *>(this->triggers);
auto vt_triggers = static_cast<aq_map<std::string, IntervalBasedTrigger> *>(this->triggers);
trigger_queue_lock->lock();
vt_triggers->emplace_back(tr);
vt_triggers->emplace(name, tr);
//(*vt_triggers)[name] = tr;
trigger_queue_lock->unlock();
}
@ -173,9 +174,9 @@ void IntervalBasedTriggerHost::tick() {
const auto current_time = std::chrono::high_resolution_clock::now().time_since_epoch().count();
const auto delta_t = static_cast<uint32_t>((current_time - now) / 1000000); // miliseconds precision
now = current_time;
auto vt_triggers = static_cast<vector_type<IntervalBasedTrigger> *>(this->triggers);
auto vt_triggers = static_cast<aq_map<std::string, IntervalBasedTrigger> *>(this->triggers);
trigger_queue_lock->lock();
for(auto& t : *vt_triggers) {
for(auto& [_, t] : vt_triggers->values()) {
if(t.tick(delta_t)) {
payload_t payload;
payload.f = execTriggerPayload;
@ -187,6 +188,11 @@ void IntervalBasedTriggerHost::tick() {
trigger_queue_lock->unlock();
}
void IntervalBasedTriggerHost::remove_trigger(const char* name) {
auto vt_triggers = static_cast<aq_map<std::string, IntervalBasedTrigger> *>(this->triggers);
vt_triggers->erase(name);
}
void IntervalBasedTrigger::reset() {
time_remaining = interval;
}
@ -201,3 +207,9 @@ bool IntervalBasedTrigger::tick(uint32_t delta_t) {
time_remaining = time_remaining - curr_dt;
return ret;
}
CallbackBasedTriggerHost::CallbackBasedTriggerHost(ThreadPool *tp) {
this->tp = tp;
}
void CallbackBasedTriggerHost::tick() {}

@ -2,6 +2,8 @@
#define _AQ_THREADING_H
#include <stdint.h>
#include <thread>
#include <mutex>
typedef int(*payload_fn_t)(void*);
struct payload_t{
@ -39,12 +41,11 @@ private:
};
#include <thread>
#include <mutex>
class A_Semphore;
class TriggerHost {
protected:
public:
void* triggers;
std::thread* handle;
ThreadPool *tp;
@ -52,14 +53,15 @@ protected:
std::mutex* trigger_queue_lock;
virtual void tick() = 0;
public:
TriggerHost() = default;
virtual ~TriggerHost() = default;
};
struct StoredProcedure;
struct IntervalBasedTrigger {
struct Trigger{};
struct IntervalBasedTrigger : Trigger {
uint32_t interval; // in milliseconds
uint32_t time_remaining;
StoredProcedure* sp;
@ -70,8 +72,8 @@ struct IntervalBasedTrigger {
class IntervalBasedTriggerHost : public TriggerHost {
public:
explicit IntervalBasedTriggerHost(ThreadPool *tp);
void add_trigger(StoredProcedure* stored_procedure, uint32_t interval);
void remove_trigger(uint32_t tid);
void add_trigger(const char* name, StoredProcedure* stored_procedure, uint32_t interval);
void remove_trigger(const char* name);
private:
unsigned long long now;
void tick() override;
@ -79,6 +81,7 @@ private:
class CallbackBasedTriggerHost : public TriggerHost {
public:
explicit CallbackBasedTriggerHost(ThreadPool *tp);
void add_trigger();
private:
void tick() override;

@ -18,7 +18,7 @@
#include "gc.h"
#pragma pack(push, 1)
struct vectortype_cstorage{
struct vectortype_cstorage {
void* container;
unsigned int size, capacity;
};
@ -123,7 +123,7 @@ public:
_copy(vt);
return *this;
}
vector_type<_Ty>& operator =(vector_type<_Ty>&& vt) {
vector_type<_Ty>& operator =(vector_type<_Ty>&& vt) noexcept {
_move(std::move(vt));
return *this;
}
@ -139,10 +139,10 @@ public:
return *this;
}
inline std::unordered_set<value_t> distinct_common(){
inline std::unordered_set<value_t> distinct_common() {
return std::unordered_set<value_t>(container, container + size);
}
vector_type<_Ty>& distinct_inplace(){
vector_type<_Ty>& distinct_inplace() {
uint32_t i = 0;
for(const auto& v : distinct_common()){
container[i++] = v;

Loading…
Cancel
Save