update threadpool

dev
Bill 2 years ago
parent 4dd6de1b0a
commit 751a442554

@ -1,22 +1,26 @@
OS_SUPPORT =
MonetDB_LIB =
Threading =
ifeq ($(OS),Windows_NT)
OS_SUPPORT += server/winhelper.cpp
MonetDB_LIB += -Imonetdb/msvc msc-plugin/monetdbe.dll
else
MonetDB_LIB += -I/usr/include/monetdb -lmonetdbe
MonetDB_LIB += -I/usr/local/include/monetdb -I/usr/include/monetdb -lmonetdbe
endif
ifeq ($(THREADING),1)
Threading += server/threading.cpp -DTHREADING
endif
info:
$(info $(OS_SUPPORT))
$(info $(OS))
$(info $(Threading))
$(info "test")
server.bin:
$(CXX) server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) -flto --std=c++1z -O3 -march=native -o server.bin
$(CXX) server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) $(Threading) -flto --std=c++1z -O3 -march=native -o server.bin
server.so:
# $(CXX) server/server.cpp server/monetdb_conn.cpp -fPIC -shared $(OS_SUPPORT) monetdb/msvc/monetdbe.dll --std=c++1z -O3 -march=native -o server.so -I./monetdb/msvc
$(CXX) -shared -fPIC -flto server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) server/monetdb_conn.cpp $(MonetDB_LIB) --std=c++1z -o server.so -O3
$(CXX) -shared -fPIC -flto server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) server/monetdb_conn.cpp $(Threading) $(MonetDB_LIB) --std=c++1z -o server.so -O3
snippet:
$(CXX) -shared -fPIC -flto --std=c++1z out.cpp server/monetdb_conn.cpp server/table.cpp server/io.cpp $(MonetDB_LIB) -O3 -march=native -o dll.so
clean:

@ -2,6 +2,7 @@
import os
# os.environ['CXX'] = 'C:/Program Files/LLVM/bin/clang.exe'
os.environ['THREADING'] = '0'
add_path_to_ldpath = True
rebuild_backend = True

@ -43,6 +43,7 @@ if aquery_config.rebuild_backend:
os.remove(server_bin)
except Exception as e:
print(type(e), e)
subprocess.call(['make', "info"])
subprocess.call(['make', server_bin], stdout=nullstream)
cleanup = True

