From 4321422b4f088981a11a1c20a41bb60510871ddf Mon Sep 17 00:00:00 2001 From: Bill Date: Sat, 3 Sep 2022 18:17:51 +0800 Subject: [PATCH] added user module --- .gitignore | 2 +- Makefile | 2 +- README.md | 8 ++++++-- aquery_config.py | 2 +- prompt.py | 17 +++++++++++------ reconstruct/ast.py | 3 ++- reconstruct/storage.py | 9 +++++++-- sdk/aquery.h | 12 ++++-------- sdk/aquery_mem.cpp | 14 +++++++------- sdk/example.cpp | 10 ++++++++++ server/server.cpp | 6 ++++++ test.aquery | 4 ++++ 12 files changed, 60 insertions(+), 29 deletions(-) create mode 100644 sdk/example.cpp diff --git a/.gitignore b/.gitignore index fa039f6..43c1c46 100644 --- a/.gitignore +++ b/.gitignore @@ -54,6 +54,6 @@ saves *.exe out*.cpp udf*.hpp - +*.ipynb diff --git a/Makefile b/Makefile index cbba0a9..cdb55dc 100644 --- a/Makefile +++ b/Makefile @@ -38,7 +38,7 @@ server.so: # $(CXX) server/server.cpp server/monetdb_conn.cpp -fPIC -shared $(OS_SUPPORT) monetdb/msvc/monetdbe.dll --std=c++1z -O3 -march=native -o server.so -I./monetdb/msvc $(CXX) $(SHAREDFLAGS) server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) server/monetdb_conn.cpp $(Threading) $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o server.so snippet: - $(CXX) $(SHAREDFLAGS) $(PCHFLAGS) out.cpp server/monetdb_conn.cpp server/table.cpp server/io.cpp $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o dll.so + $(CXX) $(SHAREDFLAGS) $(PCHFLAGS) out.cpp server/server.cpp server/monetdb_conn.cpp server/table.cpp server/io.cpp $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o dll.so docker: docker build -t aquery . diff --git a/README.md b/README.md index 504c820..1faa59f 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ + # AQuery++ Database ## Introduction @@ -39,10 +40,13 @@ AQuery++ Database is a cross-platform, In-Memory Column-Store Database that inco ## TODO: -- [ ] User Module load syntax parsing (fn definition/registration) +- [x] User Module load syntax parsing (fn definition/registration) +- [ ] User Module initialize location - [ ] User Module test - [ ] Interval based triggers -- [ ] Bug fixes: type deduction misaligned in Hybrid Engine +- [ ] Optimize Compilation Process, using static libraries, hot reloading server binary +- [x] Bug fixes: type deduction misaligned in Hybrid Engine +- [ ] Investigation: Using postproc only for q1 in Hybrid Engine (make is_special always on) - [ ] Limitation: putting ColRefs back to monetdb. - [ ] C++ Meta-Programming: Eliminate template recursions as much as possible. diff --git a/aquery_config.py b/aquery_config.py index 462232f..6f86395 100644 --- a/aquery_config.py +++ b/aquery_config.py @@ -1,7 +1,7 @@ # put environment specific configuration here ## GLOBAL CONFIGURATION FLAGS -version_string = '0.4.0a' +version_string = '0.4.1a' add_path_to_ldpath = True rebuild_backend = True run_backend = True diff --git a/prompt.py b/prompt.py index d0064e8..5209cf4 100644 --- a/prompt.py +++ b/prompt.py @@ -351,6 +351,12 @@ def prompt(running = lambda:True, next = input, state = None): elif q.startswith('xexec'): # generate build and run (MonetDB Engine) state.cfg.backend_type = Backend_Type.BACKEND_MonetDB.value cxt = xengine.exec(state.stmts, cxt, keep) + + this_udf = cxt.finalize_udf() + if this_udf: + with open('udf.hpp', 'wb') as outfile: + outfile.write(this_udf.encode('utf-8')) + if state.server_mode == RunType.Threaded: # assignment to avoid auto gc # sqls = [s.strip() for s in cxt.sql.split(';')] @@ -362,11 +368,7 @@ def prompt(running = lambda:True, next = input, state = None): state.send(sz, payload) except TypeError as e: print(e) - this_udf = cxt.finalize_udf() - - if this_udf: - with open('udf.hpp', 'wb') as outfile: - outfile.write(this_udf.encode('utf-8')) + qs = re.split(r'[ \t]', q) build_this = not(len(qs) > 1 and qs[1].startswith('n')) if cxt.has_dll: @@ -472,11 +474,14 @@ def prompt(running = lambda:True, next = input, state = None): raise except BaseException as e: import code, traceback + raise_exception = True sh = code.InteractiveConsole({**globals(), **locals()}) sh.interact(banner = traceback.format_exc(), exitmsg = 'debugging session ended.') save('', cxt) rm(state) - raise e + # control whether to raise exception in interactive console + if raise_exception: + raise e rm(state) ## FUNCTIONS END diff --git a/reconstruct/ast.py b/reconstruct/ast.py index 283f2a1..63f4aa2 100644 --- a/reconstruct/ast.py +++ b/reconstruct/ast.py @@ -659,7 +659,8 @@ class load(ast_node): self.context.module_map[fname] = cpp_stub #registration for parser self.functions[fname] = user_module_function(fname, nargs, ret_type, self.context) - + self.context.module_init_loc = len(self.context.queries) + def produce_aq(self, node): node = node['load'] s1 = 'LOAD DATA INFILE ' diff --git a/reconstruct/storage.py b/reconstruct/storage.py index e73bf40..d387855 100644 --- a/reconstruct/storage.py +++ b/reconstruct/storage.py @@ -95,6 +95,7 @@ class Context: self.scans = [] self.procs = [] self.queries = [] + self.module_init_loc = 0 def __init__(self): self.tables_byname = dict() @@ -143,14 +144,18 @@ class Context: ret = '__AQEXPORT__(void) __builtin_init_user_module(Context* cxt){\n' for fname in self.module_map.keys(): ret += f'{fname} = (decltype({fname}))(cxt->get_module_function("{fname}"));\n' - self.queries.insert(0, f'P__builtin_init_user_module') + self.queries.insert(self.module_init_loc, 'P__builtin_init_user_module') return ret + '}\n' def sql_begin(self): self.sql = '' def sql_end(self): - if self.sql.strip(): + # eliminate empty queries + s = self.sql.strip() + while(s and s[-1] == ';'): + s = s[:-1].strip() + if s and s.lower() != 'select': self.queries.append('Q' + self.sql) self.sql = '' diff --git a/sdk/aquery.h b/sdk/aquery.h index 28ef087..bc9ef43 100644 --- a/sdk/aquery.h +++ b/sdk/aquery.h @@ -39,10 +39,6 @@ struct Context{ Session current; -#ifdef THREADING - void* thread_pool; -#endif - printf_type print = printf; template void log(Types... args) { @@ -57,7 +53,7 @@ struct Context{ void init_session(); void end_session(); void* get_module_function(const char*); - char remainder[]; + char remainder[]; }; #ifdef _WIN32 @@ -68,10 +64,10 @@ struct Context{ #define __AQEXPORT__(_Ty) extern "C" _Ty __DLLEXPORT__ -typedef void (*dealloctor_t) (void*); +typedef void (*deallocator_t) (void*); -extern void* Aalloc(size_t sz); +extern void* Aalloc(unsigned long long sz); extern void Afree(void * mem); -extern size_t register_memory(void* ptr, dealloctor_t deallocator); +extern void register_memory(void* ptr, deallocator_t deallocator); #endif \ No newline at end of file diff --git a/sdk/aquery_mem.cpp b/sdk/aquery_mem.cpp index bed5949..0288af3 100644 --- a/sdk/aquery_mem.cpp +++ b/sdk/aquery_mem.cpp @@ -1,7 +1,7 @@ #include "aquery.h" #include -#include +#include #include Session* session; @@ -9,19 +9,19 @@ Session* session; void* Aalloc(size_t sz, deallocator_t deallocator){ void* mem = malloc(sz); - auto memmap = (std::unordered_map*) session->memory_map; - memmap[mem] = deallocator; + auto memmap = (std::unordered_map*) session->memory_map; + memmap->operator[](mem) = deallocator; return mem; } void Afree(void* mem){ - auto memmap = (std::unordered_map*) session->memory_map; - memmap[mem](mem); + auto memmap = (std::unordered_map*) session->memory_map; + memmap->operator[](mem)(mem); memmap->erase(mem); } void register_memory(void* ptr, deallocator_t deallocator){ - auto memmap = (std::unordered_map*) session->memory_map; - memmap[ptr] = deallocator; + auto memmap = (std::unordered_map*) session->memory_map; + memmap->operator[](ptr) = deallocator; } diff --git a/sdk/example.cpp b/sdk/example.cpp new file mode 100644 index 0000000..a6383ce --- /dev/null +++ b/sdk/example.cpp @@ -0,0 +1,10 @@ +#include "aquery.h" +#include "../server/table.h" + +__AQEXPORT__(ColRef) mulvec(int a, ColRef b){ + return a * b; +} + +__AQEXPORT__(double) mydiv(int a, int b){ + return a / (double)b; +} \ No newline at end of file diff --git a/server/server.cpp b/server/server.cpp index b126232..fcbdbad 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -80,6 +80,10 @@ void Context::init_session(){ void* Context::get_module_function(const char* fname){ auto fmap = static_cast*> (this->module_function_maps); + //printf("%p\n", fmap->find("mydiv")->second); + for (const auto& [key, value] : *fmap){ + printf("%s %p\n", key.c_str(), value); + } auto ret = fmap->find(fname); return ret == fmap->end() ? nullptr : ret->second; } @@ -141,7 +145,9 @@ int dll_main(int argc, char** argv, Context* cxt){ case 'F': { auto fname = n_recvd[i] + 1; + printf("%s: %p, %p\n", fname, user_module_handle, dlsym(user_module_handle, fname)); module_fn_map->insert_or_assign(fname, dlsym(user_module_handle, fname)); + printf("%p\n", module_fn_map->find("mydiv") != module_fn_map->end() ? module_fn_map->find("mydiv")->second : nullptr); } break; case 'U': diff --git a/test.aquery b/test.aquery index 1c4b847..71f5b97 100644 --- a/test.aquery +++ b/test.aquery @@ -28,4 +28,8 @@ echo Testing UDF with calls to other UDFs f funcs.a xexec +echo Testing User Modules +f modules.a +xexec + exit