Added documentation for trigger demo

master
Bill 2 years ago
parent c944b5dfcf
commit 726ef535ea

@ -1,4 +1,7 @@
# AQuery++ Database # AQuery++ Database
## News:
**Demo workflow for Triggers now available** See [**DEMO**](/demo/README.md)
## Introduction ## Introduction
AQuery++ Database is a cross-platform, In-Memory Column-Store Database that incorporates compiled query execution. (**Note**: If you encounter any problems, feel free to contact me via ys3540@nyu.edu) AQuery++ Database is a cross-platform, In-Memory Column-Store Database that incorporates compiled query execution. (**Note**: If you encounter any problems, feel free to contact me via ys3540@nyu.edu)

@ -27,6 +27,7 @@ class checksums:
aquery_config.build_driver + aquery_config.build_driver +
compiler_name + compiler_name +
aquery_config.version_string aquery_config.version_string
+ str(os.environ['AQ_DEBUG'] == '1')
) )
for key in self.__dict__.keys(): for key in self.__dict__.keys():
try: try:

@ -0,0 +1,28 @@
# Triggers Demo
This folder contains a demo workflow for the two types of triggers.
- An interval-based trigger will be set up to execute a stored procedure `demoi` defined in [demo/putdata.cpp](/demo/putdata.cpp) that inserts a .csv file from `data/electricity` to the table `source` every 5 seconds.
- A Conditional trigger will be triggered by condition `democq` defined in [demo/query.cpp](/demo/query.cpp) that checks and returns true when more than 200 rows of data are inserted into table `source`. Once triggered, it will execute a stored procedure `democa` defined in [demo/action.cpp](/demo/action.cpp) that trains the incremental random forest on the new data.
- See [demo/prep.a](/demo/prep.a) for parameters of the random forest.
## Run the demo
### Preparation
- Preprocess data
- Put `electricity` dataset to `/data/electricity_orig`
- Run `python3 rfdata_preproc.py` to generate .csv files to `data/electricity/`
- Use [demo/setup.sh](/demo/setup.sh) to
- setup stored procedures for this demo
- compile random forest user module used in this demo
- compile queries used in this demo
### Running the demo
- Run AQuery prompt `python3 prompt.py`
- Use Automated AQuery script in [demo/demo.aquery](/demo/demo.aquery) to execute the workflow. It does the following things in order:
- Register user module, create a new random forest by running [`f demo/prep.a`](/demo/prep.a)
- Register stored procedures.
- Create an Interval-based Trigger that executes payload `demoi` every 5 seconds
  - Create a Conditional Trigger that executes payload `democa` whenever condition `democq` returns true. The condition `democq` is tested every time new data is inserted into table `source`.
  - Load test data by running [demo/test.a](/demo/test.a)
- Use query `select predict(x) from test` to get predictions of the test data from current random forest.
- In AQuery prompt, an extra `exec` command after the query is needed to execute the query.
- Using the query `select test(x, y) from test` will also calculate the L2 error.

@ -3,15 +3,15 @@
f demo/prep.a f demo/prep.a
exec exec
procedure demoi run procedure demoi load
procedure democq run procedure democq load
procedure democa run procedure democa load
create trigger t action demoi interval 15000 create trigger t action demoi interval 5000
exec exec
create trigger c on source action democa when democq create trigger c on source action democa when democq
exec exec
# f demo/test.a f demo/test.a
# exec exec