@ -3,11 +3,8 @@
extern void* Aalloc(size_t sz);
extern int Afree(void * mem);
extern size_t register_memory(void* ptr, void(dealloc)(void*));
template <typename T>
size_t register_memory(T* ptr, void(dealloc)(void*)){
[](void* m){ auto _m = static_cast<T*>(m); delete _m; };
}
struct Session{
struct Statistic{
size_t total_active;

@ -2,18 +2,32 @@
#include <memory>
#include <stdlib>
#include <unordered_set>
#include <unordered_map>
Session* session;
void init_session(){
}
void end_session(){
}
void* Aalloc(size_t sz){
void mem = malloc(sz);
auto memmap = (std::unordered_set<void*>*) session->memory_map;
void* mem = malloc(sz);
auto memmap = (std::unordered_map<void*>*) session->memory_map;
memmap->insert(mem);
return mem;
}
int Afree(void* mem){
auto memmap = (std::unordered_set<void*>*) session->memory_map;
auto memmap = (std::unordered_map<void*>*) session->memory_map;
memmap->erase(mem);
return free(mem);
}
void register_memory(void* ptr, void(dealloc)(void*)){
auto memmap = (std::unordered_map<void*>*) session->memory_map;
memmap->insert(ptr);
}

@ -16,10 +16,20 @@ std::string generate_printf_string(const char* sep = " ", const char* end = "\n"
}
#ifdef __SIZEOF_INT128__
/// Bit pattern {low, high} of a 128-bit integer, stored as two 64-bit halves so
/// the constant can be spelled without a 128-bit literal.
/// `__int128_max_v` holds the pattern with only the top bit set, i.e. the most
/// negative __int128_t value (-2^127), which cannot be negated.
constexpr struct __int128__struct{
    uint64_t low, high;

    /// Returns true when `x` has exactly the bit pattern {high, low}.
    ///
    /// The halves are extracted through __uint128_t so that no sign extension
    /// takes place: comparing a signed `x >> 64` directly against the unsigned
    /// `high` would never match for negative `x`. Unlike reinterpreting `this`
    /// as an __int128_t, this does not depend on the struct's alignment or on
    /// the byte order of the target.
    constexpr bool operator==(__int128_t x) const{
        const __uint128_t bits = static_cast<__uint128_t>(x);
        return static_cast<uint64_t>(bits >> 64) == high
            && static_cast<uint64_t>(bits) == low;
    }
}__int128_max_v = {0x0000000000000000ull, 0x8000000000000000ull};
inline const char* get_int128str(__int128_t v, char* buf){
bool neg = false;
if (v < 0) {
if(v == std::numeric_limits<__int128_t>::min())
if(__int128_max_v == v)
return "-170141183460469231731687303715884105728";
v = -v;
neg = true;
@ -62,4 +72,5 @@ inline decltype(auto) printi128<__uint128_t>(const __uint128_t& v) {
#else
#define printi128(x) x
#define setgbuf()
#endif

@ -30,9 +30,13 @@ struct Context{
int n_buffers, *sz_bufs;
void **buffers;
void* alt_server;
Log_level log_level = LOG_INFO;
#ifdef THREADING
void* thread_pool;
#endif
printf_type print = printf;
template <class ...Types>

@ -6,6 +6,9 @@
#include "libaquery.h"
#include "monetdb_conn.h"
#ifdef THREADING
#include "threading.h"
#endif
#ifdef _WIN32
#include "winhelper.h"
#else
@ -54,10 +57,10 @@ extern "C" int __DLLEXPORT__ binary_info() {
return MSVC;
#elif defined(__CYGWIN__) || defined(__MINGW32__) || defined(__MINGW64__)
return MSYS;
#elif defined(__clang__)
return CLANG;
#elif defined(__GNUC__)
return GCC;
#else
return AppleClang;
#endif
}
@ -132,11 +135,16 @@ int dll_main(int argc, char** argv, Context* cxt){
}
extern "C" int __DLLEXPORT__ main(int argc, char** argv) {
puts("running");
Context* cxt = new Context();
cxt->log("%d %s\n", argc, argv[1]);
#ifdef THREADING
auto tp = new ThreadPool();
cxt->thread_pool = tp;
#endif
const char* shmname;
if (argc < 0)
return dll_main(argc, argv, cxt);

@ -160,8 +160,12 @@ std::ostream& operator<<(std::ostream& os, const VT<T>& v)
v.out();
return os;
}
#ifdef __SIZEOF_INT128__
std::ostream& operator<<(std::ostream& os, __int128 & v);
std::ostream& operator<<(std::ostream& os, __uint128_t & v);
#endif
template <class Type>
struct decayed_impl<ColView, Type> { typedef ColRef<Type> type; };
@ -329,10 +333,11 @@ struct TableInfo {
header_string.resize(header_string.size() - l_sep);
const auto& prt_loop = [&fp, &view, &printf_string, *this](const auto& f) {
#ifdef __SIZEOF_INT128__
constexpr auto num_hge = count_type<__int128_t, __uint128_t>((tuple_type*)(0));
char cbuf[num_hge * 41];
setgbuf(cbuf);
#endif
if(view)
for (int i = 0; i < view->size; ++i){
print2_impl<cols...>(f, (*view)[i], printf_string.c_str());

@ -0,0 +1,47 @@
#include "../threading.h"
#include <thread>
#include <cstdio>
#include <cstdlib>
using namespace std;
FILE *fp;
/// Throughput smoke test: dispatches `n_jobs` trivial tasks to a fresh pool and
/// reports how long it took until the pool reports idle.
///
/// Each task writes its 1-based sequence number to "tmp.tmp" through the global
/// `fp`. The function blocks on getchar() before starting, so it is meant to be
/// run interactively.
///
/// @param n_jobs number of tasks to enqueue
/// @return 0 on success, 1 if the output file could not be opened
int testing_throughput(uint32_t n_jobs){
    printf("Threadpool througput test with %u jobs.\n", n_jobs);
    auto tp = ThreadPool(thread::hardware_concurrency());
    getchar();

    fp = fopen("tmp.tmp", "w");
    if (!fp) {
        // Without this check every task would call fprintf on a null stream.
        perror("tmp.tmp");
        return 1;
    }

    const auto start = chrono::high_resolution_clock::now();
    for (uint32_t i = 0; i < n_jobs; ++i)
        tp.enqueue_task({ [](void* f) {
            // The argument was allocated with `new int`, so it must be
            // released with `delete`, not free().
            auto value = static_cast<int*>(f);
            fprintf(fp, "%d ", *value);
            delete value;
        }, new int(i + 1) });
    puts("done dispatching.");

    while (tp.busy()) this_thread::sleep_for(1s);

    // Convert explicitly: the clock's native tick is not guaranteed to be 1ns,
    // and its rep type is not guaranteed to be long long.
    const long long elapsed_ns = static_cast<long long>(
        chrono::duration_cast<chrono::nanoseconds>(
            chrono::high_resolution_clock::now() - start).count());
    printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n",
        n_jobs, elapsed_ns, n_jobs / static_cast<double>(elapsed_ns));

    fclose(fp);
    //this_thread::sleep_for(2s);
    return 0;
}
/// Bursty-load smoke test: enqueues `n_batch` bursts of `n_burst` tasks each,
/// sleeping a pseudo-random time between bursts, then waits until the pool
/// reports idle and prints the elapsed time.
///
/// Blocks on getchar() before starting, so it is meant to be run interactively.
///
/// @param n_burst   tasks enqueued back-to-back in one burst
/// @param n_batch   number of bursts
/// @param base_time minimum pause between bursts, in microseconds
/// @param var_time  exclusive upper bound of the random extra pause, in
///                  microseconds; 0 disables the random part
/// @return always 0
int testing_transaction(uint32_t n_burst, uint32_t n_batch,
                        uint32_t base_time, uint32_t var_time){
    printf("Threadpool transaction test: burst: %u, batch: %u, time: [%u, %u].\n"
        , n_burst, n_batch, base_time, var_time + base_time);
    auto tp = ThreadPool(thread::hardware_concurrency());
    getchar();

    const auto start = chrono::high_resolution_clock::now();
    for (uint32_t batch = 0; batch < n_batch; ++batch) {
        for (uint32_t job = 0; job < n_burst; ++job)
            tp.enqueue_task({ [](void* f) {
                // Allocated with `new int`, so release with `delete`.
                auto value = static_cast<int*>(f);
                printf("%d ", *value);
                delete value;
            }, new int(job + 1) });
        fflush(stdout);
        // rand() % 0 is undefined; treat var_time == 0 as "no jitter".
        const uint32_t jitter = var_time ? rand() % var_time : 0;
        this_thread::sleep_for(chrono::microseconds(jitter + base_time));
    }
    puts("done dispatching.");

    while (tp.busy()) this_thread::sleep_for(1s);

    // Convert explicitly: the clock's native tick is not guaranteed to be 1ns,
    // and its rep type is not guaranteed to be long long.
    const long long elapsed_ns = static_cast<long long>(
        chrono::duration_cast<chrono::nanoseconds>(
            chrono::high_resolution_clock::now() - start).count());
    // Report the number of tasks actually enqueued, not the post-incremented
    // loop counters.
    const uint32_t total = n_burst * n_batch;
    printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n",
        total, elapsed_ns, total / static_cast<double>(elapsed_ns));
    return 0;
}

@ -0,0 +1,134 @@
#include "threading.h"
#include <thread>
#include <atomic>
#include <mutex>
#include <deque>
using namespace std;
using namespace chrono_literals;
#define A_TP_HAVE_PAYLOAD(x) ((x) & 0b1)
#define A_TP_SET_PAYLOAD(x) ((x) |= 0b1)
#define A_TP_UNSET_PAYLOAD(x) ((x) &= 0xfe)
#define A_TP_IS_RUNNING(x) ((x) & 0b10)
void ThreadPool::daemon_proc(uint32_t id){
auto tf = static_cast<atomic<uint8_t>*>(this->thread_flags);
auto ticking = static_cast<atomic<bool>*>(this->ticking);
for(; tf[id]; this_thread::sleep_for(*ticking? 0ns:100ms)) {
if (A_TP_HAVE_PAYLOAD(tf[id])) {
A_TP_SET_PAYLOAD(tf[id]);
current_payload[id]();
current_payload[id].empty();
A_TP_UNSET_PAYLOAD(tf[id]);
}
}
}
void ThreadPool::tick(){
auto pq_lock = static_cast<mutex*>(payload_queue_lock);
auto pq = static_cast<deque<payload_t> *>(payload_queue);
auto tf = static_cast<atomic<uint8_t>*>(this->thread_flags);
auto ticking = static_cast<atomic<bool>*>(this->ticking);
auto th = static_cast<thread*>(this->thread_handles);
for(; !this->terminate; this_thread::sleep_for(50ms)){
if(*ticking) {
bool quit = false;
for(; !quit; ){
for(uint32_t i = 0; i < n_threads; ++i){
if(!A_TP_HAVE_PAYLOAD(tf[i])){
pq_lock->lock();
payload_t& p = pq->front();
current_payload[i] = p;
A_TP_SET_PAYLOAD(tf[i]);
pq->pop_front();
quit = !pq->size();
pq_lock->unlock();
if (quit) break;
}
}
}
puts("done");
*ticking = false;
}
}
for (uint32_t i = 0; i < n_threads; ++i)
tf[i] &= 0xfd;
for (uint32_t i = 0; i < n_threads; ++i)
th[i].join();
delete[] th;
delete[] tf;
delete pq;
delete pq_lock;
delete ticking;
auto cp = static_cast<payload_t*>(current_payload);
delete[] cp;
}
ThreadPool::ThreadPool(uint32_t n_threads)
: n_threads(n_threads) {
printf("Thread pool started with %u threads;", n_threads);
fflush(stdout);
this->terminate = false;
payload_queue = new deque<payload_t>;
auto th = new thread[n_threads];
auto tf = new atomic<uint8_t>[n_threads];
thread_handles = th;
thread_flags = tf;
ticking = static_cast<void*>(new atomic<bool>(false));
for (uint32_t i = 0; i < n_threads; ++i){
atomic_init(tf + i, 0b10);
th[i] = thread(&ThreadPool::daemon_proc, this, i);
}
payload_queue_lock = new mutex();
tick_handle = new thread(&ThreadPool::tick, this);
current_payload = new payload_t[n_threads];
}
void ThreadPool::enqueue_task(const payload_t& payload){
auto pq_lock = static_cast<mutex*>(payload_queue_lock);
auto pq = static_cast<deque<payload_t> *>(payload_queue);
auto tf = static_cast<atomic<uint8_t>*>(this->thread_flags);
auto& ticking = *static_cast<atomic<bool>*>(this->ticking);
if (!ticking){
for (uint32_t i = 0; i < n_threads; ++i){
if(!A_TP_HAVE_PAYLOAD(tf[i])){
current_payload[i] = payload;
A_TP_SET_PAYLOAD(tf[i]);
return;
}
}
}
pq_lock->lock();
pq->push_back(payload);
ticking = true;
pq_lock->unlock();
}
// Requests shutdown and waits for the dispatcher thread to finish.
// The dispatcher (tick) joins the workers and frees the remaining pool state
// before it returns; only the dispatcher's own thread object is released here.
ThreadPool::~ThreadPool() {
    terminate = true;

    thread* const dispatcher = static_cast<thread*>(tick_handle);
    dispatcher->join();
    delete dispatcher;

    puts("Thread pool terminated.");
}
bool ThreadPool::busy(){
if (!*(atomic<bool>*)ticking) {
for (int i = 0; i < n_threads; ++i)
if (A_TP_HAVE_PAYLOAD(((atomic<uint8_t>*)thread_flags)[i]))
return true;
return false;
}
return true;
}

@ -0,0 +1,42 @@
#ifndef _AQ_THREADING_H
#define _AQ_THREADING_H
#include <stdint.h>
/// Fixed-size pool of worker threads fed by a dispatcher thread.
///
/// Implementation details (std::thread, std::atomic, std::mutex, std::deque)
/// are hidden behind `void*` members so that this header only needs <stdint.h>.
///
/// NOTE(review): the pool owns raw pointers and defines no copy operations;
/// copying an instance looks unsafe (both copies would release the same
/// resources) — confirm no caller copies it.
class ThreadPool{
public:
    /// Signature of a task: a plain function taking one opaque argument.
    typedef void(*payload_fn_t)(void*);

    /// A task: function pointer plus the argument it is called with.
    struct payload_t{
        payload_fn_t f;
        void* args;

        constexpr payload_t(payload_fn_t f, void* args) noexcept
            : f(f), args(args) {}
        /// Creates an empty payload (no function, no argument).
        constexpr payload_t() noexcept
            : f(nullptr), args(nullptr) {}

        /// True when no function is set, i.e. there is nothing to run.
        /// A payload with a function but a null argument is NOT empty.
        bool is_empty() const { return f == nullptr; }
        /// Resets the payload to the empty state (this clears, it does not query).
        void empty() { f = nullptr; args = nullptr; }
        /// Runs the task. Must not be called on an empty payload.
        void operator()() { f(args); }
    };

    /// @param n_threads number of worker threads to start
    ThreadPool(uint32_t n_threads = 0);
    /// Submits a task for execution on one of the workers.
    void enqueue_task(const payload_t& payload);
    /// True while queued or running tasks remain.
    bool busy();
    virtual ~ThreadPool();

private:
    uint32_t n_threads;         // number of worker threads
    void* thread_handles;       // std::thread[n_threads]
    void* thread_flags;         // std::atomic<uint8_t>[n_threads]; bit 0 = payload present, bit 1 = running
    payload_t* current_payload; // one task slot per worker
    void* payload_queue;        // std::deque<payload_t> of tasks waiting for a free worker
    void* tick_handle;          // std::thread* running tick()
    void* ticking;              // std::atomic<bool>*; true while the queue is being drained
    void* payload_queue_lock;   // std::mutex* guarding payload_queue
    bool terminate;             // set by the destructor to stop tick()

    void tick();                // dispatcher loop
    void daemon_proc(uint32_t); // worker loop for the given worker index
};
#endif

@ -165,7 +165,7 @@ public:
void qpop() {
size = size ? size - 1 : size;
}
void pop() {
void pop_resize() {
if (size) {
--size;
if (capacity > (size << 1))
@ -178,7 +178,7 @@ public:
}
}
}
_Ty pop_back() {
_Ty pop() {
return container[--size];
}
void merge(vector_type<_Ty>& _other) {

Loading…
Cancel
Save