monetdb passthru, WIP: threadedGC

dev
Bill 2 years ago
parent 31a4972478
commit 5dad6e5270

@ -83,8 +83,8 @@ ifeq ($(THREADING),1)
Defines += -DTHREADING
endif
ifeq ($(AQUERY_ITC_USE_SHMEM), 1)
Defines += -D__AQUERY_ITC_USE_SHMEM__
ifeq ($(AQUERY_ITC_USE_SEMPH), 1)
Defines += -D__AQUERY_ITC_USE_SEMPH__
endif
SHAREDFLAGS += $(FPIC)

@ -22,7 +22,7 @@ def init_config():
from engine.utils import add_dll_dir
# os.environ['CXX'] = 'C:/Program Files/LLVM/bin/clang.exe'
os.environ['THREADING'] = '1'
os.environ['AQUERY_ITC_USE_SHMEM'] = '1'
os.environ['AQUERY_ITC_USE_SEMPH'] = '1'
if ('__config_initialized__' not in globals() or
not __config_initialized__):

@ -310,7 +310,6 @@ class projection(ast_node):
# cpp module codegen
self.context.has_dll = True
# extract typed-columns from result-set
vid2cname = [0]*len(self.var_table)
self.pyname2cname = dict()
@ -404,6 +403,7 @@ class projection(ast_node):
if 'into' in node:
self.context.emitc(select_into(self, node['into']).ccode)
self.has_postproc = True
if not self.distinct:
self.finalize()
@ -412,7 +412,13 @@ class projection(ast_node):
if self.parent is None:
self.context.sql_end()
self.context.postproc_end(self.postproc_fname)
if self.has_postproc:
self.context.has_dll = True
self.context.postproc_end(self.postproc_fname)
else:
self.context.ccode = ''
if self.limit != 0:
self.context.direct_output()
class select_distinct(projection):
first_order = 'select_distinct'

