From 5dad6e5270fe9ba3f7054c7478b0d8de15162ea1 Mon Sep 17 00:00:00 2001 From: Bill Date: Sat, 5 Nov 2022 05:33:28 +0800 Subject: [PATCH] monetdb passthru, WIP: threadedGC --- Makefile | 4 +- aquery_config.py | 2 +- reconstruct/ast.py | 12 +++-- reconstruct/storage.py | 5 +- server/gc.hpp | 102 +++++++++++++++++++++++++++++++++++----- server/monetdb_conn.cpp | 61 +++++++++++++++++++++--- server/monetdb_conn.h | 2 + server/server.cpp | 15 ++++-- 8 files changed, 174 insertions(+), 29 deletions(-) diff --git a/Makefile b/Makefile index 4564f5e..327aab9 100644 --- a/Makefile +++ b/Makefile @@ -83,8 +83,8 @@ ifeq ($(THREADING),1) Defines += -DTHREADING endif -ifeq ($(AQUERY_ITC_USE_SHMEM), 1) - Defines += -D__AQUERY_ITC_USE_SHMEM__ +ifeq ($(AQUERY_ITC_USE_SEMPH), 1) + Defines += -D__AQUERY_ITC_USE_SEMPH__ endif SHAREDFLAGS += $(FPIC) diff --git a/aquery_config.py b/aquery_config.py index e3600f9..9e80e4b 100644 --- a/aquery_config.py +++ b/aquery_config.py @@ -22,7 +22,7 @@ def init_config(): from engine.utils import add_dll_dir # os.environ['CXX'] = 'C:/Program Files/LLVM/bin/clang.exe' os.environ['THREADING'] = '1' - os.environ['AQUERY_ITC_USE_SHMEM'] = '1' + os.environ['AQUERY_ITC_USE_SEMPH'] = '1' if ('__config_initialized__' not in globals() or not __config_initialized__): diff --git a/reconstruct/ast.py b/reconstruct/ast.py index c95223d..0fa978d 100644 --- a/reconstruct/ast.py +++ b/reconstruct/ast.py @@ -310,7 +310,6 @@ class projection(ast_node): # cpp module codegen - self.context.has_dll = True # extract typed-columns from result-set vid2cname = [0]*len(self.var_table) self.pyname2cname = dict() @@ -404,15 +403,22 @@ class projection(ast_node): if 'into' in node: self.context.emitc(select_into(self, node['into']).ccode) + self.has_postproc = True if not self.distinct: self.finalize() - + def finalize(self): self.context.emitc(f'puts("done.");') if self.parent is None: self.context.sql_end() - self.context.postproc_end(self.postproc_fname) + if self.has_postproc: + self.context.has_dll = True + self.context.postproc_end(self.postproc_fname) + else: + self.context.ccode = '' + if self.limit != 0: + self.context.direct_output() class select_distinct(projection): first_order = 'select_distinct' diff --git a/reconstruct/storage.py b/reconstruct/storage.py index 98ad799..e8dfe94 100644 --- a/reconstruct/storage.py +++ b/reconstruct/storage.py @@ -249,7 +249,10 @@ class Context: self.sql = '' self.ccode = '' self.finalize_query() - + + def direct_output(self): + self.queries.append('O') + def finalize_udf(self): if self.udf is not None: return (Context.udf_head diff --git a/server/gc.hpp b/server/gc.hpp index 4c66060..decd632 100644 --- a/server/gc.hpp +++ b/server/gc.hpp @@ -3,46 +3,90 @@ #include #include #include +#include +#ifndef __AQ_USE_THREADEDGC__ class GC { +private: template using vector = vector_type; template using tuple = std::tuple; - size_t current_size, max_size, interval, forced_clean; + size_t current_size = 0, max_size, + interval, forced_clean, + forceclean_timer = 0; bool running, alive; // ptr, dealloc, ref, sz - vector> q; + vector> *q, *q_back; std::thread handle; + std::atomic lock; + +protected: + void acquire_lock(){ + auto this_pid = std::this_thread::get_id(); + while(lock != this_pid) + { + while(lock != this_pid && lock != std::thread::id()) { + std::this_thread::sleep_for(std::chrono::milliseconds(0)); + } + lock = this_pid; + } + } + + void release_lock(){ + lock = std::thread::id(); + } + void gc() { - - } - void reg(void* v, uint32_t ref, uint32_t sz, - void(*f)(void*) = [](void* v) {free (v); }) { - current_size += sz; - if (current_size > max_size) - gc(); - q.push_back({ v, f }); + if (q->size() == 0) + return; + auto t = q; + acquire_lock(); + q = q_back; + release_lock(); + for(const auto& t : *t) { + std::get<1>(t)(std::get<0>(t)); + } + t->clear(); + q_back = t; + running = false; + current_size = 0; } + + void daemon() { using namespace std::chrono; + while (alive) { if (running) { - gc(); + if (current_size > max_size || + forceclean_timer > forced_clean) + { + gc(); + forceclean_timer = 0; + } std::this_thread::sleep_for(microseconds(interval)); + forceclean_timer += interval; } else { std::this_thread::sleep_for(10ms); + forceclean_timer += 10000; } } } void start_deamon() { - handle = std::thread(&daemon); + q = new vector>(); + q_back = new vector>(); + lock = thread::id(); alive = true; + handle = std::thread(&daemon); } + void terminate_daemon() { running = false; alive = false; + delete q; + delete q_back; using namespace std::chrono; if (handle.joinable()) { @@ -50,4 +94,36 @@ class GC { handle.join(); } } -}; \ No newline at end of file +public: + void reg(void* v, uint32_t sz = 1, + void(*f)(void*) = [](void* v) {free (v); } + ) { + acquire_lock(); + current_size += sz; + q.push_back({ v, f }); + running = true; + release_lock() + } + + GC( + uint32_t max_size = 0xfffffff, uint32_t interval = 10000, + uint32_t forced_clean = 1000000 //one seconds + ) : max_size(max_size), interval(interval), forced_clean(forced_clean){ + start_deamon(); + } // 256 MB + + ~GC(){ + terminate_daemon(); + } +}; + +#else +class GC { +public: + GC(uint32_t) = default; + void reg( + void* v, uint32_t = 0, + void(*f)(void*) = [](void* v) {free (v); } + ) const { f(v); } +} +#endif diff --git a/server/monetdb_conn.cpp b/server/monetdb_conn.cpp index b29f1a8..c0e9d5b 100644 --- a/server/monetdb_conn.cpp +++ b/server/monetdb_conn.cpp @@ -2,6 +2,7 @@ #include "libaquery.h" #include +#include #include "monetdb_conn.h" #include "monetdbe.h" #include "table.h" @@ -35,9 +36,19 @@ const unsigned char monetdbe_type_szs[] = { // should be last: 1 }; +namespace types{ + const Type_t monetdbe_type_aqtypes[] = { + ABOOL, AINT8, AINT16, AINT32, AINT64, +#ifdef HAVE_HGE + AINT128, +#endif + AUINT64, AFLOAT, ADOUBLE, ASTR, + // blob? + AINT64, + ADATE, ATIME, ATIMESTAMP, ERROR - - + }; +} Server::Server(Context* cxt){ if (cxt){ connect(cxt); @@ -80,7 +91,7 @@ void Server::connect(Context *cxt){ else{ if(server) free(server); - this->server = 0; + this->server = nullptr; status = false; puts(ret == -1 ? "Allocation Error." : "Internal Database Error."); } @@ -103,20 +114,58 @@ void Server::exec(const char* q){ bool Server::haserror(){ if (last_error){ - last_error = 0; + last_error = nullptr; return true; } else{ return false; } } +void Server::print_results(const char* sep, const char* end){ + + if (!haserror()){ + auto _res = static_cast (res); + const auto& ncols = _res->ncols; + monetdbe_column** cols = static_cast(malloc(sizeof(monetdbe_column*) * ncols)); + std::string* printf_string = new std::string[ncols]; + const char** col_data = static_cast (malloc(sizeof(char*) * ncols)); + uint8_t* szs = static_cast(alloca(ncols)); + std::string header_string = ""; + const char* err_msg = nullptr; + for(uint32_t i = 0; i < ncols; ++i){ + err_msg = monetdbe_result_fetch(_res, &cols[i], i); + printf_string[i] = + std::string(types::printf_str[types::monetdbe_type_aqtypes[cols[i]->type]]) + + (i < ncols - 1 ? sep : ""); + puts(printf_string[i].c_str()); + col_data[i] = static_cast(cols[i]->data); + szs [i] = monetdbe_type_szs[cols[i]->type]; + header_string = header_string + cols[i]->name + sep + '|' + sep; + } + const size_t l_sep = strlen(sep) + 1; + if (header_string.size() - l_sep >= 0) + header_string.resize(header_string.size() - l_sep); + header_string += end + std::string(header_string.size(), '=') + end; + fputs(header_string.c_str(), stdout); + for(uint64_t i = 0; i < cnt; ++i){ + for(uint32_t j = 0; j < ncols; ++j){ + printf(printf_string[j].c_str(), *((void**)col_data[j])); + col_data[j] += szs[j]; + } + fputs(end, stdout); + } + free(cols); + delete[] printf_string; + free(col_data); + } +} void Server::close(){ if(this->server){ auto server = static_cast(this->server); monetdbe_close(*(server)); free(server); - this->server = 0; + this->server = nullptr; } } @@ -140,7 +189,7 @@ void* Server::getCol(int col_idx){ else{ puts("Error: No result."); } - return 0; + return nullptr; } Server::~Server(){ diff --git a/server/monetdb_conn.h b/server/monetdb_conn.h index 467cb2c..3255342 100644 --- a/server/monetdb_conn.h +++ b/server/monetdb_conn.h @@ -22,6 +22,8 @@ struct Server{ void close(); bool haserror(); static bool havehge(); + void test(const char*); + void print_results(const char* sep = " ", const char* end = "\n"); ~Server(); }; diff --git a/server/server.cpp b/server/server.cpp index d618bf7..5d948b2 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -43,11 +43,11 @@ public: native_handle = dispatch_semaphore_create(v); } void acquire() { - puts("acquire"); + // puts("acquire"); dispatch_semaphore_wait(native_handle, DISPATCH_TIME_FOREVER); } void release() { - puts("release"); + // puts("release"); dispatch_semaphore_signal(native_handle); } ~A_Semaphore() { @@ -94,7 +94,7 @@ public: ~A_Semaphore() { } }; #endif -#ifdef __AQUERY_ITC_USE_SHMEM__ +#ifdef __AQUERY_ITC_USE_SEMPH__ A_Semaphore prompt{ true }, engine{ false }; #define PROMPT_ACQUIRE() prompt.acquire() #define PROMPT_RELEASE() prompt.release() @@ -283,6 +283,15 @@ int dll_main(int argc, char** argv, Context* cxt){ //printf("F::: %p\n", module_fn_map->find("mydiv") != module_fn_map->end() ? module_fn_map->find("mydiv")->second : nullptr); } break; + case 'O': + { + if(!server->haserror()){ + timer.reset(); + server->print_results(); + cfg->stats.postproc_time += timer.elapsed(); + } + } + break; case 'U': // Unload Module { auto mname = n_recvd[i] + 1;