master
Bill 2 years ago
parent 9c2bac3ec1
commit 5ba333ca20

@ -304,10 +304,11 @@ SELECT * FROM my_table WHERE c1 > 10
- [x] Query Optimization - [x] Query Optimization
- [x] Selection/Order by push-down - [x] Selection/Order by push-down
- [x] Join Optimization (Only in Hybrid Engine) - [x] Join Optimization (Only in Hybrid Engine)
- [ ] Threaded GC - [x] Threaded GC
- [ ] Extensibility - [ ] Extensibility
- [x] UDFs (Hybrid Engine only) - [x] UDFs (Hybrid Engine only)
- [x] SDK and User Module - [x] SDK and User Module
- [x] Stored Procedures
- [ ] Triggers - [ ] Triggers
# Known Issues: # Known Issues:

@ -16,6 +16,8 @@ private:;
std::atomic<uint32_t> alive_cnt; std::atomic<uint32_t> alive_cnt;
std::atomic<uint64_t> current_size; std::atomic<uint64_t> current_size;
volatile bool lock; volatile bool lock;
using gc_deallocator_t = void (*)(void*);
// maybe use volatile std::thread::id instead // maybe use volatile std::thread::id instead
protected: protected:
void acquire_lock(); void acquire_lock();
@ -46,6 +48,12 @@ public:
terminate_daemon(); terminate_daemon();
} }
static GC* gc_handle; static GC* gc_handle;
template <class T>
constexpr static inline gc_deallocator_t _delete(T*){
return [](void* v){
delete (T*)v;
};
}
constexpr static void(*_free) (void*) = free; constexpr static void(*_free) (void*) = free;
}; };