@ -250,6 +250,9 @@ class Context:
self.ccode = ''
self.finalize_query()
def direct_output(self):
self.queries.append('O')
def finalize_udf(self):
if self.udf is not None:
return (Context.udf_head

@ -3,46 +3,90 @@
#include <utility>
#include <thread>
#include <chrono>
#include <atomic>
#ifndef __AQ_USE_THREADEDGC__
class GC {
private:
template<class T>
using vector = vector_type<T>;
template<class ...T>
using tuple = std::tuple<T...>;
size_t current_size, max_size, interval, forced_clean;
size_t current_size = 0, max_size,
interval, forced_clean,
forceclean_timer = 0;
bool running, alive;
// ptr, dealloc, ref, sz
vector<tuple<void*, void (*)(void*)>> q;
vector<tuple<void*, void (*)(void*)>> *q, *q_back;
std::thread handle;
void gc()
{
std::atomic<std::thread::id> lock;
protected:
void acquire_lock(){
auto this_pid = std::this_thread::get_id();
while(lock != this_pid)
{
while(lock != this_pid && lock != std::thread::id()) {
std::this_thread::sleep_for(std::chrono::milliseconds(0));
}
lock = this_pid;
}
}
void reg(void* v, uint32_t ref, uint32_t sz,
void(*f)(void*) = [](void* v) {free (v); }) {
current_size += sz;
if (current_size > max_size)
gc();
q.push_back({ v, f });
void release_lock(){
lock = std::thread::id();
}
void gc()
{
if (q->size() == 0)
return;
auto t = q;
acquire_lock();
q = q_back;
release_lock();
for(const auto& t : *t) {
std::get<1>(t)(std::get<0>(t));
}
t->clear();
q_back = t;
running = false;
current_size = 0;
}
void daemon() {
using namespace std::chrono;
while (alive) {
if (running) {
gc();
if (current_size > max_size ||
forceclean_timer > forced_clean)
{
gc();
forceclean_timer = 0;
}
std::this_thread::sleep_for(microseconds(interval));
forceclean_timer += interval;
}
else {
std::this_thread::sleep_for(10ms);
forceclean_timer += 10000;
}
}
}
void start_deamon() {
handle = std::thread(&daemon);
q = new vector<tuple<void*, void (*)(void*)>>();
q_back = new vector<tuple<void*, void (*)(void*)>>();
lock = thread::id();
alive = true;
handle = std::thread(&daemon);
}
void terminate_daemon() {
running = false;
alive = false;
delete q;
delete q_back;
using namespace std::chrono;
if (handle.joinable()) {
@ -50,4 +94,36 @@ class GC {
handle.join();
}
}
public:
void reg(void* v, uint32_t sz = 1,
void(*f)(void*) = [](void* v) {free (v); }
) {
acquire_lock();
current_size += sz;
q.push_back({ v, f });
running = true;
release_lock()
}
GC(
uint32_t max_size = 0xfffffff, uint32_t interval = 10000,
uint32_t forced_clean = 1000000 //one seconds
) : max_size(max_size), interval(interval), forced_clean(forced_clean){
start_deamon();
} // 256 MB
~GC(){
terminate_daemon();
}
};
#else
class GC {
public:
GC(uint32_t) = default;
void reg(
void* v, uint32_t = 0,
void(*f)(void*) = [](void* v) {free (v); }
) const { f(v); }
}
#endif

@ -2,6 +2,7 @@
#include "libaquery.h"
#include <cstdio>
#include <string>
#include "monetdb_conn.h"
#include "monetdbe.h"
#include "table.h"
@ -35,9 +36,19 @@ const unsigned char monetdbe_type_szs[] = {
// should be last:
1
};
namespace types{
const Type_t monetdbe_type_aqtypes[] = {
ABOOL, AINT8, AINT16, AINT32, AINT64,
#ifdef HAVE_HGE
AINT128,
#endif
AUINT64, AFLOAT, ADOUBLE, ASTR,
// blob?
AINT64,
ADATE, ATIME, ATIMESTAMP, ERROR
};
}
Server::Server(Context* cxt){
if (cxt){
connect(cxt);
@ -80,7 +91,7 @@ void Server::connect(Context *cxt){
else{
if(server)
free(server);
this->server = 0;
this->server = nullptr;
status = false;
puts(ret == -1 ? "Allocation Error." : "Internal Database Error.");
}
@ -103,20 +114,58 @@ void Server::exec(const char* q){
bool Server::haserror(){
if (last_error){
last_error = 0;
last_error = nullptr;
return true;
}
else{
return false;
}
}
void Server::print_results(const char* sep, const char* end){
if (!haserror()){
auto _res = static_cast<monetdbe_result*> (res);
const auto& ncols = _res->ncols;
monetdbe_column** cols = static_cast<monetdbe_column**>(malloc(sizeof(monetdbe_column*) * ncols));
std::string* printf_string = new std::string[ncols];
const char** col_data = static_cast<const char**> (malloc(sizeof(char*) * ncols));
uint8_t* szs = static_cast<uint8_t*>(alloca(ncols));
std::string header_string = "";
const char* err_msg = nullptr;
for(uint32_t i = 0; i < ncols; ++i){
err_msg = monetdbe_result_fetch(_res, &cols[i], i);
printf_string[i] =
std::string(types::printf_str[types::monetdbe_type_aqtypes[cols[i]->type]])
+ (i < ncols - 1 ? sep : "");
puts(printf_string[i].c_str());
col_data[i] = static_cast<char *>(cols[i]->data);
szs [i] = monetdbe_type_szs[cols[i]->type];
header_string = header_string + cols[i]->name + sep + '|' + sep;
}
const size_t l_sep = strlen(sep) + 1;
if (header_string.size() - l_sep >= 0)
header_string.resize(header_string.size() - l_sep);
header_string += end + std::string(header_string.size(), '=') + end;
fputs(header_string.c_str(), stdout);
for(uint64_t i = 0; i < cnt; ++i){
for(uint32_t j = 0; j < ncols; ++j){
printf(printf_string[j].c_str(), *((void**)col_data[j]));
col_data[j] += szs[j];
}
fputs(end, stdout);
}
free(cols);
delete[] printf_string;
free(col_data);
}
}
void Server::close(){
if(this->server){
auto server = static_cast<monetdbe_database*>(this->server);
monetdbe_close(*(server));
free(server);
this->server = 0;
this->server = nullptr;
}
}
@ -140,7 +189,7 @@ void* Server::getCol(int col_idx){
else{
puts("Error: No result.");
}
return 0;
return nullptr;
}
Server::~Server(){

@ -22,6 +22,8 @@ struct Server{
void close();
bool haserror();
static bool havehge();
void test(const char*);
void print_results(const char* sep = " ", const char* end = "\n");
~Server();
};

@ -43,11 +43,11 @@ public:
native_handle = dispatch_semaphore_create(v);
}
void acquire() {
puts("acquire");
// puts("acquire");
dispatch_semaphore_wait(native_handle, DISPATCH_TIME_FOREVER);
}
void release() {
puts("release");
// puts("release");
dispatch_semaphore_signal(native_handle);
}
~A_Semaphore() {
@ -94,7 +94,7 @@ public:
~A_Semaphore() { }
};
#endif
#ifdef __AQUERY_ITC_USE_SHMEM__
#ifdef __AQUERY_ITC_USE_SEMPH__
A_Semaphore prompt{ true }, engine{ false };
#define PROMPT_ACQUIRE() prompt.acquire()
#define PROMPT_RELEASE() prompt.release()
@ -283,6 +283,15 @@ int dll_main(int argc, char** argv, Context* cxt){
//printf("F::: %p\n", module_fn_map->find("mydiv") != module_fn_map->end() ? module_fn_map->find("mydiv")->second : nullptr);
}
break;
case 'O':
{
if(!server->haserror()){
timer.reset();
server->print_results();
cfg->stats.postproc_time += timer.elapsed();
}
}
break;
case 'U': // Unload Module
{
auto mname = n_recvd[i] + 1;

Loading…
Cancel
Save