Compare commits

..

No commits in common. '64d4e3dd9a7e4dbefc9f427b660ac1567bfd8259' and '541c702d78c7bdc0c979cbcd96dfa411b8b7a054' have entirely different histories.

@ -33,7 +33,6 @@ def common_parser():
return parser(ansi_string | aquery_doublequote_string, combined_ident) return parser(ansi_string | aquery_doublequote_string, combined_ident)
def parser(literal_string, ident): def parser(literal_string, ident):
with Whitespace() as engine: with Whitespace() as engine:
engine.add_ignore(Literal("--") + restOfLine) engine.add_ignore(Literal("--") + restOfLine)
@ -570,25 +569,7 @@ def parser(literal_string, ident):
+ index_type + index_type
+ index_column_names + index_column_names
+ index_options + index_options
)("create_index") )("create index")
create_trigger = (
keyword("create trigger")
+ var_name("name")
+ ((
ON
+ var_name("table")
+ keyword("action")
+ var_name("action")
+ WHEN
+ var_name("query") )
| (
keyword("action")
+ var_name("action")
+ INTERVAL
+ int_num("interval")
))
)("create_trigger")
cache_options = Optional(( cache_options = Optional((
keyword("options").suppress() keyword("options").suppress()
@ -712,7 +693,7 @@ def parser(literal_string, ident):
sql_stmts = delimited_list( ( sql_stmts = delimited_list( (
query query
| (insert | update | delete | load) | (insert | update | delete | load)
| (create_table | create_view | create_cache | create_index | create_trigger) | (create_table | create_view | create_cache | create_index)
| (drop_table | drop_view | drop_index) | (drop_table | drop_view | drop_index)
)("stmts"), ";") )("stmts"), ";")
@ -726,5 +707,6 @@ def parser(literal_string, ident):
|other_stmt |other_stmt
| keyword(";").suppress() # empty stmt | keyword(";").suppress() # empty stmt
) )
return stmts.finalize() return stmts.finalize()

File diff suppressed because one or more lines are too long

@ -43,7 +43,7 @@ class orderby(ast_node):
def merge(self, node): def merge(self, node):
self.produce(node) self.produce(node)
def finalize(self, references): def finialize(self, references):
self.order = [ o for o in self.order if o.name in references ] self.order = [ o for o in self.order if o.name in references ]
def result(self, sep:str = ','): def result(self, sep:str = ','):

@ -8,7 +8,7 @@ nums = '0123456789'
base62alp = nums + lower_alp + upper_alp base62alp = nums + lower_alp + upper_alp
reserved_monet = ['month'] reserved_monet = ['month']
session_context = None
class CaseInsensitiveDict(MutableMapping): class CaseInsensitiveDict(MutableMapping):
def __init__(self, data=None, **kwargs): def __init__(self, data=None, **kwargs):
@ -158,35 +158,3 @@ def get_innermost(sl):
return get_innermost(sl[0]) return get_innermost(sl[0])
else: else:
return sl return sl
def send_to_server(payload : str):
from prompt import PromptState
cxt : PromptState = session_context
if cxt is None:
raise RuntimeError("Error! no session specified.")
else:
from ctypes import c_char_p
cxt.payload = (c_char_p*1)(c_char_p(bytes(payload, 'utf-8')))
cxt.cfg.has_dll = 0
cxt.send(1, cxt.payload)
cxt.set_ready()
def get_storedproc(name : str):
from prompt import PromptState, StoredProcedure
cxt : PromptState = session_context
if cxt is None:
raise RuntimeError("Error! no session specified.")
else:
ret : StoredProcedure = cxt.get_storedproc(bytes(name, 'utf-8'))
if (
ret.name.value and
ret.name.value.decode('utf-8') != name
):
print(f'Procedure {name} mismatch in server {ret.name.value}')
return None
else:
return ret
def execute_procedure(proc):
pass

@ -119,15 +119,6 @@ class Backend_Type(enum.Enum):
BACKEND_MonetDB = 1 BACKEND_MonetDB = 1
BACKEND_MariaDB = 2 BACKEND_MariaDB = 2
class StoredProcedure(ctypes.Structure):
_fields_ = [
('cnt', ctypes.c_uint32),
('postproc_modules', ctypes.c_uint32),
('queries', ctypes.POINTER(ctypes.c_char_p)),
('name', ctypes.c_char_p),
('__rt_loaded_modules', ctypes.POINTER(ctypes.c_void_p)),
]
@dataclass @dataclass
class QueryStats: class QueryStats:
last_time : int = time.time() last_time : int = time.time()
@ -238,7 +229,6 @@ class PromptState():
server_bin = 'server.bin' if server_mode == RunType.IPC else 'server.so' server_bin = 'server.bin' if server_mode == RunType.IPC else 'server.so'
wait_engine = lambda: None wait_engine = lambda: None
wake_engine = lambda: None wake_engine = lambda: None
get_storedproc = lambda : StoredProcedure()
set_ready = lambda: None set_ready = lambda: None
get_ready = lambda: None get_ready = lambda: None
server_status = lambda: False server_status = lambda: False
@ -332,8 +322,6 @@ def init_threaded(state : PromptState):
state.send = server_so['receive_args'] state.send = server_so['receive_args']
state.wait_engine = server_so['wait_engine'] state.wait_engine = server_so['wait_engine']
state.wake_engine = server_so['wake_engine'] state.wake_engine = server_so['wake_engine']
state.get_storedproc = server_so['get_procedure']
state.get_storedproc.restype = StoredProcedure
aquery_config.have_hge = server_so['have_hge']() aquery_config.have_hge = server_so['have_hge']()
if aquery_config.have_hge != 0: if aquery_config.have_hge != 0:
from engine.types import get_int128_support from engine.types import get_int128_support
@ -342,11 +330,9 @@ def init_threaded(state : PromptState):
state.th.start() state.th.start()
def init_prompt() -> PromptState: def init_prompt() -> PromptState:
from engine.utils import session_context
aquery_config.init_config() aquery_config.init_config()
state = PromptState() state = PromptState()
session_context = state
# if aquery_config.rebuild_backend: # if aquery_config.rebuild_backend:
# try: # try:
# os.remove(state.server_bin) # os.remove(state.server_bin)
@ -468,7 +454,7 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr
continue continue
elif q.startswith('xexec') or q.startswith('exec'): # generate build and run (MonetDB Engine) elif q.startswith('xexec') or q.startswith('exec'): # generate build and run (MonetDB Engine)
state.cfg.backend_type = Backend_Type.BACKEND_MonetDB.value state.cfg.backend_type = Backend_Type.BACKEND_MonetDB.value
cxt = xengine.exec(state.stmts, cxt, keep, parser.parse) cxt = xengine.exec(state.stmts, cxt, keep)
this_udf = cxt.finalize_udf() this_udf = cxt.finalize_udf()
if this_udf: if this_udf:
@ -627,7 +613,11 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr
elif q.startswith('procedure'): elif q.startswith('procedure'):
qs = re.split(r'[ \t\r\n]', q) qs = re.split(r'[ \t\r\n]', q)
procedure_help = '''Usage: procedure <procedure_name> [record|stop|run|remove|save|load]''' procedure_help = '''Usage: procedure <procedure_name> [record|stop|run|remove|save|load]'''
from engine.utils import send_to_server def send_to_server(payload : str):
state.payload = (ctypes.c_char_p*1)(ctypes.c_char_p(bytes(payload, 'utf-8')))
state.cfg.has_dll = 0
state.send(1, state.payload)
state.set_ready()
if len(qs) > 2: if len(qs) > 2:
if qs[2].lower() =='record': if qs[2].lower() =='record':
if state.current_procedure is not None and state.current_procedure != qs[1]: if state.current_procedure is not None and state.current_procedure != qs[1]:

@ -18,11 +18,10 @@ def generate(ast, cxt):
if k in ast_node.types.keys(): if k in ast_node.types.keys():
ast_node.types[k](None, ast, cxt) ast_node.types[k](None, ast, cxt)
def exec(stmts, cxt = None, keep = False, parser = None): def exec(stmts, cxt = None, keep = False):
if 'stmts' not in stmts: if 'stmts' not in stmts:
return return
cxt = initialize(cxt, keep) cxt = initialize(cxt, keep)
cxt.parser = parser
stmts_stmts = stmts['stmts'] stmts_stmts = stmts['stmts']
if type(stmts_stmts) is list: if type(stmts_stmts) is list:
for s in stmts_stmts: for s in stmts_stmts:

@ -281,11 +281,11 @@ class projection(ast_node):
# self.into_stub = f'{{INTOSTUB{base62uuid(20)}}}' # self.into_stub = f'{{INTOSTUB{base62uuid(20)}}}'
# self.add(self.into_stub, '') # self.add(self.into_stub, '')
def finalize(astnode:ast_node): def finialize(astnode:ast_node):
if(astnode is not None): if(astnode is not None):
self.add(astnode.sql) self.add(astnode.sql)
finalize(self.datasource) finialize(self.datasource)
finalize(self.where) finialize(self.where)
if self.group_node and not self.group_node.use_sp_gb: if self.group_node and not self.group_node.use_sp_gb:
self.add(self.group_node.sql) self.add(self.group_node.sql)
@ -1082,62 +1082,6 @@ class create_table(ast_node):
if self.context.use_columnstore: if self.context.use_columnstore:
self.sql += ' engine=ColumnStore' self.sql += ' engine=ColumnStore'
class create_trigger(ast_node):
name = 'create_trigger'
first_order = name
class Type (Enum):
Interval = auto()
Callback = auto()
def produce(self, node):
from engine.utils import send_to_server, get_storedproc
node = node['create_trigger']
self.trigger_name = node['name']
self.action_name = node['action']
self.action = get_storedproc(self.action_name)
if self.trigger_name in self.context.triggers:
raise ValueError(f'trigger {self.trigger_name} exists')
elif self.action:
raise ValueError(f'Stored Procedure {self.action_name} do not exist')
if 'interval' in node: # executed periodically from server
self.type = self.Type.Interval
self.interval = node['interval']
send_to_server(f'TI{self.trigger_name}\0{self.action_name}\0{self.interval}')
else: # executed from sql backend
self.type = self.Type.Callback
self.query_name = node['query']
self.table_name = node['table']
self.procedure = get_storedproc(self.query_name)
if self.procedure and self.table_name in self.context.tables_byname:
self.table = self.context.tables_byname[self.table_name]
self.table.triggers.add(self)
else:
return
self.context.triggers[self.trigger_name] = self
# manually execute trigger
def register(self):
if self.type != self.Type.Callback:
self.context.triggers.pop(self.trigger_name)
raise ValueError(f'Trigger {self.trigger_name} is not a callback based trigger')
self.context.triggers_active.add(self)
def execute(self):
from engine.utils import send_to_server
send_to_server(f'TC{self.query_name}\0{self.action_name}')
def remove(self):
from engine.utils import send_to_server
send_to_server(f'TR{self.trigger_name}')
class drop_trigger(ast_node):
name = 'drop_trigger'
first_order = name
def produce(self, node):
...
class drop(ast_node): class drop(ast_node):
name = 'drop' name = 'drop'
first_order = name first_order = name
@ -1167,11 +1111,9 @@ class insert(ast_node):
complex_query_kw = ['from', 'where', 'groupby', 'having', 'orderby', 'limit'] complex_query_kw = ['from', 'where', 'groupby', 'having', 'orderby', 'limit']
if any([kw in values for kw in complex_query_kw]): if any([kw in values for kw in complex_query_kw]):
values['into'] = node['insert'] values['into'] = node['insert']
proj_cls = ( proj_cls = (select_distinct
select_distinct if 'select_distinct' in values
if 'select_distinct' in values else projection)
else projection
)
proj_cls(None, values, self.context) proj_cls(None, values, self.context)
self.produce = lambda*_:None self.produce = lambda*_:None
self.spawn = lambda*_:None self.spawn = lambda*_:None
@ -1205,11 +1147,6 @@ class insert(ast_node):
keys = f'({", ".join(keys)})' if keys else '' keys = f'({", ".join(keys)})' if keys else ''
tbl = node['insert'] tbl = node['insert']
if tbl not in self.context.tables_byname:
print('Warning: {tbl} not registered in aquery compiler.')
tbl_obj = self.context.tables_byname[tbl]
for t in tbl_obj.triggers:
t.register()
self.sql = f'INSERT INTO {tbl}{keys} VALUES' self.sql = f'INSERT INTO {tbl}{keys} VALUES'
# if len(values) != table.n_cols: # if len(values) != table.n_cols:
# raise ValueError("Column Mismatch") # raise ValueError("Column Mismatch")
@ -1687,11 +1624,6 @@ class passthru_sql(ast_node):
seprator = re.compile(r'''((?:[^;"']|"[^"]*"|'[^']*')+)''') seprator = re.compile(r'''((?:[^;"']|"[^"]*"|'[^']*')+)''')
def __init__(self, _, node, context:Context): def __init__(self, _, node, context:Context):
sqls = passthru_sql.seprator.split(node['sql']) sqls = passthru_sql.seprator.split(node['sql'])
try:
if callable(context.parser):
parsed = context.parser(node['sql'])
except BaseException:
parsed = None
for sql in sqls: for sql in sqls:
sq = sql.strip(' \t\n\r;') sq = sql.strip(' \t\n\r;')
if sq: if sq:

@ -64,14 +64,12 @@ class ColRef:
class TableInfo: class TableInfo:
def __init__(self, table_name, cols, cxt:'Context'): def __init__(self, table_name, cols, cxt:'Context'):
from reconstruct.ast import create_trigger
# statics # statics
self.table_name : str = table_name self.table_name : str = table_name
self.contextname_cpp : str = '' self.contextname_cpp : str = ''
self.alias : Set[str] = set([table_name]) self.alias : Set[str] = set([table_name])
self.columns_byname : CaseInsensitiveDict[str, ColRef] = CaseInsensitiveDict() # column_name, type self.columns_byname : CaseInsensitiveDict[str, ColRef] = CaseInsensitiveDict() # column_name, type
self.columns : List[ColRef] = [] self.columns : List[ColRef] = []
self.triggers : Set[create_trigger] = set()
self.cxt = cxt self.cxt = cxt
# keep track of temp vars # keep track of temp vars
self.rec = None self.rec = None
@ -158,11 +156,9 @@ class Context:
self.module_init_loc = 0 self.module_init_loc = 0
self.special_gb = False self.special_gb = False
self.has_dll = False self.has_dll = False
self.triggers_active.clear()
def __init__(self): def __init__(self):
from .ast import create_trigger self.tables_byname = dict()
self.tables_byname : Dict[str, TableInfo] = dict()
self.col_byname = dict() self.col_byname = dict()
self.tables : Set[TableInfo] = set() self.tables : Set[TableInfo] = set()
self.cols = [] self.cols = []
@ -178,9 +174,6 @@ class Context:
self.have_hge = False self.have_hge = False
self.Error = lambda *args: print(*args) self.Error = lambda *args: print(*args)
self.Info = lambda *_: None self.Info = lambda *_: None
self.triggers : Dict[str, create_trigger] = dict()
self.triggers_active = set()
self.stored_proceudres = dict()
# self.new() called everytime new query batch is started # self.new() called everytime new query batch is started
def get_scan_var(self): def get_scan_var(self):
@ -264,18 +257,6 @@ class Context:
self.queries.append( self.queries.append(
'O' + limit + sep + end) 'O' + limit + sep + end)
def remove_trigger(self, name : str):
from reconstruct.ast import create_trigger
val = self.triggers.pop(name, None)
if val.type == create_trigger.Type.Callback:
val.table.triggers.remove(val)
val.remove()
def post_exec_triggers(self):
for t in self.triggers_active:
t.execute()
self.triggers_active.clear()
def abandon_postproc(self): def abandon_postproc(self):
self.ccode = '' self.ccode = ''
self.finalize_query() self.finalize_query()

@ -5,7 +5,7 @@ else
OPT_FLAGS = -g3 -D_DEBUG -fsanitize=leak -fsanitize=address OPT_FLAGS = -g3 -D_DEBUG -fsanitize=leak -fsanitize=address
endif endif
example: example:
$(CXX) -shared -fPIC example.cpp aquery_mem.cpp -fno-semantic-interposition -Ofast -march=native -flto --std=c++1z -L.. -laquery -o ../test.so $(CXX) -shared -fPIC example.cpp aquery_mem.cpp -fno-semantic-interposition -Ofast -march=native -flto --std=c++1z -o ../test.so
irf: irf:
$(CXX) -shared -fPIC RF.cpp irf.cpp incrementalDecisionTree.cpp aquery_mem.cpp Evaluation.cpp -fno-semantic-interposition $(OPT_FLAGS) --std=c++1z -o ../libirf.so $(CXX) -shared -fPIC RF.cpp irf.cpp incrementalDecisionTree.cpp aquery_mem.cpp Evaluation.cpp -fno-semantic-interposition $(OPT_FLAGS) --std=c++1z -o ../libirf.so
all: example all: example

@ -84,9 +84,7 @@ __AQEXPORT__(void) init_session(Context* cxt);
#ifdef _WIN32 #ifdef _WIN32
#include <cstring> #include <cstring>
#else #else
namespace std { void* memcpy(void*, const void*, unsigned long long);
void* memcpy(void*, const void*, unsigned long long);
}
#endif #endif
struct vectortype_storage{ struct vectortype_storage{
@ -97,7 +95,7 @@ struct vectortype_storage{
vectortype_storage() = default; vectortype_storage() = default;
template <class Ty, template <typename> class VT> template <class Ty, template <typename> class VT>
vectortype_storage(const VT<Ty>& vt) { vectortype_storage(const VT<Ty>& vt) {
std::memcpy(this, &vt, sizeof(vectortype_storage)); memcpy(this, &vt, sizeof(vectortype_storage));
} }
}; };
struct ColRef_storage { struct ColRef_storage {
@ -110,7 +108,7 @@ struct ColRef_storage {
ColRef_storage() = default; ColRef_storage() = default;
template <class Ty, template <typename> class VT> template <class Ty, template <typename> class VT>
ColRef_storage(const VT<Ty>& vt) { ColRef_storage(const VT<Ty>& vt) {
std::memcpy(this, &vt, sizeof(ColRef_storage)); memcpy(this, &vt, sizeof(ColRef_storage));
} }
}; };
#endif #endif

@ -1,6 +1,6 @@
#include "aquery.h"
// __AQ_NO_SESSION__ // __AQ_NO_SESSION__
#include "../server/table.h" #include "../server/table.h"
#include "aquery.h"
__AQEXPORT__(ColRef_storage) mulvec(int a, ColRef<float> b){ __AQEXPORT__(ColRef_storage) mulvec(int a, ColRef<float> b){
return a * b; return a * b;

@ -3,15 +3,8 @@
// __AQ_NO_SESSION__ // __AQ_NO_SESSION__
#include "../server/table.h" #include "../server/table.h"
#include "aquery.h" #include "aquery.h"
#include "./server/gc.h"
__AQEXPORT__(void) __AQ_Init_GC__(Context* cxt) {
GC::gc_handle = static_cast<GC*>(cxt->gc);
GC::scratch_space = nullptr;
}
DecisionTree *dt = nullptr; DecisionTree *dt = nullptr;
RandomForest *rf = nullptr; RandomForest *rf = nullptr;
@ -27,7 +20,7 @@ newtree(int height, long f, ColRef<int> X, double forget, long maxf, long noclas
if (maxf < 0) if (maxf < 0)
maxf = f; maxf = f;
dt = new DecisionTree(f, X_cpy, forget, maxf, noclasses, e); dt = new DecisionTree(f, X_cpy, forget, maxf, noclasses, e);
rf = new RandomForest(height, f, X_cpy, forget, noclasses, e); rf = new RandomForest(height, f, X_cpy, forget, noclasses, e)
return true; return true;
} }
@ -49,21 +42,6 @@ newtree(int height, long f, ColRef<int> X, double forget, long maxf, long noclas
// return 1; // return 1;
// } // }
__AQEXPORT__(bool)
fit_inc(vector_type<vector_type<double>> v, vector_type<long> res)
{
static uint32_t last_offset = 0;
double **data = (double **)malloc(v.size * sizeof(double *));
if(last_offset >= v.size)
last_offset = 0;
for (int i = last_offset; i < v.size; ++i)
data[i] = v.container[i].container;
rf->fit(data, res.container, v.size);
free(data);
return true;
}
__AQEXPORT__(bool) __AQEXPORT__(bool)
fit(vector_type<vector_type<double>> v, vector_type<long> res) fit(vector_type<vector_type<double>> v, vector_type<long> res)
{ {
@ -75,15 +53,14 @@ fit(vector_type<vector_type<double>> v, vector_type<long> res)
return true; return true;
} }
__AQEXPORT__(vectortype_cstorage) __AQEXPORT__(vectortype_cstorage)
predict(vector_type<vector_type<double>> v) predict(vector_type<vector_type<double>> v)
{ {
int *result = (int *)malloc(v.size * sizeof(int)); int *result = (int *)malloc(v.size * sizeof(int));
for (uint32_t i = 0; i < v.size; i++) for (long i = 0; i < v.size; i++)
//result[i] = dt->Test(v.container[i].container, dt->DTree); //result[i] = dt->Test(v.container[i].container, dt->DTree);
result[i] = int(rf->Test(v[i].container)); result[i] = rf->Test(v.container, rf->DTrees);
auto container = (vector_type<int> *)malloc(sizeof(vector_type<int>)); auto container = (vector_type<int> *)malloc(sizeof(vector_type<int>));
container->size = v.size; container->size = v.size;
container->capacity = 0; container->capacity = 0;

@ -7,12 +7,6 @@
#include "types.h" #include "types.h"
// #include "robin_hood.h" // #include "robin_hood.h"
#include "unordered_dense.h" #include "unordered_dense.h"
template<typename Key, typename Val>
using aq_map = ankerl::unordered_dense::map<Key, Val>;
template<typename Key>
using aq_set = ankerl::unordered_dense::set<Key>;
// only works for 64 bit systems // only works for 64 bit systems
namespace hasher_consts{ namespace hasher_consts{
constexpr size_t _FNV_offset_basis = 14695981039346656037ULL; constexpr size_t _FNV_offset_basis = 14695981039346656037ULL;
@ -25,6 +19,7 @@ inline size_t append_bytes(const unsigned char* _First) noexcept {
_Val ^= static_cast<size_t>(*_First); _Val ^= static_cast<size_t>(*_First);
_Val *= hasher_consts::_FNV_prime; _Val *= hasher_consts::_FNV_prime;
} }
return _Val; return _Val;
} }

@ -58,6 +58,27 @@ void print<bool>(const bool&v, const char* delimiter){
} }
template<class T>
T getInt(const char*& buf){
T ret = 0;
while(*buf >= '0' and *buf <= '9'){
ret = ret*10 + *buf - '0';
buf++;
}
return ret;
}
template<class T>
char* intToString(T val, char* buf){
while (val > 0){
*--buf = val%10 + '0';
val /= 10;
}
return buf;
}
void skip(const char*& buf){ void skip(const char*& buf){
while(*buf && (*buf >'9' || *buf < '0')) buf++; while(*buf && (*buf >'9' || *buf < '0')) buf++;
} }

@ -9,7 +9,6 @@
#include <unordered_map> #include <unordered_map>
#include <chrono> #include <chrono>
#include <filesystem> #include <filesystem>
#include <cstring>
class aq_timer { class aq_timer {
private: private:
std::chrono::high_resolution_clock::time_point now; std::chrono::high_resolution_clock::time_point now;
@ -33,27 +32,6 @@ public:
#include "table.h" #include "table.h"
template<class T = int>
T getInt(const char*& buf){
T ret = 0;
while(*buf >= '0' and *buf <= '9'){
ret = ret*10 + *buf - '0';
buf++;
}
return ret;
}
template<class T>
char* intToString(T val, char* buf){
while (val > 0){
*--buf = val%10 + '0';
val /= 10;
}
return buf;
}
enum Log_level { enum Log_level {
LOG_INFO, LOG_INFO,
@ -94,9 +72,7 @@ struct StoredProcedure {
const char* name; const char* name;
void **__rt_loaded_modules; void **__rt_loaded_modules;
}; };
struct Trigger;
struct IntervalBasedTriggerHost;
struct CallbackBasedTriggerHost;
struct Context { struct Context {
typedef int (*printf_type) (const char *format, ...); typedef int (*printf_type) (const char *format, ...);
@ -137,9 +113,6 @@ struct Context {
std::unordered_map<std::string, void*> tables; std::unordered_map<std::string, void*> tables;
std::unordered_map<std::string, uColRef *> cols; std::unordered_map<std::string, uColRef *> cols;
std::unordered_map<std::string, StoredProcedure> stored_proc; std::unordered_map<std::string, StoredProcedure> stored_proc;
std::unordered_map<std::string, Trigger> triggers;
IntervalBasedTriggerHost *it_host;
CallbackBasedTriggerHost *ct_host;
}; };
@ -203,13 +176,6 @@ inline void AQ_ZeroMemory(_This_Struct& __val) {
memset(&__val, 0, sizeof(_This_Struct)); memset(&__val, 0, sizeof(_This_Struct));
} }
template <typename _This_Type>
inline _This_Type* AQ_DupObject(_This_Type* __val) {
auto ret = (_This_Type*)(malloc(sizeof(_This_Type)));
memcpy(ret, __val, sizeof(_This_Type));
return ret;
}
#ifdef __USE_STD_SEMAPHORE__ #ifdef __USE_STD_SEMAPHORE__
#include <semaphore> #include <semaphore>
class A_Semaphore { class A_Semaphore {

@ -86,8 +86,7 @@ extern "C" int __DLLEXPORT__ binary_info() {
#endif #endif
} }
__AQEXPORT__(bool) __AQEXPORT__(bool) have_hge(){
have_hge() {
#if defined(__MONETDB_CONN_H__) #if defined(__MONETDB_CONN_H__)
return Server::havehge(); return Server::havehge();
#else #else
@ -95,19 +94,6 @@ have_hge() {
#endif #endif
} }
__AQEXPORT__(StoredProcedure)
get_procedure(Context* cxt, const char* name) {
auto res = cxt->stored_proc.find(name);
if (res == cxt->stored_proc.end())
return { .cnt = 0,
.postproc_modules = 0,
.queries = nullptr,
.name = nullptr,
.__rt_loaded_modules = nullptr
};
return res->second;
}
using prt_fn_t = char* (*)(void*, char*); using prt_fn_t = char* (*)(void*, char*);
// This function contains heap allocations, free after use // This function contains heap allocations, free after use
@ -559,37 +545,6 @@ start:
} }
} }
break; break;
case 'T': // triggers
{
switch(n_recvd[i][1]){
case 'I': // register interval based trigger
{
const char* action_name = n_recvd[i] + 2;
while(*action_name++);
if(auto p = get_procedure(cxt, action_name); p.name == nullptr)
printf("Invalid action name: %s\n", action_name);
else {
auto action = AQ_DupObject(&p);
const char* interval = action_name;
while(*interval++);
const auto i_interval = getInt<uint32_t>(interval);
cxt->it_host->add_trigger(n_recvd[i] + 2, action, i_interval);
}
}
break;
case 'C': // activate callback based trigger
break;
case 'R': // remove trigger
{
cxt->it_host->remove_trigger(n_recvd[i] + 2);
}
break;
default:
printf("Corrupted message from prompt: %s\n", n_recvd[i]);
break;
}
}
break;
} }
} }
@ -649,7 +604,7 @@ int launcher(int argc, char** argv){
str = std::string("cd ") + pwd + std::string("&& python3 ./prompt.py ") + str; str = std::string("cd ") + pwd + std::string("&& python3 ./prompt.py ") + str;
return system(str.c_str()); return system(str.c_str());
} }
#if true || !defined(TESTMAIN) && !( defined(_MSC_VER) && defined(_DEBUG) ) #if !defined(TESTMAIN) && !( defined(_MSC_VER) && defined(_DEBUG) )
extern "C" int __DLLEXPORT__ main(int argc, char** argv) { extern "C" int __DLLEXPORT__ main(int argc, char** argv) {
#ifdef __AQ_BUILD_LAUNCHER__ #ifdef __AQ_BUILD_LAUNCHER__
return launcher(argc, argv); return launcher(argc, argv);
@ -662,8 +617,6 @@ extern "C" int __DLLEXPORT__ main(int argc, char** argv) {
#ifdef THREADING #ifdef THREADING
auto tp = new ThreadPool(); auto tp = new ThreadPool();
cxt->thread_pool = tp; cxt->thread_pool = tp;
cxt->it_host = new IntervalBasedTriggerHost(tp);
cxt->ct_host = new CallbackBasedTriggerHost(tp);
#endif #endif
const char* shmname; const char* shmname;

@ -156,17 +156,16 @@ bool ThreadPool::busy(){
IntervalBasedTriggerHost::IntervalBasedTriggerHost(ThreadPool* tp){ IntervalBasedTriggerHost::IntervalBasedTriggerHost(ThreadPool* tp){
this->tp = tp; this->tp = tp;
this->triggers = new aq_map<std::string, IntervalBasedTrigger>; this->triggers = new vector_type<IntervalBasedTrigger>;
trigger_queue_lock = new mutex(); trigger_queue_lock = new mutex();
this->now = std::chrono::high_resolution_clock::now().time_since_epoch().count(); this->now = std::chrono::high_resolution_clock::now().time_since_epoch().count();
} }
void IntervalBasedTriggerHost::add_trigger(const char* name, StoredProcedure *p, uint32_t interval) { void IntervalBasedTriggerHost::add_trigger(StoredProcedure *p, uint32_t interval) {
auto tr = IntervalBasedTrigger{.interval = interval, .time_remaining = 0, .sp = p}; auto tr = IntervalBasedTrigger{.interval = interval, .time_remaining = 0, .sp = p};
auto vt_triggers = static_cast<aq_map<std::string, IntervalBasedTrigger> *>(this->triggers); auto vt_triggers = static_cast<vector_type<IntervalBasedTrigger> *>(this->triggers);
trigger_queue_lock->lock(); trigger_queue_lock->lock();
vt_triggers->emplace(name, tr); vt_triggers->emplace_back(tr);
//(*vt_triggers)[name] = tr;
trigger_queue_lock->unlock(); trigger_queue_lock->unlock();
} }
@ -174,9 +173,9 @@ void IntervalBasedTriggerHost::tick() {
const auto current_time = std::chrono::high_resolution_clock::now().time_since_epoch().count(); const auto current_time = std::chrono::high_resolution_clock::now().time_since_epoch().count();
const auto delta_t = static_cast<uint32_t>((current_time - now) / 1000000); // miliseconds precision const auto delta_t = static_cast<uint32_t>((current_time - now) / 1000000); // miliseconds precision
now = current_time; now = current_time;
auto vt_triggers = static_cast<aq_map<std::string, IntervalBasedTrigger> *>(this->triggers); auto vt_triggers = static_cast<vector_type<IntervalBasedTrigger> *>(this->triggers);
trigger_queue_lock->lock(); trigger_queue_lock->lock();
for(auto& [_, t] : vt_triggers->values()) { for(auto& t : *vt_triggers) {
if(t.tick(delta_t)) { if(t.tick(delta_t)) {
payload_t payload; payload_t payload;
payload.f = execTriggerPayload; payload.f = execTriggerPayload;
@ -188,11 +187,6 @@ void IntervalBasedTriggerHost::tick() {
trigger_queue_lock->unlock(); trigger_queue_lock->unlock();
} }
void IntervalBasedTriggerHost::remove_trigger(const char* name) {
auto vt_triggers = static_cast<aq_map<std::string, IntervalBasedTrigger> *>(this->triggers);
vt_triggers->erase(name);
}
void IntervalBasedTrigger::reset() { void IntervalBasedTrigger::reset() {
time_remaining = interval; time_remaining = interval;
} }
@ -207,9 +201,3 @@ bool IntervalBasedTrigger::tick(uint32_t delta_t) {
time_remaining = time_remaining - curr_dt; time_remaining = time_remaining - curr_dt;
return ret; return ret;
} }
CallbackBasedTriggerHost::CallbackBasedTriggerHost(ThreadPool *tp) {
this->tp = tp;
}
void CallbackBasedTriggerHost::tick() {}

@ -2,8 +2,6 @@
#define _AQ_THREADING_H #define _AQ_THREADING_H
#include <stdint.h> #include <stdint.h>
#include <thread>
#include <mutex>
typedef int(*payload_fn_t)(void*); typedef int(*payload_fn_t)(void*);
struct payload_t{ struct payload_t{
@ -41,11 +39,12 @@ private:
}; };
#include <thread>
#include <mutex>
class A_Semphore; class A_Semphore;
class TriggerHost { class TriggerHost {
public: protected:
void* triggers; void* triggers;
std::thread* handle; std::thread* handle;
ThreadPool *tp; ThreadPool *tp;
@ -53,15 +52,14 @@ public:
std::mutex* trigger_queue_lock; std::mutex* trigger_queue_lock;
virtual void tick() = 0; virtual void tick() = 0;
public:
TriggerHost() = default; TriggerHost() = default;
virtual ~TriggerHost() = default; virtual ~TriggerHost() = default;
}; };
struct StoredProcedure; struct StoredProcedure;
struct Trigger{}; struct IntervalBasedTrigger {
struct IntervalBasedTrigger : Trigger {
uint32_t interval; // in milliseconds uint32_t interval; // in milliseconds
uint32_t time_remaining; uint32_t time_remaining;
StoredProcedure* sp; StoredProcedure* sp;
@ -72,8 +70,8 @@ struct IntervalBasedTrigger : Trigger {
class IntervalBasedTriggerHost : public TriggerHost { class IntervalBasedTriggerHost : public TriggerHost {
public: public:
explicit IntervalBasedTriggerHost(ThreadPool *tp); explicit IntervalBasedTriggerHost(ThreadPool *tp);
void add_trigger(const char* name, StoredProcedure* stored_procedure, uint32_t interval); void add_trigger(StoredProcedure* stored_procedure, uint32_t interval);
void remove_trigger(const char* name); void remove_trigger(uint32_t tid);
private: private:
unsigned long long now; unsigned long long now;
void tick() override; void tick() override;
@ -81,7 +79,6 @@ private:
class CallbackBasedTriggerHost : public TriggerHost { class CallbackBasedTriggerHost : public TriggerHost {
public: public:
explicit CallbackBasedTriggerHost(ThreadPool *tp);
void add_trigger(); void add_trigger();
private: private:
void tick() override; void tick() override;

@ -18,7 +18,7 @@
#include "gc.h" #include "gc.h"
#pragma pack(push, 1) #pragma pack(push, 1)
struct vectortype_cstorage { struct vectortype_cstorage{
void* container; void* container;
unsigned int size, capacity; unsigned int size, capacity;
}; };
@ -123,7 +123,7 @@ public:
_copy(vt); _copy(vt);
return *this; return *this;
} }
vector_type<_Ty>& operator =(vector_type<_Ty>&& vt) noexcept { vector_type<_Ty>& operator =(vector_type<_Ty>&& vt) {
_move(std::move(vt)); _move(std::move(vt));
return *this; return *this;
} }
@ -139,10 +139,10 @@ public:
return *this; return *this;
} }
inline std::unordered_set<value_t> distinct_common() { inline std::unordered_set<value_t> distinct_common(){
return std::unordered_set<value_t>(container, container + size); return std::unordered_set<value_t>(container, container + size);
} }
vector_type<_Ty>& distinct_inplace() { vector_type<_Ty>& distinct_inplace(){
uint32_t i = 0; uint32_t i = 0;
for(const auto& v : distinct_common()){ for(const auto& v : distinct_common()){
container[i++] = v; container[i++] = v;

Loading…
Cancel
Save