@ -9,6 +9,7 @@
#include "utils.h" #include "utils.h"
#include "libaquery.h" #include "libaquery.h"
#include <random> #include <random>
#include "gc.h"
char* gbuf = nullptr; char* gbuf = nullptr;
@ -276,10 +277,29 @@ inline const char* str(const bool& v) {
return v ? "true" : "false"; return v ? "true" : "false";
} }
class A{
public:
std::chrono::high_resolution_clock::time_point tp;
A(){
tp = std::chrono::high_resolution_clock::now();
printf("A %llx created.\n", tp.time_since_epoch().count());
}
~A() {
printf("A %llx died after %lldns.\n", tp.time_since_epoch().count(),
(std::chrono::high_resolution_clock::now() - tp).count());
}
};
Context::Context() { Context::Context() {
current.memory_map = new std::unordered_map<void*, deallocator_t>; current.memory_map = new std::unordered_map<void*, deallocator_t>;
init_session(); #ifndef __AQ_USE_THREADEDGC__
this->gc = new GC();
#endif
GC::gc_handle->reg(new A(), 6553600, [](void* a){
puts("deleting");
delete ((A*)a);
});
init_session();
} }
Context::~Context() { Context::~Context() {
@ -328,7 +348,6 @@ void* Context::get_module_function(const char* fname){
// std::cout << ')'; // std::cout << ')';
// } // }
#include "gc.h"
#include <utility> #include <utility>
#include <thread> #include <thread>
#ifndef __AQ_USE_THREADEDGC__ #ifndef __AQ_USE_THREADEDGC__
@ -383,7 +402,7 @@ void GC::daemon() {
while (alive) { while (alive) {
if (running) { if (running) {
if (current_size - max_size > 0 || if (uint64_t(current_size) > max_size ||
forceclean_timer > forced_clean) forceclean_timer > forced_clean)
{ {
gc(); gc();

@ -73,6 +73,7 @@ struct StoredProcedure{
const char* name; const char* name;
void **__rt_loaded_modules; void **__rt_loaded_modules;
}; };
struct Context{ struct Context{
typedef int (*printf_type) (const char *format, ...); typedef int (*printf_type) (const char *format, ...);
@ -90,7 +91,7 @@ struct Context{
#ifdef THREADING #ifdef THREADING
void* thread_pool; void* thread_pool;
#endif #endif
#ifdef __AQ_THREADED_GC__ #ifndef __AQ_USE_THREADEDGC__
void* gc; void* gc;
#endif #endif
printf_type print = &printf; printf_type print = &printf;
@ -162,4 +163,10 @@ template<> char* aq_to_chars<types::date_t>(void* , char*);
template<> char* aq_to_chars<types::time_t>(void* , char*); template<> char* aq_to_chars<types::time_t>(void* , char*);
template<> char* aq_to_chars<types::timestamp_t>(void* , char*); template<> char* aq_to_chars<types::timestamp_t>(void* , char*);
typedef int (*code_snippet)(void*); typedef int (*code_snippet)(void*);
template <class _This_Struct>
inline void AQ_ZeroMemory(_This_Struct& __val) {
memset(&__val, 0, sizeof(_This_Struct));
}
#endif #endif

@ -347,8 +347,8 @@ start:
dll_path = proc_name; dll_path = proc_name;
} }
proc_name = dll_path.c_str(); proc_name = dll_path.c_str();
if(recorded_libraries.size) //if(recorded_libraries.size)
recorded_queries.emplace_back(copy_lpstr("N")); recorded_queries.emplace_back(copy_lpstr("N"));
} }
handle = dlopen(proc_name, RTLD_NOW); handle = dlopen(proc_name, RTLD_NOW);
if (procedure_recording) { if (procedure_recording) {
@ -442,6 +442,7 @@ start:
{ {
if(procedure_module_cursor < current_procedure.postproc_modules) if(procedure_module_cursor < current_procedure.postproc_modules)
handle = current_procedure.__rt_loaded_modules[procedure_module_cursor++]; handle = current_procedure.__rt_loaded_modules[procedure_module_cursor++];
printf("Load %i = %p\n", procedure_module_cursor, handle);
} }
break; break;
case 'R': //recorded procedure case 'R': //recorded procedure
@ -510,10 +511,7 @@ start:
switch(n_recvd[i][1]){ switch(n_recvd[i][1]){
case '\0': case '\0':
current_procedure.name = copy_lpstr(proc_name); current_procedure.name = copy_lpstr(proc_name);
current_procedure.cnt = 0; AQ_ZeroMemory(current_procedure);
current_procedure.queries = nullptr;
current_procedure.postproc_modules = 0;
current_procedure.__rt_loaded_modules = nullptr;
procedure_recording = true; procedure_recording = true;
procedure_name = proc_name; procedure_name = proc_name;
break; break;
@ -523,10 +521,8 @@ start:
current_procedure.name = copy_lpstr(proc_name); current_procedure.name = copy_lpstr(proc_name);
current_procedure.postproc_modules = recorded_libraries.size; current_procedure.postproc_modules = recorded_libraries.size;
current_procedure.__rt_loaded_modules = recorded_libraries.container; current_procedure.__rt_loaded_modules = recorded_libraries.container;
recorded_queries.size = recorded_queries.capacity = 0; AQ_ZeroMemory(recorded_queries);
recorded_queries.container = nullptr; AQ_ZeroMemory(recorded_libraries);
recorded_libraries.size = recorded_libraries.capacity = 0;
recorded_libraries.container = nullptr;
procedure_recording = false; procedure_recording = false;
save_proc_tofile(current_procedure); save_proc_tofile(current_procedure);
cxt->stored_proc.insert_or_assign(procedure_name, current_procedure); cxt->stored_proc.insert_or_assign(procedure_name, current_procedure);
@ -534,6 +530,7 @@ start:
break; break;
case 'E': // execute procedure case 'E': // execute procedure
{ {
procedure_module_cursor = 0;
auto _proc = cxt->stored_proc.find(proc_name); auto _proc = cxt->stored_proc.find(proc_name);
if (_proc == cxt->stored_proc.end()){ if (_proc == cxt->stored_proc.end()){
printf("Procedure %s not found. Trying load from disk.\n", proc_name); printf("Procedure %s not found. Trying load from disk.\n", proc_name);
@ -571,7 +568,9 @@ start:
break; break;
} }
} }
if(handle && procedure_replaying) { if(handle &&
!procedure_replaying && !procedure_recording) {
printf("Destroy %p\n", handle);
dlclose(handle); dlclose(handle);
handle = nullptr; handle = nullptr;
} }
@ -579,10 +578,8 @@ start:
cxt->end_session(); cxt->end_session();
n_recv = 0; n_recv = 0;
} }
if(server->last_error == nullptr){ if (server->last_error != nullptr) {
// TODO: Add feedback to prompt. printf("Monetdbe Error: %s\n", server->last_error);
}
else{
server->last_error = nullptr; server->last_error = nullptr;
//goto finalize; //goto finalize;
} }

Loading…
Cancel
Save