diff --git a/README.md b/README.md
index 1ee0e28..5225d95 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,7 @@
 # AQuery++ Database
+## News:
+**Demo workflow for Triggers is now available.** See the [**DEMO**](/demo/README.md).
+
 ## Introduction
 AQuery++ Database is a cross-platform, In-Memory Column-Store Database that incorporates compiled query execution. (**Note**: If you encounter any problems, feel free to contact me via ys3540@nyu.edu)
diff --git a/build.py b/build.py
index a0de64b..341208b 100644
--- a/build.py
+++ b/build.py
@@ -27,6 +27,7 @@ class checksums:
             aquery_config.build_driver +
             compiler_name +
             aquery_config.version_string +
+            str(os.environ['AQ_DEBUG'] == '1')
         )
         for key in self.__dict__.keys():
             try:
diff --git a/demo/README.md b/demo/README.md
new file mode 100644
index 0000000..bb8296f
--- /dev/null
+++ b/demo/README.md
@@ -0,0 +1,28 @@
+# Triggers Demo
+
+This folder contains a demo workflow for the two types of triggers.
+- An interval-based trigger will be set up to execute a stored procedure `demoi`, defined in [demo/putdata.cpp](/demo/putdata.cpp), that inserts a .csv file from `data/electricity` into the table `source` every 5 seconds.
+- A conditional trigger will be driven by the condition `democq`, defined in [demo/query.cpp](/demo/query.cpp), which checks whether more than 200 rows have been inserted into the table `source` and returns true once they have. When it fires, it executes a stored procedure `democa`, defined in [demo/action.cpp](/demo/action.cpp), that trains the incremental random forest on the new data.
+- See [demo/prep.a](/demo/prep.a) for the parameters of the random forest.
+
+## Run the demo
+### Preparation
+- Preprocess the data
+  - Put the `electricity` dataset in `/data/electricity_orig`
+  - Run `python3 rfdata_preproc.py` to generate .csv files in `data/electricity/`
+- Use [demo/setup.sh](/demo/setup.sh) to
+  - set up the stored procedures for this demo
+  - compile the random forest user module used in this demo
+  - compile the queries used in this demo
+
+### Running the demo
+- Start the AQuery prompt: `python3 prompt.py`
+- Use the automated AQuery script [demo/demo.aquery](/demo/demo.aquery) to execute the workflow. It does the following, in order:
+  - Register the user module and create a new random forest by running [`f demo/prep.a`](/demo/prep.a).
+  - Register the stored procedures.
+  - Create an interval-based trigger that executes the payload `demoi` every 5 seconds.
+  - Create a conditional trigger that executes the payload `democa` whenever the condition `democq` returns true; `democq` is tested every time new data is inserted into the table `source`.
+  - Load the test data by running [demo/test.a](/demo/test.a).
+  - Use the query `select predict(x) from test` to get predictions for the test data from the current random forest.
+  - In the AQuery prompt, an extra `exec` command after the query is needed to execute it.
+  - The query `select test(x, y) from test` will also calculate the L2 error.
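Note on the last bullet above: judging from the `test` UDF added to `sdk/irf.cpp` later in this diff (it accumulates the squared difference between each prediction and its label, then divides by the row count), the reported "L2 error" is the mean squared error over the N test rows:

```latex
\mathrm{err} = \frac{1}{N} \sum_{i=1}^{N} \left( \hat{y}_i - y_i \right)^2
```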
diff --git a/demo/demo.aquery b/demo/demo.aquery
index c0db643..bc9bd0a 100644
--- a/demo/demo.aquery
+++ b/demo/demo.aquery
@@ -3,15 +3,15 @@
 f demo/prep.a
 exec
 
-procedure demoi run
-procedure democq run
-procedure democa run
+procedure demoi load
+procedure democq load
+procedure democa load
 
-create trigger t action demoi interval 15000
+create trigger t action demoi interval 5000
 exec
 
 create trigger c on source action democa when democq
 exec
 
-# f demo/test.a
-# exec
+f demo/test.a
+exec
diff --git a/demo/prep.a b/demo/prep.a
index ccaf7f6..6af3a27 100644
--- a/demo/prep.a
+++ b/demo/prep.a
@@ -1,13 +1,14 @@
 create table source(x vecdouble, y int64);
 
 LOAD MODULE FROM "./libirf.so"
 FUNCTIONS (
-    newtree(height:int, f:int64, sparse:vecint, forget:double, maxf:int64, noclasses:int64, e:int) -> bool,
+    newtree(height:int, f:int64, sparse:vecint, forget:double, noclasses:int64, e:int) -> bool,
     fit_inc(X:vecvecdouble, y:vecint64) -> bool,
-    predict(X:vecvecdouble) -> vecint
+    predict(X:vecvecdouble) -> vecint,
+    test(X:vecvecdouble, y:vecint64) -> double
 );
 
 create table elec_sparse(v int);
 insert into elec_sparse values (0), (1), (1), (1), (1), (1), (1);
 
-select newtree(30, 7, elec_sparse.v, 0, 4, 2, 1) from elec_sparse
+select newtree(30, 7, elec_sparse.v, 0.3, 2, 1) from elec_sparse
diff --git a/demo/setup.sh b/demo/setup.sh
new file mode 100644
index 0000000..0cacfb0
--- /dev/null
+++ b/demo/setup.sh
@@ -0,0 +1,6 @@
+#!/bin/sh
+
+make -C ../sdk irf
+mkdir ../procedures
+cp demo*.aqp ../procedures
+make
diff --git a/demo/test.a b/demo/test.a
index 7301977..4871b4c 100644
--- a/demo/test.a
+++ b/demo/test.a
@@ -1,3 +1,7 @@
+-- select predict(x) from test;
+-- select test(x, y) from test;
+
+
 create table test(x vecdouble, y int64);
 load complex data infile "data/electricity/electricity872.csv" into table test fields terminated by ',' element terminated by ';';
-select predict(x) from test
+
diff --git a/paper b/paper
index a61c312..0d0ec21 160000
--- a/paper
+++ b/paper
@@ -1 +1 @@
-Subproject commit a61c3122c43293ff6f8bd01b4f65d7d03c5c4c54
+Subproject commit 0d0ec214297a3e46947cead37f887eb480043419
diff --git a/prompt.py b/prompt.py
index 93deb9c..bb9ce6a 100644
--- a/prompt.py
+++ b/prompt.py
@@ -238,7 +238,7 @@ class PromptState():
     server_bin = 'server.bin' if server_mode == RunType.IPC else 'server.so'
     wait_engine = lambda: None
     wake_engine = lambda: None
-    get_storedproc = lambda : StoredProcedure()
+    get_storedproc = lambda *_: StoredProcedure()
     set_ready = lambda: None
     get_ready = lambda: None
     server_status = lambda: False
diff --git a/reconstruct/ast.py b/reconstruct/ast.py
index 30dfeb5..ad671e4 100644
--- a/reconstruct/ast.py
+++ b/reconstruct/ast.py
@@ -1336,11 +1336,11 @@ class load(ast_node):
             self.context.postproc_begin(self.postproc_fname)
 
         table:TableInfo = self.context.tables_byname[node['table']]
-        self.sql = F"SELECT {', '.join([c.name for c in table.columns])} FROM {table.table_name};"
-        self.emit(self.sql+';\n')
-        self.context.sql_end()
-        length_name = 'len_' + base62uuid(6)
-        self.context.emitc(f'auto {length_name} = server->cnt;')
+        # self.sql = F"SELECT {', '.join([c.name for c in table.columns])} FROM {table.table_name};"
+        # self.emit(self.sql+';\n')
+        # self.context.sql_end()
+        # length_name = 'len_' + base62uuid(6)
+        # self.context.emitc(f'auto {length_name} = server->cnt;')
 
         out_typenames = [t.type.cname for t in table.columns]
         outtable_col_nameslist = ', '.join([f'"{c.name}"' for c in table.columns])
@@ -1353,7 +1353,8 @@ class load(ast_node):
         for i, c in enumerate(table.columns):
             c.cxt_name = 'c_' + base62uuid(6)
             self.context.emitc(f'decltype(auto) {c.cxt_name} = {self.out_table}->get_col<{i}>();')
-            self.context.emitc(f'{c.cxt_name}.initfrom({length_name}, server->getCol({i}), "{table.columns[i].name}");')
+            self.context.emitc(f'{c.cxt_name}.init("{table.columns[i].name}");')
+            #self.context.emitc(f'{c.cxt_name}.initfrom({length_name}, server->getCol({i}), "{table.columns[i].name}");')
         csv_reader_name = 'csv_reader_' + base62uuid(6)
         col_types = [c.type.cname for c in table.columns]
         col_tmp_names = ['tmp_'+base62uuid(8) for _ in range(len(table.columns))]
diff --git a/sdk/aquery.h b/sdk/aquery.h
index 787eb64..42ce09a 100644
--- a/sdk/aquery.h
+++ b/sdk/aquery.h
@@ -13,14 +13,14 @@ enum Backend_Type {
     BACKEND_MariaDB
 };
 
-struct Config{
+struct Config {
     int running, new_query, server_mode,
         backend_type, has_dll, n_buffers;
     int buffer_sizes[];
 };
 
-struct Session{
-    struct Statistic{
+struct Session {
+    struct Statistic {
         unsigned long long total_active;
         unsigned long long cnt_object;
         unsigned long long total_alloc;
@@ -102,7 +102,7 @@ namespace std {
 }
 #endif
 
-struct vectortype_storage{
+struct vectortype_storage {
     void* container = nullptr;
     unsigned int size = 0, capacity = 0;
     vectortype_storage(void* container, unsigned int size, unsigned int capacity) :
diff --git a/sdk/irf.cpp b/sdk/irf.cpp
index cf5a694..7e969c0 100644
--- a/sdk/irf.cpp
+++ b/sdk/irf.cpp
@@ -19,7 +19,7 @@ DecisionTree *dt = nullptr;
 RandomForest *rf = nullptr;
 
 __AQEXPORT__(bool)
-newtree(int ntree, long f, ColRef<int> sparse, double forget, long maxf, long nclasses, Evaluation e)
+newtree(int ntree, long f, ColRef<int> sparse, double forget, long nclasses, Evaluation e)
 {
     if (sparse.size != f)
         return false;
@@ -27,8 +27,6 @@ newtree(int ntree, long f, ColRef<int> sparse, double forget, long maxf, long nc
 
     memcpy(X_cpy, sparse.container, f);
 
-    if (maxf < 0)
-        maxf = f;
     // dt = new DecisionTree(f, X_cpy, forget, maxf, noclasses, e);
     rf = new RandomForest(ntree, f, X_cpy, forget, nclasses, e, true);
     return true;
@@ -100,3 +98,22 @@ predict(vector_type<vector_type<double>> v)
     auto ret = vectortype_cstorage{.container = container, .size = 1, .capacity = 0};
     return ret;
 }
+template <class T>
+constexpr T sq(const T x) {
+    return x * x;
+}
+__AQEXPORT__(double)
+test(vector_type<vector_type<double>> x, vector_type<long> y) {
+    int result = 0;
+    printf("y_hat = (");
+    double err = 0.;
+    for (uint32_t i = 0; i < x.size; i++) {
+        //result[i] = dt->Test(v.container[i].container, dt->DTree);
+        result = int(rf->Test(x[i].container));
+        err += sq(result - y.container[i]);
+        printf("%d ", result);
+    }
+    puts(")");
+    printf("error: %lf\n", err/=double(x.size));
+    return err;
+}
diff --git a/server/server.cpp b/server/server.cpp
index d34f1df..608f95b 100644
--- a/server/server.cpp
+++ b/server/server.cpp
@@ -460,6 +460,7 @@ start:
             case 'S': // save procedure
                 break;
             case 'L': // load procedure
+                current_procedure.name = copy_lpstr(proc_name);
                 if (!load_proc_fromfile(current_procedure)) {
                     cxt->stored_proc.insert_or_assign(proc_name, current_procedure);
                 }
diff --git a/server/table_ext_monetdb.hpp b/server/table_ext_monetdb.hpp
index f4a7571..f688f08 100644
--- a/server/table_ext_monetdb.hpp
+++ b/server/table_ext_monetdb.hpp
@@ -42,14 +42,14 @@ void TableInfo<Ts...>::monetdb_append_table(void* srv, const char* alt_name) {
     uint32_t i = 0;
     constexpr auto n_vecs = count_vector_type((tuple_type*)(0));
     void* gc_vecs[1 + n_vecs];
-    puts("getcols...");
+    // puts("getcols...");
     uint32_t cnt = 0;
     const auto get_col = [&monetdbe_cols, &i, *this, &gc_vecs, &cnt](auto v) {
         // printf("%d %d\n", i, (ColRef<void>*)v - colrefs);
         monetdbe_cols[i++] = (monetdbe_column*)v->monetdb_get_col(gc_vecs, cnt);
     };
     (get_col((ColRef<Ts>*)(colrefs + i)), ...);
-    puts("getcols done");
+    //puts("getcols done");
     // for(int i = 0; i < sizeof...(Ts); ++i)
     // {
     //     printf("no:%d name: %s count:%d data: %p type:%d \n",
@@ -68,8 +68,8 @@ void TableInfo<Ts...>::monetdb_append_table(void* srv, const char* alt_name) {
     if (last_comma != static_cast<size_t>(-1)) {
         create_table_str[last_comma] = ')';
         Server* server = (Server*)srv;
-        puts("create table...");
-        puts(create_table_str.c_str());
+        // puts("create table...");
+        // puts(create_table_str.c_str());
         server->exec(create_table_str.c_str());
         if (!server->last_error) {
             auto err = monetdbe_append(*((monetdbe_database*)server->server), "sys", alt_name, monetdbe_cols, sizeof...(Ts));