From 6859671230c9206cd9e220f4dc83069b6ddd737e Mon Sep 17 00:00:00 2001 From: Bill Date: Fri, 9 Sep 2022 22:15:55 +0800 Subject: [PATCH] Added session manager. --- Makefile | 4 +-- README.md | 18 ++++++------ aquery_config.py | 3 +- prompt.py | 5 ++-- reconstruct/expr.py | 8 ++++-- sdk/aquery.h | 2 ++ sdk/aquery_mem.cpp | 4 +++ sdk/example.cpp | 3 +- server/libaquery.h | 7 +++-- server/monetdb_conn.cpp | 4 +-- server/server.cpp | 61 +++++++++++++++++++++++++++++++---------- server/vector_type.hpp | 4 +-- 12 files changed, 84 insertions(+), 39 deletions(-) diff --git a/Makefile b/Makefile index 10ef0a7..88d2a94 100644 --- a/Makefile +++ b/Makefile @@ -5,9 +5,9 @@ CXXFLAGS = --std=c++1z OPTFLAGS = -O3 -flto -march=native SHAREDFLAGS = -shared -fPIC ifeq ($(PCH), 1) -PCHFLAGS = -include server/aggregations.h + PCHFLAGS = -include server/aggregations.h else -PCHFLAGS = + PCHFLAGS = endif ifeq ($(OS),Windows_NT) diff --git a/README.md b/README.md index d42a5f5..7176e32 100644 --- a/README.md +++ b/README.md @@ -30,24 +30,24 @@ AQuery++ Database is a cross-platform, In-Memory Column-Store Database that inco - [x] Assumption - [x] Flatten - [x] UDFs (Hybrid Engine only) - - [ ] User Module + - [x] User Module - [ ] Triggers - [x] Join (Hybrid Engine only) - [ ] Subqueries -- [ ] Query Optimization +- [x] Query Optimization - [x] Selection/Order by push-down - - [ ] Join Optimization + - [x] Join Optimization (Only in Hybrid Engine) ## TODO: - [x] User Module load syntax parsing (fn definition/registration) -- [ ] User Module initialize location -- [ ] User Module test -- [ ] Interval based triggers -- [ ] Optimize Compilation Process, using static libraries, hot reloading server binary +- [x] User Module initialize location + -> User Module test + -> Interval based triggers + -> Optimize Compilation Process, using static libraries, hot reloading server binary - [x] Bug fixes: type deduction misaligned in Hybrid Engine -- [ ] Investigation: Using postproc only for q1 in Hybrid Engine (make is_special always on) -- [ ] Limitation: putting ColRefs back to monetdb. + -> Investigation: Using postproc only for q1 in Hybrid Engine (make is_special always on) +- [x] Limitation: putting ColRefs back to monetdb. - [ ] Limitation: String operations and Date/Time data type. - [ ] C++ Meta-Programming: Eliminate template recursions as much as possible. diff --git a/aquery_config.py b/aquery_config.py index e3d9f19..1731540 100644 --- a/aquery_config.py +++ b/aquery_config.py @@ -1,7 +1,7 @@ # put environment specific configuration here ## GLOBAL CONFIGURATION FLAGS -version_string = '0.4.2a' +version_string = '0.4.3a' add_path_to_ldpath = True rebuild_backend = False run_backend = True @@ -58,4 +58,3 @@ def init_config(): if os_platform == 'cygwin': add_dll_dir('./lib') __config_initialized__ = True - \ No newline at end of file diff --git a/prompt.py b/prompt.py index 0e536ac..46a2e69 100644 --- a/prompt.py +++ b/prompt.py @@ -1,4 +1,3 @@ -from multiprocessing.sharedctypes import Value import aquery_config help_message = '''\ ====================================================== @@ -122,13 +121,15 @@ class Backend_Type(enum.Enum): class Config: __all_attrs__ = ['running', 'new_query', 'server_mode', 'backend_type', 'has_dll', 'n_buffers'] __init_attributes__ = False + + @staticmethod def __init_self__(): if not Config.__init_attributes__: from functools import partial for _i, attr in enumerate(Config.__all_attrs__): if not hasattr(Config, attr): setattr(Config, attr, property(partial(Config.getter, i = _i), partial(Config.setter, i = _i))) - __init_attributes__ = True + Config.__init_attributes__ = True def __init__(self, mode, nq = 0, n_bufs = 0, bf_szs = []) -> None: Config.__init_self__() diff --git a/reconstruct/expr.py b/reconstruct/expr.py index c7987aa..65513ac 100644 --- a/reconstruct/expr.py +++ b/reconstruct/expr.py @@ -99,7 +99,9 @@ class expr(ast_node): self.type = op.return_type(*type_vals) except AttributeError as e: if type(self.root) is not udf: - print(f'alert: {e}') + # TODO: do something when this is not an error + # print(f'alert: {e}') + pass self.type = AnyT self.sql = op(self.c_code, *str_vals) @@ -142,7 +144,9 @@ class expr(ast_node): else: return var_table[node] if vec[0] not in _vars: - print(f'Use of undefined variable {vec[0]}') + # print(f'Use of undefined variable {vec[0]}') + # TODO: do something when this is not an error + pass else: vname = get_vname(vec[0]) val = enlist(val) diff --git a/sdk/aquery.h b/sdk/aquery.h index bc9ef43..8e50517 100644 --- a/sdk/aquery.h +++ b/sdk/aquery.h @@ -69,5 +69,7 @@ typedef void (*deallocator_t) (void*); extern void* Aalloc(unsigned long long sz); extern void Afree(void * mem); extern void register_memory(void* ptr, deallocator_t deallocator); +extern void init_session(Context* cxt); +#define __AQ_NO_SESSION__ void init_session(Context*) {} #endif \ No newline at end of file diff --git a/sdk/aquery_mem.cpp b/sdk/aquery_mem.cpp index 0288af3..60c969e 100644 --- a/sdk/aquery_mem.cpp +++ b/sdk/aquery_mem.cpp @@ -25,3 +25,7 @@ void register_memory(void* ptr, deallocator_t deallocator){ memmap->operator[](ptr) = deallocator; } +void init_session(Context* cxt){ + session = &cxt->current; + // session->memory_map = new std::unordered_map(); +} \ No newline at end of file diff --git a/sdk/example.cpp b/sdk/example.cpp index a6383ce..4ce9715 100644 --- a/sdk/example.cpp +++ b/sdk/example.cpp @@ -1,4 +1,5 @@ #include "aquery.h" +__AQ_NO_SESSION__ #include "../server/table.h" __AQEXPORT__(ColRef) mulvec(int a, ColRef b){ @@ -7,4 +8,4 @@ __AQEXPORT__(ColRef) mulvec(int a, ColRef b){ __AQEXPORT__(double) mydiv(int a, int b){ return a / (double)b; -} \ No newline at end of file +} diff --git a/server/libaquery.h b/server/libaquery.h index b391ab3..bbce22a 100644 --- a/server/libaquery.h +++ b/server/libaquery.h @@ -26,7 +26,7 @@ struct Session{ size_t total_active; size_t cnt_object; size_t total_alloc; - }; + } stats; void* memory_map; }; @@ -48,7 +48,8 @@ struct Context{ void* thread_pool; #endif printf_type print = printf; - + Context(); + virtual ~Context(); template void log(Types... args) { if (log_level == LOG_INFO) @@ -73,4 +74,6 @@ struct Context{ #endif #define __AQEXPORT__(_Ty) extern "C" _Ty __DLLEXPORT__ +typedef void (*deallocator_t) (void*); + #endif diff --git a/server/monetdb_conn.cpp b/server/monetdb_conn.cpp index bd1fb81..34b6f86 100644 --- a/server/monetdb_conn.cpp +++ b/server/monetdb_conn.cpp @@ -127,8 +127,8 @@ void* Server::getCol(int col_idx){ { auto _ret_col = static_cast(this->ret_col); cnt = _ret_col->count; - printf("Dbg: Getting col %s, type: %s\n", - _ret_col->name, monetdbe_type_str[_ret_col->type]); + // printf("Dbg: Getting col %s, type: %s\n", + // _ret_col->name, monetdbe_type_str[_ret_col->type]); return _ret_col->data; } else{ diff --git a/server/server.cpp b/server/server.cpp index 56a1dbe..9436ed0 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -32,14 +32,10 @@ struct SharedMemory }; #endif -struct thread_context{ - -}v; -void daemon(thread_context* c) { - -} #include "aggregations.h" typedef int (*code_snippet)(void*); +typedef void (*module_init_fn)(Context*); + int test_main(); int n_recv = 0; @@ -49,9 +45,11 @@ extern "C" void __DLLEXPORT__ receive_args(int argc, char**argv){ n_recv = argc; n_recvd = argv; } + enum BinaryInfo_t { MSVC, MSYS, GCC, CLANG, AppleClang }; + extern "C" int __DLLEXPORT__ binary_info() { #if defined(_MSC_VER) && !defined (__llvm__) return MSVC; @@ -71,12 +69,30 @@ __AQEXPORT__(bool) have_hge(){ return false; #endif } - +Context::Context() { + current.memory_map = new std::unordered_map; + init_session(); +} +Context::~Context() { + auto memmap = (std::unordered_map*) this->current.memory_map; + delete memmap; +} void Context::init_session(){ if (log_level == LOG_INFO){ - Session::Statistic stats; + memset(&(this->current.stats), 0, sizeof(Session::Statistic)); } + auto memmap = (std::unordered_map*) this->current.memory_map; + memmap->clear(); } + +void Context::end_session(){ + auto memmap = (std::unordered_map*) this->current.memory_map; + for (auto& mem : *memmap) { + mem.second(mem.first); + } + memmap->clear(); +} + void* Context::get_module_function(const char* fname){ auto fmap = static_cast*> (this->module_function_maps); @@ -88,6 +104,16 @@ void* Context::get_module_function(const char* fname){ return ret == fmap->end() ? nullptr : ret->second; } +void initialize_module(const char* module_name, void* module_handle, Context* cxt){ + auto _init_module = reinterpret_cast(dlsym(module_handle, "init_session")); + if (_init_module) { + _init_module(cxt); + } + else { + printf("Warning: module %s have no session support.\n", module_name); + } +} + int dll_main(int argc, char** argv, Context* cxt){ Config *cfg = reinterpret_cast(argv[0]); std::unordered_map user_module_map; @@ -119,30 +145,35 @@ int dll_main(int argc, char** argv, Context* cxt){ if (cfg->backend_type == BACKEND_AQuery || cfg->has_dll) { handle = dlopen("./dll.so", RTLD_LAZY); } + for (const auto& module : user_module_map){ + initialize_module(module.first.c_str(), module.second, cxt); + } + cxt->init_session(); for(int i = 0; i < n_recv; ++i) { //printf("%s, %d\n", n_recvd[i], n_recvd[i][0] == 'Q'); switch(n_recvd[i][0]){ - case 'Q': + case 'Q': // SQL query for monetdbe { server->exec(n_recvd[i] + 1); printf("Exec Q%d: %s", i, n_recvd[i]); } break; - case 'P': + case 'P': // Postprocessing procedure if(handle && !server->haserror()) { code_snippet c = reinterpret_cast(dlsym(handle, n_recvd[i]+1)); c(cxt); } break; - case 'M': + case 'M': // Load Module { auto mname = n_recvd[i] + 1; user_module_handle = dlopen(mname, RTLD_LAZY); user_module_map[mname] = user_module_handle; + initialize_module(mname, user_module_handle, cxt); } break; - case 'F': + case 'F': // Register Function in Module { auto fname = n_recvd[i] + 1; //printf("%s: %p, %p\n", fname, user_module_handle, dlsym(user_module_handle, fname)); @@ -150,7 +181,7 @@ int dll_main(int argc, char** argv, Context* cxt){ //printf("%p\n", module_fn_map->find("mydiv") != module_fn_map->end() ? module_fn_map->find("mydiv")->second : nullptr); } break; - case 'U': + case 'U': // Unload Module { auto mname = n_recvd[i] + 1; auto it = user_module_map.find(mname); @@ -160,17 +191,17 @@ int dll_main(int argc, char** argv, Context* cxt){ user_module_map.erase(it); } break; - } } if(handle) { dlclose(handle); handle = 0; } + cxt->end_session(); n_recv = 0; } if(server->last_error == nullptr){ - + // TODO: Add feedback to prompt. } else{ server->last_error = nullptr; diff --git a/server/vector_type.hpp b/server/vector_type.hpp index e782c93..54b9bcb 100644 --- a/server/vector_type.hpp +++ b/server/vector_type.hpp @@ -30,7 +30,7 @@ public: this->size = vt.size; this->capacity = vt.capacity; if (capacity) { - puts("copy"); + // puts("copy"); this->container = (_Ty*)malloc(size * sizeof(_Ty)); memcpy(container, vt.container, sizeof(_Ty) * size); } @@ -44,7 +44,7 @@ public: this->size = vt.size; this->capacity = vt.capacity; this->container = vt.container; - puts("move"); + // puts("move"); vt.size = vt.capacity = 0; vt.container = 0; }