wip: fast print

dev
Bill 2 years ago
parent 1832de1241
commit c1b1b26d1a

@ -15,8 +15,8 @@ FPIC = -fPIC
COMPILER = $(shell $(CXX) --version | grep -q clang && echo clang|| echo gcc)
LIBTOOL = ar rcs
USELIB_FLAG = -Wl,--whole-archive,libaquery.a -Wl,-no-whole-archive
LIBAQ_SRC = server/monetdb_conn.cpp server/libaquery.cpp
LIBAQ_OBJ = monetdb_conn.o libaquery.o
LIBAQ_SRC = server/monetdb_conn.cpp server/libaquery.cpp server/dragonbox/dragonbox_to_chars.cpp
LIBAQ_OBJ = monetdb_conn.o libaquery.o dragonbox_to_chars.o
SEMANTIC_INTERPOSITION = -fno-semantic-interposition
RANLIB = ranlib
@ -117,7 +117,7 @@ info:
$(info $" FPIC: $(FPIC))
pch:
$(CXX) -x c++-header server/pch.hpp $(FPIC) $(CXXFLAGS)
libaquery.a:
libaquery:
$(CXX) -c $(FPIC) $(PCHFLAGS) $(LIBAQ_SRC) $(OS_SUPPORT) $(CXXFLAGS) &&\
$(LIBTOOL) libaquery.a $(LIBAQ_OBJ) &&\
$(RANLIB) libaquery.a
@ -130,9 +130,9 @@ launcher:
$(CXX) -D__AQ_BUILD_LAUNCHER__ $(LIBAQ_SRC) $(OS_SUPPORT) $(BINARYFLAGS) -o aq
server.so:
# $(CXX) -z muldefs server/server.cpp server/monetdb_conn.cpp -fPIC -shared $(OS_SUPPORT) monetdb/msvc/monetdbe.dll --std=c++1z -O3 -march=native -o server.so -I./monetdb/msvc
$(CXX) $(SHAREDFLAGS) $(PCHFLAGS) $(LIBAQ_SRC) server/server.cpp server/dragonbox/dragonbox_to_chars.cpp $(OS_SUPPORT) -o server.so
$(CXX) $(SHAREDFLAGS) $(PCHFLAGS) $(LIBAQ_SRC) server/server.cpp $(OS_SUPPORT) -o server.so
server_uselib:
$(CXX) $(SHAREDFLAGS) server/server.cpp libaquery.a server/dragonbox/dragonbox_to_chars.cpp -o server.so
$(CXX) $(SHAREDFLAGS) server/server.cpp libaquery.a -o server.so
snippet:
$(CXX) $(SHAREDFLAGS) $(PCHFLAGS) out.cpp $(LIBAQ_SRC) -o dll.so

@ -80,7 +80,7 @@ class build_manager:
headerfiles = ['server/aggregations.h', 'server/hasher.h', 'server/io.h',
'server/libaquery.h', 'server/monetdb_conn.h', 'server/pch.hpp',
'server/table.h', 'server/threading.h', 'server/types.h', 'server/utils.h',
'server/winhelper.h', 'server/gc.hpp', 'server/vector_type.hpp',
'server/winhelper.h', 'server/gc.h', 'server/vector_type.hpp',
'server/table_ext_monetdb.hpp'
]
@ -120,7 +120,7 @@ class build_manager:
os.environ['AQ_DEBUG'] = '0' if mgr.OptimizationLv else '1'
def libaquery_a(self):
self.build_cmd = [['rm', 'libaquery.a'],['make', 'libaquery.a']]
self.build_cmd = [['rm', 'libaquery.a'],['make', 'libaquery']]
return self.build()
def pch(self):
self.build_cmd = [['rm', 'server/pch.hpp.gch'], ['make', 'pch']]

@ -221,7 +221,7 @@
<ItemGroup>
<ClInclude Include="..\csv.h" />
<ClInclude Include="..\server\aggregations.h" />
<ClInclude Include="..\server\gc.hpp" />
<ClInclude Include="..\server\gc.h" />
<ClInclude Include="..\server\hasher.h" />
<ClInclude Include="..\server\io.h" />
<ClInclude Include="..\server\libaquery.h" />

@ -3,11 +3,13 @@
class GC {
private:;
size_t max_size, max_slots,
size_t max_slots,
interval, forced_clean,
forceclean_timer = 0;
uint64_t max_size;
bool running, alive;
// ptr, dealloc, ref, sz
uint32_t threshould;
void *q, *q_back;
void* handle;
std::atomic<uint32_t> slot_pos;
@ -29,19 +31,21 @@ public:
);
GC(
uint32_t max_size = 0xfffffff, uint32_t max_slots = 4096,
uint32_t interval = 10000, uint32_t forced_clean = 1000000 //one seconds
uint64_t max_size = 0xfffffff, uint32_t max_slots = 4096,
uint32_t interval = 10000, uint32_t forced_clean = 1000000,
uint32_t threshould = 64 //one seconds
) : max_size(max_size), max_slots(max_slots),
interval(interval), forced_clean(forced_clean){
interval(interval), forced_clean(forced_clean),
threshould(threshould) {
start_deamon();
GC::gc = this;
GC::gc_handle = this;
} // 256 MB
~GC(){
terminate_daemon();
}
static GC* gc;
static GC* gc_handle;
constexpr static void(*_free) (void*) = free;
};

@ -1,130 +0,0 @@
#pragma once
#include <vector_type>
#include <utility>
#include <thread>
#include <chrono>
#include <atomic>
#ifndef __AQ_USE_THREADEDGC__
class GC {
private:
template<class T>
using vector = vector_type<T>;
template<class ...T>
using tuple = std::tuple<T...>;
size_t current_size = 0, max_size,
interval, forced_clean,
forceclean_timer = 0;
bool running, alive;
// ptr, dealloc, ref, sz
vector<tuple<void*, void (*)(void*)>> *q, *q_back;
std::thread handle;
std::atomic<std::thread::id> lock;
// maybe use volatile std::thread::id instead
protected:
void acquire_lock() {
auto this_pid = std::this_thread::get_id();
while(lock != this_pid)
{
while(lock != this_pid && lock != std::thread::id()) {
std::this_thread::sleep_for(std::chrono::milliseconds(0));
}
lock = this_pid;
}
}
void release_lock(){
lock = std::thread::id();
}
void gc()
{
if (q->size == 0)
return;
auto t = q;
acquire_lock();
q = q_back;
release_lock();
for(const auto& t : *t) {
std::get<1>(t)(std::get<0>(t));
}
t->clear();
q_back = t;
running = false;
current_size = 0;
}
void daemon() {
using namespace std::chrono;
while (alive) {
if (running) {
if (current_size > max_size ||
forceclean_timer > forced_clean)
{
gc();
forceclean_timer = 0;
}
std::this_thread::sleep_for(microseconds(interval));
forceclean_timer += interval;
}
else {
std::this_thread::sleep_for(10ms);
forceclean_timer += 10000;
}
}
}
void start_deamon() {
q = new vector<tuple<void*, void (*)(void*)>>();
q_back = new vector<tuple<void*, void (*)(void*)>>();
lock = thread::id();
alive = true;
handle = std::thread(&daemon);
}
void terminate_daemon() {
running = false;
alive = false;
delete q;
delete q_back;
using namespace std::chrono;
if (handle.joinable()) {
std::this_thread::sleep_for(microseconds(1000 + std::max(static_cast<size_t>(10000), interval)));
handle.join();
}
}
public:
void reg(void* v, uint32_t sz = 1,
void(*f)(void*) = [](void* v) {free (v); }
) {
acquire_lock();
current_size += sz;
q.push_back({ v, f });
running = true;
release_lock()
}
GC(
uint32_t max_size = 0xfffffff, uint32_t interval = 10000,
uint32_t forced_clean = 1000000 //one seconds
) : max_size(max_size), interval(interval), forced_clean(forced_clean){
start_deamon();
} // 256 MB
~GC(){
terminate_daemon();
}
};
#else
class GC {
public:
GC(uint32_t) = default;
void reg(
void* v, uint32_t = 0,
void(*f)(void*) = [](void* v) {free (v); }
) const { f(v); }
}
#endif

@ -313,6 +313,7 @@ void* Context::get_module_function(const char* fname){
auto ret = fmap->find(fname);
return ret == fmap->end() ? nullptr : ret->second;
}
// template<typename _Ty>
// inline void vector_type<_Ty>::out(uint32_t n, const char* sep) const
// {
@ -328,10 +329,8 @@ void* Context::get_module_function(const char* fname){
// }
#include "gc.h"
#include <vector_type>
#include <utility>
#include <thread>
#include <chrono>
#ifndef __AQ_USE_THREADEDGC__
struct gcmemory_t{
@ -341,41 +340,41 @@ struct gcmemory_t{
using memoryqueue_t = gcmemory_t*;
void GC::acquire_lock() {
auto this_tid = std::this_thread::get_id();
while(lock != this_tid)
{
while(lock != this_tid && lock != std::thread::id()) {
std::this_thread::sleep_for(std::chrono::milliseconds(0));
}
lock = this_tid;
}
// auto this_tid = std::this_thread::get_id();
// while(lock != this_tid)
// {
// while(lock != this_tid && lock != std::thread::id()) {
// std::this_thread::sleep_for(std::chrono::milliseconds(0));
// }
// lock = this_tid;
// }
}
void GC::release_lock(){
lock = std::thread::id();
// lock = std::thread::id();
}
void GC::gc()
{
auto& _q = static_cast<memoryqueue_t*>(q);
auto& _q_back = static_cast<memoryqueue_t*>(q_back);
if (_q->size == 0)
auto _q = static_cast<memoryqueue_t>(q);
auto _q_back = static_cast<memoryqueue_t>(q_back);
if (slot_pos == 0)
return;
auto t = _q;
lock = true;
while(alive_cnt > 0);
_q = q_back;
while(alive_cnt != 0);
q = _q_back;
uint32_t _slot = slot_pos;
slot_pos = 0;
current_size = 0;
lock = false;
_q_back = t;
q_back = t;
for(uint32_t i = 0; i < _slot; ++i){
if (_q_back[i]->memory != nullptr && _q_back[i]->deallocator != nullptr)
_q_back[i]->deallocator(_q_back[i]->memory);
if (_q[i].memory != nullptr && _q[i].deallocator != nullptr)
_q[i].deallocator(_q[i].memory);
}
memset(_q_back, 0, sizeof(gcmemory_t) * _slot);
memset(_q, 0, sizeof(gcmemory_t) * _slot);
running = false;
}
@ -384,7 +383,7 @@ void GC::daemon() {
while (alive) {
if (running) {
if (current_size > max_size ||
if (current_size - max_size > 0 ||
forceclean_timer > forced_clean)
{
gc();
@ -433,16 +432,24 @@ void GC::reg(void* v, uint32_t sz, void(*f)(void*)) { //~ 40ns expected v. free
f(v);
return;
}
auto _q = static_cast<memoryqueue_t>q;
auto _q = static_cast<memoryqueue_t>(q);
while(lock);
++alive_cnt;
current_size += sz;
auto _slot = (slot_pos += 1);
q[_slot] = {v, f};
_q[_slot] = {v, f};
--alive_cnt;
running = true;
}
#endif
static GC* GC::gc = nullptr;
GC* GC::gc_handle = nullptr;
#include "dragonbox/dragonbox_to_chars.h"
void test(){
char buf[32];
double d = 123456789.123456789;
auto ret = jkj::dragonbox::to_chars(d, buf);
printf("%s\n", buf);
}

@ -7,9 +7,31 @@
#define __AQ_THREADED_GC__
#endif
#include "table.h"
#include <unordered_map>
#include <chrono>
class aq_timer {
private:
std::chrono::high_resolution_clock::time_point now;
public:
aq_timer(){
now = std::chrono::high_resolution_clock::now();
}
void reset(){
now = std::chrono::high_resolution_clock::now();
}
long long elapsed(){
long long ret = (std::chrono::high_resolution_clock::now() - now).count();
reset();
return ret;
}
long long lap() const{
long long ret = (std::chrono::high_resolution_clock::now() - now).count();
return ret;
}
};
#include "table.h"
enum Log_level {
LOG_INFO,
@ -84,26 +106,7 @@ struct Context{
std::unordered_map<const char*, uColRef *> cols;
};
class aq_timer {
private:
std::chrono::high_resolution_clock::time_point now;
public:
aq_timer(){
now = std::chrono::high_resolution_clock::now();
}
void reset(){
now = std::chrono::high_resolution_clock::now();
}
long long elapsed(){
long long ret = (std::chrono::high_resolution_clock::now() - now).count();
reset();
return ret;
}
long long lap() const{
long long ret = (std::chrono::high_resolution_clock::now() - now).count();
return ret;
}
};
#ifdef _WIN32
#define __DLLEXPORT__ __declspec(dllexport) __stdcall
@ -115,3 +118,5 @@ public:
typedef void (*deallocator_t) (void*);
#endif
void test();

@ -3,6 +3,7 @@
#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include "libaquery.h"
#include "monetdb_conn.h"
@ -16,6 +17,7 @@
#include <dlfcn.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <atomic>
// fast numeric to string conversion
#include "jeaiii_to_text.h"
@ -171,6 +173,7 @@ void initialize_module(const char* module_name, void* module_handle, Context* cx
}
int dll_main(int argc, char** argv, Context* cxt){
test();
aq_timer timer;
Config *cfg = reinterpret_cast<Config *>(argv[0]);
std::unordered_map<std::string, void*> user_module_map;

@ -213,7 +213,7 @@ template<>
class ColRef<void> : public ColRef<int> {};
template<typename _Ty>
class ColView {
class ColView : public vector_base<_Ty> {
public:
typedef ColRef<_Ty> Decayed_t;
const uint32_t size;

@ -2,31 +2,17 @@
#include <ctime>
#include <type_traits>
#include<string>
#include <string>
#if ((defined(_MSVC_LANG) && _MSVC_LANG >= 201703L) || __cplusplus >= 201703L)
constexpr static bool cpp_17 = true;
#else
constexpr static bool cpp_17 = false;
#endif
template <class T>
inline const char* str(const T& v) {
return "";
}
template <class T>
constexpr char* aq_itoa(T t, char* buf){
if constexpr (std::is_signed<T>::value){
if (t < 0){
*buf++ = '-';
t = -t;
}
}
while(t > 0){
*buf++ = t%10 + '0';
t /= 10;
}
return buf;
}
extern std::string base62uuid(int l = 6);

@ -17,13 +17,15 @@
#include "types.h"
#pragma pack(push, 1)
template<class T>
struct vector_base {};
struct vectortype_cstorage{
void* container;
unsigned int size, capacity;
};
template <typename _Ty>
class vector_type {
class vector_type : public vector_base<_Ty>{
public:
typedef vector_type<_Ty> Decayed_t;
void inline _copy(const vector_type<_Ty>& vt) {

Loading…
Cancel
Save