@ -1,13 +1,14 @@
create table source(x vecdouble, y int64); create table source(x vecdouble, y int64);
LOAD MODULE FROM "./libirf.so" FUNCTIONS ( LOAD MODULE FROM "./libirf.so" FUNCTIONS (
newtree(height:int, f:int64, sparse:vecint, forget:double, maxf:int64, noclasses:int64, e:int) -> bool, newtree(height:int, f:int64, sparse:vecint, forget:double, noclasses:int64, e:int) -> bool,
fit_inc(X:vecvecdouble, y:vecint64) -> bool, fit_inc(X:vecvecdouble, y:vecint64) -> bool,
predict(X:vecvecdouble) -> vecint predict(X:vecvecdouble) -> vecint ,
test(X:vecvecdouble, y:vecint64) -> double
); );
create table elec_sparse(v int); create table elec_sparse(v int);
insert into elec_sparse values (0), (1), (1), (1), (1), (1), (1); insert into elec_sparse values (0), (1), (1), (1), (1), (1), (1);
select newtree(30, 7, elec_sparse.v, 0, 4, 2, 1) from elec_sparse select newtree(30, 7, elec_sparse.v, 0.3, 2, 1) from elec_sparse

@ -0,0 +1,6 @@
#!/bin/sh
# Set up the trigger demo: build the random-forest user module,
# install the demo stored procedures, and compile the demo queries.
# Abort on the first failing command so a broken build is not silently ignored.
set -e
# Build the incremental random forest user module from the SDK.
make -C ../sdk irf
# Ensure the procedures directory exists (-p: no error if it is already there,
# so the script is safe to re-run).
mkdir -p ../procedures
# Install the demo stored procedures where the engine loads them from.
cp demo*.aqp ../procedures
# Compile the queries used by the demo.
make

@ -1,3 +1,7 @@
-- select predict(x) from test;
-- select test(x, y) from test;
create table test(x vecdouble, y int64); create table test(x vecdouble, y int64);
load complex data infile "data/electricity/electricity872.csv" into table test fields terminated by ',' element terminated by ';'; load complex data infile "data/electricity/electricity872.csv" into table test fields terminated by ',' element terminated by ';';
select predict(x) from test

@ -1 +1 @@
Subproject commit a61c3122c43293ff6f8bd01b4f65d7d03c5c4c54 Subproject commit 0d0ec214297a3e46947cead37f887eb480043419

@ -238,7 +238,7 @@ class PromptState():
server_bin = 'server.bin' if server_mode == RunType.IPC else 'server.so' server_bin = 'server.bin' if server_mode == RunType.IPC else 'server.so'
wait_engine = lambda: None wait_engine = lambda: None
wake_engine = lambda: None wake_engine = lambda: None
get_storedproc = lambda : StoredProcedure() get_storedproc = lambda *_: StoredProcedure()
set_ready = lambda: None set_ready = lambda: None
get_ready = lambda: None get_ready = lambda: None
server_status = lambda: False server_status = lambda: False

@ -1336,11 +1336,11 @@ class load(ast_node):
self.context.postproc_begin(self.postproc_fname) self.context.postproc_begin(self.postproc_fname)
table:TableInfo = self.context.tables_byname[node['table']] table:TableInfo = self.context.tables_byname[node['table']]
self.sql = F"SELECT {', '.join([c.name for c in table.columns])} FROM {table.table_name};" # self.sql = F"SELECT {', '.join([c.name for c in table.columns])} FROM {table.table_name};"
self.emit(self.sql+';\n') # self.emit(self.sql+';\n')
self.context.sql_end() # self.context.sql_end()
length_name = 'len_' + base62uuid(6) # length_name = 'len_' + base62uuid(6)
self.context.emitc(f'auto {length_name} = server->cnt;') # self.context.emitc(f'auto {length_name} = server->cnt;')
out_typenames = [t.type.cname for t in table.columns] out_typenames = [t.type.cname for t in table.columns]
outtable_col_nameslist = ', '.join([f'"{c.name}"' for c in table.columns]) outtable_col_nameslist = ', '.join([f'"{c.name}"' for c in table.columns])
@ -1353,7 +1353,8 @@ class load(ast_node):
for i, c in enumerate(table.columns): for i, c in enumerate(table.columns):
c.cxt_name = 'c_' + base62uuid(6) c.cxt_name = 'c_' + base62uuid(6)
self.context.emitc(f'decltype(auto) {c.cxt_name} = {self.out_table}->get_col<{i}>();') self.context.emitc(f'decltype(auto) {c.cxt_name} = {self.out_table}->get_col<{i}>();')
self.context.emitc(f'{c.cxt_name}.initfrom({length_name}, server->getCol({i}), "{table.columns[i].name}");') self.context.emitc(f'{c.cxt_name}.init("{table.columns[i].name}");')
#self.context.emitc(f'{c.cxt_name}.initfrom({length_name}, server->getCol({i}), "{table.columns[i].name}");')
csv_reader_name = 'csv_reader_' + base62uuid(6) csv_reader_name = 'csv_reader_' + base62uuid(6)
col_types = [c.type.cname for c in table.columns] col_types = [c.type.cname for c in table.columns]
col_tmp_names = ['tmp_'+base62uuid(8) for _ in range(len(table.columns))] col_tmp_names = ['tmp_'+base62uuid(8) for _ in range(len(table.columns))]

