From 5ba333ca20c411f07f350284df9954bfb11411ca Mon Sep 17 00:00:00 2001 From: Bill Date: Mon, 28 Nov 2022 21:31:30 +0800 Subject: [PATCH] test gc --- README.md | 3 ++- server/gc.h | 8 ++++++++ server/libaquery.cpp | 25 ++++++++++++++++++++++--- server/libaquery.h | 9 ++++++++- server/server.cpp | 29 +++++++++++++---------------- 5 files changed, 53 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 7d72430..ef96a71 100644 --- a/README.md +++ b/README.md @@ -304,10 +304,11 @@ SELECT * FROM my_table WHERE c1 > 10 - [x] Query Optimization - [x] Selection/Order by push-down - [x] Join Optimization (Only in Hybrid Engine) - - [ ] Threaded GC + - [x] Threaded GC - [ ] Extensibility - [x] UDFs (Hybrid Engine only) - [x] SDK and User Module + - [x] Stored Procedures - [ ] Triggers # Known Issues: diff --git a/server/gc.h b/server/gc.h index 7bc8d8d..2a63b3d 100644 --- a/server/gc.h +++ b/server/gc.h @@ -16,6 +16,8 @@ private:; std::atomic alive_cnt; std::atomic current_size; volatile bool lock; + using gc_deallocator_t = void (*)(void*); + // maybe use volatile std::thread::id instead protected: void acquire_lock(); @@ -46,6 +48,12 @@ public: terminate_daemon(); } static GC* gc_handle; + template + constexpr static inline gc_deallocator_t _delete(T*){ + return [](void* v){ + delete (T*)v; + }; + } constexpr static void(*_free) (void*) = free; }; diff --git a/server/libaquery.cpp b/server/libaquery.cpp index 93a03da..83abb28 100644 --- a/server/libaquery.cpp +++ b/server/libaquery.cpp @@ -9,6 +9,7 @@ #include "utils.h" #include "libaquery.h" #include +#include "gc.h" char* gbuf = nullptr; @@ -276,10 +277,29 @@ inline const char* str(const bool& v) { return v ? "true" : "false"; } +class A{ + public: + std::chrono::high_resolution_clock::time_point tp; + A(){ + tp = std::chrono::high_resolution_clock::now(); + printf("A %llx created.\n", tp.time_since_epoch().count()); + } + ~A() { + printf("A %llx died after %lldns.\n", tp.time_since_epoch().count(), + (std::chrono::high_resolution_clock::now() - tp).count()); + } +}; Context::Context() { current.memory_map = new std::unordered_map; - init_session(); +#ifndef __AQ_USE_THREADEDGC__ + this->gc = new GC(); +#endif + GC::gc_handle->reg(new A(), 6553600, [](void* a){ + puts("deleting"); + delete ((A*)a); + }); + init_session(); } Context::~Context() { @@ -328,7 +348,6 @@ void* Context::get_module_function(const char* fname){ // std::cout << ')'; // } -#include "gc.h" #include #include #ifndef __AQ_USE_THREADEDGC__ @@ -383,7 +402,7 @@ void GC::daemon() { while (alive) { if (running) { - if (current_size - max_size > 0 || + if (uint64_t(current_size) > max_size || forceclean_timer > forced_clean) { gc(); diff --git a/server/libaquery.h b/server/libaquery.h index e1c7980..c6e51d9 100644 --- a/server/libaquery.h +++ b/server/libaquery.h @@ -73,6 +73,7 @@ struct StoredProcedure{ const char* name; void **__rt_loaded_modules; }; + struct Context{ typedef int (*printf_type) (const char *format, ...); @@ -90,7 +91,7 @@ struct Context{ #ifdef THREADING void* thread_pool; #endif -#ifdef __AQ_THREADED_GC__ +#ifndef __AQ_USE_THREADEDGC__ void* gc; #endif printf_type print = &printf; @@ -162,4 +163,10 @@ template<> char* aq_to_chars(void* , char*); template<> char* aq_to_chars(void* , char*); template<> char* aq_to_chars(void* , char*); typedef int (*code_snippet)(void*); + +template +inline void AQ_ZeroMemory(_This_Struct& __val) { + memset(&__val, 0, sizeof(_This_Struct)); +} + #endif diff --git a/server/server.cpp b/server/server.cpp index c2fac13..f7cf636 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -347,8 +347,8 @@ start: dll_path = proc_name; } proc_name = dll_path.c_str(); - if(recorded_libraries.size) - recorded_queries.emplace_back(copy_lpstr("N")); + //if(recorded_libraries.size) + recorded_queries.emplace_back(copy_lpstr("N")); } handle = dlopen(proc_name, RTLD_NOW); if (procedure_recording) { @@ -442,6 +442,7 @@ start: { if(procedure_module_cursor < current_procedure.postproc_modules) handle = current_procedure.__rt_loaded_modules[procedure_module_cursor++]; + printf("Load %i = %p\n", procedure_module_cursor, handle); } break; case 'R': //recorded procedure @@ -510,10 +511,7 @@ start: switch(n_recvd[i][1]){ case '\0': current_procedure.name = copy_lpstr(proc_name); - current_procedure.cnt = 0; - current_procedure.queries = nullptr; - current_procedure.postproc_modules = 0; - current_procedure.__rt_loaded_modules = nullptr; + AQ_ZeroMemory(current_procedure); procedure_recording = true; procedure_name = proc_name; break; @@ -523,10 +521,8 @@ start: current_procedure.name = copy_lpstr(proc_name); current_procedure.postproc_modules = recorded_libraries.size; current_procedure.__rt_loaded_modules = recorded_libraries.container; - recorded_queries.size = recorded_queries.capacity = 0; - recorded_queries.container = nullptr; - recorded_libraries.size = recorded_libraries.capacity = 0; - recorded_libraries.container = nullptr; + AQ_ZeroMemory(recorded_queries); + AQ_ZeroMemory(recorded_libraries); procedure_recording = false; save_proc_tofile(current_procedure); cxt->stored_proc.insert_or_assign(procedure_name, current_procedure); @@ -534,6 +530,7 @@ start: break; case 'E': // execute procedure { + procedure_module_cursor = 0; auto _proc = cxt->stored_proc.find(proc_name); if (_proc == cxt->stored_proc.end()){ printf("Procedure %s not found. Trying load from disk.\n", proc_name); @@ -571,7 +568,9 @@ start: break; } } - if(handle && procedure_replaying) { + if(handle && + !procedure_replaying && !procedure_recording) { + printf("Destroy %p\n", handle); dlclose(handle); handle = nullptr; } @@ -579,13 +578,11 @@ start: cxt->end_session(); n_recv = 0; } - if(server->last_error == nullptr){ - // TODO: Add feedback to prompt. - } - else{ + if (server->last_error != nullptr) { + printf("Monetdbe Error: %s\n", server->last_error); server->last_error = nullptr; //goto finalize; - } + } } // puts(cfg->has_dll ? "true" : "false");