From 751a4425545330d59688b4cf5b7dbae2dbafb9d1 Mon Sep 17 00:00:00 2001 From: Bill Date: Fri, 12 Aug 2022 06:34:55 +0800 Subject: [PATCH] update threadpool --- Makefile | 12 ++-- aquery_config.py | 1 + prompt.py | 1 + sdk/aquery.h | 5 +- sdk/aquery_mem.cpp | 22 ++++-- server/io.h | 13 +++- server/libaquery.h | 6 +- server/server.cpp | 16 +++-- server/table.h | 7 +- server/tests/thread_pool.hpp | 47 ++++++++++++ server/threading.cpp | 134 +++++++++++++++++++++++++++++++++++ server/threading.h | 42 +++++++++++ server/vector_type.hpp | 4 +- 13 files changed, 289 insertions(+), 21 deletions(-) create mode 100644 server/tests/thread_pool.hpp create mode 100644 server/threading.cpp create mode 100644 server/threading.h diff --git a/Makefile b/Makefile index 476ec61..4eda0dc 100644 --- a/Makefile +++ b/Makefile @@ -1,22 +1,26 @@ OS_SUPPORT = MonetDB_LIB = - +Threading = ifeq ($(OS),Windows_NT) OS_SUPPORT += server/winhelper.cpp MonetDB_LIB += -Imonetdb/msvc msc-plugin/monetdbe.dll else - MonetDB_LIB += -I/usr/include/monetdb -lmonetdbe + MonetDB_LIB += -I/usr/local/include/monetdb -I/usr/include/monetdb -lmonetdbe endif +ifeq ($(THREADING),1) + Threading += server/threading.cpp -DTHREADING +endif info: $(info $(OS_SUPPORT)) $(info $(OS)) + $(info $(Threading)) $(info "test") server.bin: - $(CXX) server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) -flto --std=c++1z -O3 -march=native -o server.bin + $(CXX) server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) $(Threading) -flto --std=c++1z -O3 -march=native -o server.bin server.so: # $(CXX) server/server.cpp server/monetdb_conn.cpp -fPIC -shared $(OS_SUPPORT) monetdb/msvc/monetdbe.dll --std=c++1z -O3 -march=native -o server.so -I./monetdb/msvc - $(CXX) -shared -fPIC -flto server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) server/monetdb_conn.cpp $(MonetDB_LIB) --std=c++1z -o server.so -O3 + $(CXX) -shared -fPIC -flto server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) server/monetdb_conn.cpp $(Threading) $(MonetDB_LIB) --std=c++1z -o server.so -O3 snippet: $(CXX) -shared -fPIC -flto --std=c++1z out.cpp server/monetdb_conn.cpp server/table.cpp server/io.cpp $(MonetDB_LIB) -O3 -march=native -o dll.so clean: diff --git a/aquery_config.py b/aquery_config.py index a44366f..26a9f51 100644 --- a/aquery_config.py +++ b/aquery_config.py @@ -2,6 +2,7 @@ import os # os.environ['CXX'] = 'C:/Program Files/LLVM/bin/clang.exe' +os.environ['THREADING'] = '0' add_path_to_ldpath = True rebuild_backend = True diff --git a/prompt.py b/prompt.py index 8d7a03b..83436cd 100644 --- a/prompt.py +++ b/prompt.py @@ -43,6 +43,7 @@ if aquery_config.rebuild_backend: os.remove(server_bin) except Exception as e: print(type(e), e) + subprocess.call(['make', "info"]) subprocess.call(['make', server_bin], stdout=nullstream) cleanup = True diff --git a/sdk/aquery.h b/sdk/aquery.h index 5a45a24..e541c82 100644 --- a/sdk/aquery.h +++ b/sdk/aquery.h @@ -3,11 +3,8 @@ extern void* Aalloc(size_t sz); extern int Afree(void * mem); +extern size_t register_memory(void* ptr, void(dealloc)(void*)); -template -size_t register_memory(T* ptr, void(dealloc)(void*)){ - [](void* m){ auto _m = static_cast(m); delete _m; }; -} struct Session{ struct Statistic{ size_t total_active; diff --git a/sdk/aquery_mem.cpp b/sdk/aquery_mem.cpp index 1788b5b..ac66277 100644 --- a/sdk/aquery_mem.cpp +++ b/sdk/aquery_mem.cpp @@ -2,18 +2,32 @@ #include #include -#include +#include Session* session; +void init_session(){ + +} + +void end_session(){ + +} + void* Aalloc(size_t sz){ - void mem = malloc(sz); - auto memmap = (std::unordered_set*) session->memory_map; + void* mem = malloc(sz); + auto memmap = (std::unordered_map*) session->memory_map; memmap->insert(mem); return mem; } int Afree(void* mem){ - auto memmap = (std::unordered_set*) session->memory_map; + auto memmap = (std::unordered_map*) session->memory_map; memmap->erase(mem); return free(mem); } + +void register_memory(void* ptr, void(dealloc)(void*)){ + auto memmap = (std::unordered_map*) session->memory_map; + memmap->insert(ptr); +} + diff --git a/server/io.h b/server/io.h index 2c5c413..fc87a06 100644 --- a/server/io.h +++ b/server/io.h @@ -16,10 +16,20 @@ std::string generate_printf_string(const char* sep = " ", const char* end = "\n" } #ifdef __SIZEOF_INT128__ +constexpr struct __int128__struct{ + uint64_t low, high; + // constexpr bool operator==(__int128_t x) const{ + // return (x>>64) == high and (x&0xffffffffffffffffull) == low; + // } + bool operator==(__int128_t x) const{ + return *((const __int128_t*) this) == x; + } +}__int128_max_v = {0x0000000000000000ull, 0x8000000000000000ull}; + inline const char* get_int128str(__int128_t v, char* buf){ bool neg = false; if (v < 0) { - if(v == std::numeric_limits<__int128_t>::min()) + if(__int128_max_v == v) return "-170141183460469231731687303715884105728"; v = -v; neg = true; @@ -62,4 +72,5 @@ inline decltype(auto) printi128<__uint128_t>(const __uint128_t& v) { #else #define printi128(x) x +#define setgbuf() #endif diff --git a/server/libaquery.h b/server/libaquery.h index e3f8af7..796f140 100644 --- a/server/libaquery.h +++ b/server/libaquery.h @@ -30,9 +30,13 @@ struct Context{ int n_buffers, *sz_bufs; void **buffers; - + void* alt_server; Log_level log_level = LOG_INFO; + +#ifdef THREADING + void* thread_pool; +#endif printf_type print = printf; template diff --git a/server/server.cpp b/server/server.cpp index 2f00a7d..50a383e 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -6,6 +6,9 @@ #include "libaquery.h" #include "monetdb_conn.h" +#ifdef THREADING +#include "threading.h" +#endif #ifdef _WIN32 #include "winhelper.h" #else @@ -54,10 +57,10 @@ extern "C" int __DLLEXPORT__ binary_info() { return MSVC; #elif defined(__CYGWIN__) || defined(__MINGW32__) || defined(__MINGW64__) return MSYS; +#elif defined(__clang__) + return CLANG; #elif defined(__GNUC__) return GCC; -#else - return AppleClang; #endif } @@ -132,11 +135,16 @@ int dll_main(int argc, char** argv, Context* cxt){ } extern "C" int __DLLEXPORT__ main(int argc, char** argv) { - puts("running"); Context* cxt = new Context(); cxt->log("%d %s\n", argc, argv[1]); - + +#ifdef THREADING + auto tp = new ThreadPool(); + cxt->thread_pool = tp; +#endif + + const char* shmname; if (argc < 0) return dll_main(argc, argv, cxt); diff --git a/server/table.h b/server/table.h index 8c3a929..e480945 100644 --- a/server/table.h +++ b/server/table.h @@ -160,8 +160,12 @@ std::ostream& operator<<(std::ostream& os, const VT& v) v.out(); return os; } + +#ifdef __SIZEOF_INT128__ std::ostream& operator<<(std::ostream& os, __int128 & v); std::ostream& operator<<(std::ostream& os, __uint128_t & v); +#endif + template struct decayed_impl { typedef ColRef type; }; @@ -329,10 +333,11 @@ struct TableInfo { header_string.resize(header_string.size() - l_sep); const auto& prt_loop = [&fp, &view, &printf_string, *this](const auto& f) { +#ifdef __SIZEOF_INT128__ constexpr auto num_hge = count_type<__int128_t, __uint128_t>((tuple_type*)(0)); char cbuf[num_hge * 41]; setgbuf(cbuf); - +#endif if(view) for (int i = 0; i < view->size; ++i){ print2_impl(f, (*view)[i], printf_string.c_str()); diff --git a/server/tests/thread_pool.hpp b/server/tests/thread_pool.hpp new file mode 100644 index 0000000..d7b8f62 --- /dev/null +++ b/server/tests/thread_pool.hpp @@ -0,0 +1,47 @@ +#include "../threading.h" +#include +#include +#include +using namespace std; + +FILE *fp; +int testing_throughput(uint32_t n_jobs){ + printf("Threadpool througput test with %u jobs.\n", n_jobs); + + auto tp = ThreadPool(thread::hardware_concurrency()); + getchar(); + auto i = 0u; + fp = fopen("tmp.tmp", "w"); + auto time = chrono::high_resolution_clock::now(); + while(i++ < n_jobs) tp.enqueue_task({ [](void* f) {fprintf(fp, "%d ", *(int*)f); free(f); }, new int(i) }); + puts("done dispatching."); + while (tp.busy()) this_thread::sleep_for(1s); + auto t = (chrono::high_resolution_clock::now() - time).count(); + printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n", i, t, i/(double)(t)); + //this_thread::sleep_for(2s); + return 0; +} + +int testing_transaction(uint32_t n_burst, uint32_t n_batch, + uint32_t base_time, uint32_t var_time){ + printf("Threadpool transaction test: burst: %u, batch: %u, time: [%u, %u].\n" + , n_burst, n_batch, base_time, var_time + base_time); + + auto tp = ThreadPool(thread::hardware_concurrency()); + getchar(); + auto i = 0u, j = 0u; + auto time = chrono::high_resolution_clock::now(); + while(j++ < n_batch){ + i = 0u; + while(i++ < n_burst) + tp.enqueue_task({ [](void* f) { printf( "%d ", *(int*)f); free(f); }, new int(i) }); + fflush(stdout); + this_thread::sleep_for(chrono::microseconds(rand()%var_time + base_time)); + } + puts("done dispatching."); + while (tp.busy()) this_thread::sleep_for(1s); + auto t = (chrono::high_resolution_clock::now() - time).count(); + printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n", j*i, t, j*i/(double)(t)); + return 0; + +} diff --git a/server/threading.cpp b/server/threading.cpp new file mode 100644 index 0000000..734ae36 --- /dev/null +++ b/server/threading.cpp @@ -0,0 +1,134 @@ +#include "threading.h" +#include +#include +#include +#include + +using namespace std; +using namespace chrono_literals; + +#define A_TP_HAVE_PAYLOAD(x) ((x) & 0b1) +#define A_TP_SET_PAYLOAD(x) ((x) |= 0b1) +#define A_TP_UNSET_PAYLOAD(x) ((x) &= 0xfe) +#define A_TP_IS_RUNNING(x) ((x) & 0b10) + +void ThreadPool::daemon_proc(uint32_t id){ + auto tf = static_cast*>(this->thread_flags); + auto ticking = static_cast*>(this->ticking); + + for(; tf[id]; this_thread::sleep_for(*ticking? 0ns:100ms)) { + if (A_TP_HAVE_PAYLOAD(tf[id])) { + A_TP_SET_PAYLOAD(tf[id]); + current_payload[id](); + current_payload[id].empty(); + A_TP_UNSET_PAYLOAD(tf[id]); + } + } +} + +void ThreadPool::tick(){ + auto pq_lock = static_cast(payload_queue_lock); + auto pq = static_cast *>(payload_queue); + auto tf = static_cast*>(this->thread_flags); + auto ticking = static_cast*>(this->ticking); + auto th = static_cast(this->thread_handles); + for(; !this->terminate; this_thread::sleep_for(50ms)){ + if(*ticking) { + bool quit = false; + for(; !quit; ){ + for(uint32_t i = 0; i < n_threads; ++i){ + if(!A_TP_HAVE_PAYLOAD(tf[i])){ + pq_lock->lock(); + payload_t& p = pq->front(); + current_payload[i] = p; + A_TP_SET_PAYLOAD(tf[i]); + pq->pop_front(); + quit = !pq->size(); + pq_lock->unlock(); + if (quit) break; + } + } + } + puts("done"); + *ticking = false; + } + } + + for (uint32_t i = 0; i < n_threads; ++i) + tf[i] &= 0xfd; + for (uint32_t i = 0; i < n_threads; ++i) + th[i].join(); + + delete[] th; + delete[] tf; + delete pq; + delete pq_lock; + delete ticking; + auto cp = static_cast(current_payload); + delete[] cp; +} + +ThreadPool::ThreadPool(uint32_t n_threads) + : n_threads(n_threads) { + printf("Thread pool started with %u threads;", n_threads); + fflush(stdout); + this->terminate = false; + payload_queue = new deque; + auto th = new thread[n_threads]; + auto tf = new atomic[n_threads]; + + thread_handles = th; + thread_flags = tf; + ticking = static_cast(new atomic(false)); + + for (uint32_t i = 0; i < n_threads; ++i){ + atomic_init(tf + i, 0b10); + th[i] = thread(&ThreadPool::daemon_proc, this, i); + } + + payload_queue_lock = new mutex(); + tick_handle = new thread(&ThreadPool::tick, this); + current_payload = new payload_t[n_threads]; +} + + +void ThreadPool::enqueue_task(const payload_t& payload){ + auto pq_lock = static_cast(payload_queue_lock); + auto pq = static_cast *>(payload_queue); + auto tf = static_cast*>(this->thread_flags); + auto& ticking = *static_cast*>(this->ticking); + + if (!ticking){ + for (uint32_t i = 0; i < n_threads; ++i){ + if(!A_TP_HAVE_PAYLOAD(tf[i])){ + current_payload[i] = payload; + A_TP_SET_PAYLOAD(tf[i]); + return; + } + } + } + + pq_lock->lock(); + pq->push_back(payload); + ticking = true; + pq_lock->unlock(); +} + +ThreadPool::~ThreadPool() { + this->terminate = true; + auto tick = static_cast (tick_handle); + tick->join(); + delete tick; + puts("Thread pool terminated."); +} + +bool ThreadPool::busy(){ + if (!*(atomic*)ticking) { + for (int i = 0; i < n_threads; ++i) + if (A_TP_HAVE_PAYLOAD(((atomic*)thread_flags)[i])) + return true; + return false; + } + return true; +} + diff --git a/server/threading.h b/server/threading.h new file mode 100644 index 0000000..861e941 --- /dev/null +++ b/server/threading.h @@ -0,0 +1,42 @@ +#ifndef _AQ_THREADING_H +#define _AQ_THREADING_H + +#include + +class ThreadPool{ + +public: + typedef void(*payload_fn_t)(void*); + + struct payload_t{ + payload_fn_t f; + void* args; + constexpr payload_t(payload_fn_t f, void* args) noexcept + : f(f), args(args) {} + constexpr payload_t() noexcept + : f(nullptr), args(nullptr) {}; + bool is_empty() const { return f && args; } + void empty() { f = nullptr; args = nullptr; } + void operator()() { f(args); } + }; + ThreadPool(uint32_t n_threads = 0); + void enqueue_task(const payload_t& payload); + bool busy(); + virtual ~ThreadPool(); + +private: + uint32_t n_threads; + void* thread_handles; + void* thread_flags; + payload_t* current_payload; + void* payload_queue; + void* tick_handle; + void* ticking; + void* payload_queue_lock; + bool terminate; + void tick(); + void daemon_proc(uint32_t); + +}; + +#endif diff --git a/server/vector_type.hpp b/server/vector_type.hpp index fb2cded..d88b1ed 100644 --- a/server/vector_type.hpp +++ b/server/vector_type.hpp @@ -165,7 +165,7 @@ public: void qpop() { size = size ? size - 1 : size; } - void pop() { + void pop_resize() { if (size) { --size; if (capacity > (size << 1)) @@ -178,7 +178,7 @@ public: } } } - _Ty pop_back() { + _Ty pop() { return container[--size]; } void merge(vector_type<_Ty>& _other) {