@ -13,14 +13,14 @@ enum Backend_Type {
BACKEND_MariaDB BACKEND_MariaDB
}; };
struct Config{ struct Config {
int running, new_query, server_mode, int running, new_query, server_mode,
backend_type, has_dll, n_buffers; backend_type, has_dll, n_buffers;
int buffer_sizes[]; int buffer_sizes[];
}; };
struct Session{ struct Session {
struct Statistic{ struct Statistic {
unsigned long long total_active; unsigned long long total_active;
unsigned long long cnt_object; unsigned long long cnt_object;
unsigned long long total_alloc; unsigned long long total_alloc;
@ -102,7 +102,7 @@ namespace std {
} }
#endif #endif
struct vectortype_storage{ struct vectortype_storage {
void* container = nullptr; void* container = nullptr;
unsigned int size = 0, capacity = 0; unsigned int size = 0, capacity = 0;
vectortype_storage(void* container, unsigned int size, unsigned int capacity) : vectortype_storage(void* container, unsigned int size, unsigned int capacity) :

@ -19,7 +19,7 @@ DecisionTree *dt = nullptr;
RandomForest *rf = nullptr; RandomForest *rf = nullptr;
__AQEXPORT__(bool) __AQEXPORT__(bool)
newtree(int ntree, long f, ColRef<int> sparse, double forget, long maxf, long nclasses, Evaluation e) newtree(int ntree, long f, ColRef<int> sparse, double forget, long nclasses, Evaluation e)
{ {
if (sparse.size != f) if (sparse.size != f)
return false; return false;
@ -27,8 +27,6 @@ newtree(int ntree, long f, ColRef<int> sparse, double forget, long maxf, long nc
memcpy(X_cpy, sparse.container, f); memcpy(X_cpy, sparse.container, f);
if (maxf < 0)
maxf = f;
// dt = new DecisionTree(f, X_cpy, forget, maxf, noclasses, e); // dt = new DecisionTree(f, X_cpy, forget, maxf, noclasses, e);
rf = new RandomForest(ntree, f, X_cpy, forget, nclasses, e, true); rf = new RandomForest(ntree, f, X_cpy, forget, nclasses, e, true);
return true; return true;
@ -100,3 +98,22 @@ predict(vector_type<vector_type<double>> v)
auto ret = vectortype_cstorage{.container = container, .size = 1, .capacity = 0}; auto ret = vectortype_cstorage{.container = container, .size = 1, .capacity = 0};
return ret; return ret;
} }
// Square helper: returns value * value; usable in constant expressions.
template <typename T>
constexpr T sq(const T value) {
    return value * value;
}
// Evaluate the current random forest on test set `x` with labels `y`:
// prints every prediction, then prints and returns the mean squared (L2)
// error over the test set.
// Assumes the global forest `rf` was created by a prior call to
// `newtree` -- TODO confirm all callers guarantee this.
__AQEXPORT__(double)
test(vector_type<vector_type<double>> x, vector_type<long> y) {
    // Guard: an empty test set would divide by zero below (NaN error),
    // and a missing forest would dereference a null pointer.
    if (rf == nullptr || x.size == 0)
        return 0.;
    int result = 0;
    printf("y_hat = (");
    double err = 0.;
    for (uint32_t i = 0; i < x.size; i++) {
        //result[i] = dt->Test(v.container[i].container, dt->DTree);
        result = int(rf->Test(x[i].container));
        // Accumulate squared residual against the integer label.
        err += sq(result - y.container[i]);
        printf("%d ", result);
    }
    puts(")");
    // Mean squared error; printed for the demo log and returned to SQL.
    err /= double(x.size);
    printf("error: %lf\n", err);
    return err;
}

@ -460,6 +460,7 @@ start:
case 'S': // save procedure case 'S': // save procedure
break; break;
case 'L': // load procedure case 'L': // load procedure
current_procedure.name = copy_lpstr(proc_name);
if (!load_proc_fromfile(current_procedure)) { if (!load_proc_fromfile(current_procedure)) {
cxt->stored_proc.insert_or_assign(proc_name, current_procedure); cxt->stored_proc.insert_or_assign(proc_name, current_procedure);
} }

@ -42,14 +42,14 @@ void TableInfo<Ts ...>::monetdb_append_table(void* srv, const char* alt_name) {
uint32_t i = 0; uint32_t i = 0;
constexpr auto n_vecs = count_vector_type((tuple_type*)(0)); constexpr auto n_vecs = count_vector_type((tuple_type*)(0));
void* gc_vecs[1 + n_vecs]; void* gc_vecs[1 + n_vecs];
puts("getcols..."); // puts("getcols...");
uint32_t cnt = 0; uint32_t cnt = 0;
const auto get_col = [&monetdbe_cols, &i, *this, &gc_vecs, &cnt](auto v) { const auto get_col = [&monetdbe_cols, &i, *this, &gc_vecs, &cnt](auto v) {
// printf("%d %d\n", i, (ColRef<void>*)v - colrefs); // printf("%d %d\n", i, (ColRef<void>*)v - colrefs);
monetdbe_cols[i++] = (monetdbe_column*)v->monetdb_get_col(gc_vecs, cnt); monetdbe_cols[i++] = (monetdbe_column*)v->monetdb_get_col(gc_vecs, cnt);
}; };
(get_col((ColRef<Ts>*)(colrefs + i)), ...); (get_col((ColRef<Ts>*)(colrefs + i)), ...);
puts("getcols done"); //puts("getcols done");
// for(int i = 0; i < sizeof...(Ts); ++i) // for(int i = 0; i < sizeof...(Ts); ++i)
// { // {
// printf("no:%d name: %s count:%d data: %p type:%d \n", // printf("no:%d name: %s count:%d data: %p type:%d \n",
@ -68,8 +68,8 @@ void TableInfo<Ts ...>::monetdb_append_table(void* srv, const char* alt_name) {
if (last_comma != static_cast<decltype(last_comma)>(-1)) { if (last_comma != static_cast<decltype(last_comma)>(-1)) {
create_table_str[last_comma] = ')'; create_table_str[last_comma] = ')';
Server* server = (Server*)srv; Server* server = (Server*)srv;
puts("create table..."); // puts("create table...");
puts(create_table_str.c_str()); // puts(create_table_str.c_str());
server->exec(create_table_str.c_str()); server->exec(create_table_str.c_str());
if (!server->last_error) { if (!server->last_error) {
auto err = monetdbe_append(*((monetdbe_database*)server->server), "sys", alt_name, monetdbe_cols, sizeof...(Ts)); auto err = monetdbe_append(*((monetdbe_database*)server->server), "sys", alt_name, monetdbe_cols, sizeof...(Ts));

Loading…
Cancel
Save