Added session manager.

dev
Bill 2 years ago
parent 9caee2819f
commit 6859671230

@ -5,9 +5,9 @@ CXXFLAGS = --std=c++1z
OPTFLAGS = -O3 -flto -march=native OPTFLAGS = -O3 -flto -march=native
SHAREDFLAGS = -shared -fPIC SHAREDFLAGS = -shared -fPIC
ifeq ($(PCH), 1) ifeq ($(PCH), 1)
PCHFLAGS = -include server/aggregations.h PCHFLAGS = -include server/aggregations.h
else else
PCHFLAGS = PCHFLAGS =
endif endif
ifeq ($(OS),Windows_NT) ifeq ($(OS),Windows_NT)

@ -30,24 +30,24 @@ AQuery++ Database is a cross-platform, In-Memory Column-Store Database that inco
- [x] Assumption - [x] Assumption
- [x] Flatten - [x] Flatten
- [x] UDFs (Hybrid Engine only) - [x] UDFs (Hybrid Engine only)
- [ ] User Module - [x] User Module
- [ ] Triggers - [ ] Triggers
- [x] Join (Hybrid Engine only) - [x] Join (Hybrid Engine only)
- [ ] Subqueries - [ ] Subqueries
- [ ] Query Optimization - [x] Query Optimization
- [x] Selection/Order by push-down - [x] Selection/Order by push-down
- [ ] Join Optimization - [x] Join Optimization (Only in Hybrid Engine)
## TODO: ## TODO:
- [x] User Module load syntax parsing (fn definition/registration) - [x] User Module load syntax parsing (fn definition/registration)
- [ ] User Module initialize location - [x] User Module initialize location
- [ ] User Module test -> User Module test
- [ ] Interval based triggers -> Interval based triggers
- [ ] Optimize Compilation Process, using static libraries, hot reloading server binary -> Optimize Compilation Process, using static libraries, hot reloading server binary
- [x] Bug fixes: type deduction misaligned in Hybrid Engine - [x] Bug fixes: type deduction misaligned in Hybrid Engine
- [ ] Investigation: Using postproc only for q1 in Hybrid Engine (make is_special always on) -> Investigation: Using postproc only for q1 in Hybrid Engine (make is_special always on)
- [ ] Limitation: putting ColRefs back to monetdb. - [x] Limitation: putting ColRefs back to monetdb.
- [ ] Limitation: String operations and Date/Time data type. - [ ] Limitation: String operations and Date/Time data type.
- [ ] C++ Meta-Programming: Eliminate template recursions as much as possible. - [ ] C++ Meta-Programming: Eliminate template recursions as much as possible.

@ -1,7 +1,7 @@
# put environment specific configuration here # put environment specific configuration here
## GLOBAL CONFIGURATION FLAGS ## GLOBAL CONFIGURATION FLAGS
version_string = '0.4.2a' version_string = '0.4.3a'
add_path_to_ldpath = True add_path_to_ldpath = True
rebuild_backend = False rebuild_backend = False
run_backend = True run_backend = True
@ -58,4 +58,3 @@ def init_config():
if os_platform == 'cygwin': if os_platform == 'cygwin':
add_dll_dir('./lib') add_dll_dir('./lib')
__config_initialized__ = True __config_initialized__ = True

@ -1,4 +1,3 @@
from multiprocessing.sharedctypes import Value
import aquery_config import aquery_config
help_message = '''\ help_message = '''\
====================================================== ======================================================
@ -122,13 +121,15 @@ class Backend_Type(enum.Enum):
class Config: class Config:
__all_attrs__ = ['running', 'new_query', 'server_mode', 'backend_type', 'has_dll', 'n_buffers'] __all_attrs__ = ['running', 'new_query', 'server_mode', 'backend_type', 'has_dll', 'n_buffers']
__init_attributes__ = False __init_attributes__ = False
@staticmethod
def __init_self__(): def __init_self__():
if not Config.__init_attributes__: if not Config.__init_attributes__:
from functools import partial from functools import partial
for _i, attr in enumerate(Config.__all_attrs__): for _i, attr in enumerate(Config.__all_attrs__):
if not hasattr(Config, attr): if not hasattr(Config, attr):
setattr(Config, attr, property(partial(Config.getter, i = _i), partial(Config.setter, i = _i))) setattr(Config, attr, property(partial(Config.getter, i = _i), partial(Config.setter, i = _i)))
__init_attributes__ = True Config.__init_attributes__ = True
def __init__(self, mode, nq = 0, n_bufs = 0, bf_szs = []) -> None: def __init__(self, mode, nq = 0, n_bufs = 0, bf_szs = []) -> None:
Config.__init_self__() Config.__init_self__()

@ -99,7 +99,9 @@ class expr(ast_node):
self.type = op.return_type(*type_vals) self.type = op.return_type(*type_vals)
except AttributeError as e: except AttributeError as e:
if type(self.root) is not udf: if type(self.root) is not udf:
print(f'alert: {e}') # TODO: do something when this is not an error
# print(f'alert: {e}')
pass
self.type = AnyT self.type = AnyT
self.sql = op(self.c_code, *str_vals) self.sql = op(self.c_code, *str_vals)
@ -142,7 +144,9 @@ class expr(ast_node):
else: else:
return var_table[node] return var_table[node]
if vec[0] not in _vars: if vec[0] not in _vars:
print(f'Use of undefined variable {vec[0]}') # print(f'Use of undefined variable {vec[0]}')
# TODO: do something when this is not an error
pass
else: else:
vname = get_vname(vec[0]) vname = get_vname(vec[0])
val = enlist(val) val = enlist(val)

@ -69,5 +69,7 @@ typedef void (*deallocator_t) (void*);
extern void* Aalloc(unsigned long long sz); extern void* Aalloc(unsigned long long sz);
extern void Afree(void * mem); extern void Afree(void * mem);
extern void register_memory(void* ptr, deallocator_t deallocator); extern void register_memory(void* ptr, deallocator_t deallocator);
extern void init_session(Context* cxt);
#define __AQ_NO_SESSION__ void init_session(Context*) {}
#endif #endif

@ -25,3 +25,7 @@ void register_memory(void* ptr, deallocator_t deallocator){
memmap->operator[](ptr) = deallocator; memmap->operator[](ptr) = deallocator;
} }
void init_session(Context* cxt){
session = &cxt->current;
// session->memory_map = new std::unordered_map<void*, deallocator_t>();
}

@ -1,4 +1,5 @@
#include "aquery.h" #include "aquery.h"
__AQ_NO_SESSION__
#include "../server/table.h" #include "../server/table.h"
__AQEXPORT__(ColRef<float>) mulvec(int a, ColRef<float> b){ __AQEXPORT__(ColRef<float>) mulvec(int a, ColRef<float> b){
@ -7,4 +8,4 @@ __AQEXPORT__(ColRef<float>) mulvec(int a, ColRef<float> b){
__AQEXPORT__(double) mydiv(int a, int b){ __AQEXPORT__(double) mydiv(int a, int b){
return a / (double)b; return a / (double)b;
} }

@ -26,7 +26,7 @@ struct Session{
size_t total_active; size_t total_active;
size_t cnt_object; size_t cnt_object;
size_t total_alloc; size_t total_alloc;
}; } stats;
void* memory_map; void* memory_map;
}; };
@ -48,7 +48,8 @@ struct Context{
void* thread_pool; void* thread_pool;
#endif #endif
printf_type print = printf; printf_type print = printf;
Context();
virtual ~Context();
template <class ...Types> template <class ...Types>
void log(Types... args) { void log(Types... args) {
if (log_level == LOG_INFO) if (log_level == LOG_INFO)
@ -73,4 +74,6 @@ struct Context{
#endif #endif
#define __AQEXPORT__(_Ty) extern "C" _Ty __DLLEXPORT__ #define __AQEXPORT__(_Ty) extern "C" _Ty __DLLEXPORT__
typedef void (*deallocator_t) (void*);
#endif #endif

@ -127,8 +127,8 @@ void* Server::getCol(int col_idx){
{ {
auto _ret_col = static_cast<monetdbe_column*>(this->ret_col); auto _ret_col = static_cast<monetdbe_column*>(this->ret_col);
cnt = _ret_col->count; cnt = _ret_col->count;
printf("Dbg: Getting col %s, type: %s\n", // printf("Dbg: Getting col %s, type: %s\n",
_ret_col->name, monetdbe_type_str[_ret_col->type]); // _ret_col->name, monetdbe_type_str[_ret_col->type]);
return _ret_col->data; return _ret_col->data;
} }
else{ else{

@ -32,14 +32,10 @@ struct SharedMemory
}; };
#endif #endif
struct thread_context{
}v;
void daemon(thread_context* c) {
}
#include "aggregations.h" #include "aggregations.h"
typedef int (*code_snippet)(void*); typedef int (*code_snippet)(void*);
typedef void (*module_init_fn)(Context*);
int test_main(); int test_main();
int n_recv = 0; int n_recv = 0;
@ -49,9 +45,11 @@ extern "C" void __DLLEXPORT__ receive_args(int argc, char**argv){
n_recv = argc; n_recv = argc;
n_recvd = argv; n_recvd = argv;
} }
enum BinaryInfo_t { enum BinaryInfo_t {
MSVC, MSYS, GCC, CLANG, AppleClang MSVC, MSYS, GCC, CLANG, AppleClang
}; };
extern "C" int __DLLEXPORT__ binary_info() { extern "C" int __DLLEXPORT__ binary_info() {
#if defined(_MSC_VER) && !defined (__llvm__) #if defined(_MSC_VER) && !defined (__llvm__)
return MSVC; return MSVC;
@ -71,12 +69,30 @@ __AQEXPORT__(bool) have_hge(){
return false; return false;
#endif #endif
} }
Context::Context() {
current.memory_map = new std::unordered_map<void*, deallocator_t>;
init_session();
}
Context::~Context() {
auto memmap = (std::unordered_map<void*, deallocator_t>*) this->current.memory_map;
delete memmap;
}
void Context::init_session(){ void Context::init_session(){
if (log_level == LOG_INFO){ if (log_level == LOG_INFO){
Session::Statistic stats; memset(&(this->current.stats), 0, sizeof(Session::Statistic));
} }
auto memmap = (std::unordered_map<void*, deallocator_t>*) this->current.memory_map;
memmap->clear();
} }
void Context::end_session(){
auto memmap = (std::unordered_map<void*, deallocator_t>*) this->current.memory_map;
for (auto& mem : *memmap) {
mem.second(mem.first);
}
memmap->clear();
}
void* Context::get_module_function(const char* fname){ void* Context::get_module_function(const char* fname){
auto fmap = static_cast<std::unordered_map<std::string, void*>*> auto fmap = static_cast<std::unordered_map<std::string, void*>*>
(this->module_function_maps); (this->module_function_maps);
@ -88,6 +104,16 @@ void* Context::get_module_function(const char* fname){
return ret == fmap->end() ? nullptr : ret->second; return ret == fmap->end() ? nullptr : ret->second;
} }
void initialize_module(const char* module_name, void* module_handle, Context* cxt){
auto _init_module = reinterpret_cast<module_init_fn>(dlsym(module_handle, "init_session"));
if (_init_module) {
_init_module(cxt);
}
else {
printf("Warning: module %s have no session support.\n", module_name);
}
}
int dll_main(int argc, char** argv, Context* cxt){ int dll_main(int argc, char** argv, Context* cxt){
Config *cfg = reinterpret_cast<Config *>(argv[0]); Config *cfg = reinterpret_cast<Config *>(argv[0]);
std::unordered_map<std::string, void*> user_module_map; std::unordered_map<std::string, void*> user_module_map;
@ -119,30 +145,35 @@ int dll_main(int argc, char** argv, Context* cxt){
if (cfg->backend_type == BACKEND_AQuery || cfg->has_dll) { if (cfg->backend_type == BACKEND_AQuery || cfg->has_dll) {
handle = dlopen("./dll.so", RTLD_LAZY); handle = dlopen("./dll.so", RTLD_LAZY);
} }
for (const auto& module : user_module_map){
initialize_module(module.first.c_str(), module.second, cxt);
}
cxt->init_session();
for(int i = 0; i < n_recv; ++i) for(int i = 0; i < n_recv; ++i)
{ {
//printf("%s, %d\n", n_recvd[i], n_recvd[i][0] == 'Q'); //printf("%s, %d\n", n_recvd[i], n_recvd[i][0] == 'Q');
switch(n_recvd[i][0]){ switch(n_recvd[i][0]){
case 'Q': case 'Q': // SQL query for monetdbe
{ {
server->exec(n_recvd[i] + 1); server->exec(n_recvd[i] + 1);
printf("Exec Q%d: %s", i, n_recvd[i]); printf("Exec Q%d: %s", i, n_recvd[i]);
} }
break; break;
case 'P': case 'P': // Postprocessing procedure
if(handle && !server->haserror()) { if(handle && !server->haserror()) {
code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, n_recvd[i]+1)); code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, n_recvd[i]+1));
c(cxt); c(cxt);
} }
break; break;
case 'M': case 'M': // Load Module
{ {
auto mname = n_recvd[i] + 1; auto mname = n_recvd[i] + 1;
user_module_handle = dlopen(mname, RTLD_LAZY); user_module_handle = dlopen(mname, RTLD_LAZY);
user_module_map[mname] = user_module_handle; user_module_map[mname] = user_module_handle;
initialize_module(mname, user_module_handle, cxt);
} }
break; break;
case 'F': case 'F': // Register Function in Module
{ {
auto fname = n_recvd[i] + 1; auto fname = n_recvd[i] + 1;
//printf("%s: %p, %p\n", fname, user_module_handle, dlsym(user_module_handle, fname)); //printf("%s: %p, %p\n", fname, user_module_handle, dlsym(user_module_handle, fname));
@ -150,7 +181,7 @@ int dll_main(int argc, char** argv, Context* cxt){
//printf("%p\n", module_fn_map->find("mydiv") != module_fn_map->end() ? module_fn_map->find("mydiv")->second : nullptr); //printf("%p\n", module_fn_map->find("mydiv") != module_fn_map->end() ? module_fn_map->find("mydiv")->second : nullptr);
} }
break; break;
case 'U': case 'U': // Unload Module
{ {
auto mname = n_recvd[i] + 1; auto mname = n_recvd[i] + 1;
auto it = user_module_map.find(mname); auto it = user_module_map.find(mname);
@ -160,17 +191,17 @@ int dll_main(int argc, char** argv, Context* cxt){
user_module_map.erase(it); user_module_map.erase(it);
} }
break; break;
} }
} }
if(handle) { if(handle) {
dlclose(handle); dlclose(handle);
handle = 0; handle = 0;
} }
cxt->end_session();
n_recv = 0; n_recv = 0;
} }
if(server->last_error == nullptr){ if(server->last_error == nullptr){
// TODO: Add feedback to prompt.
} }
else{ else{
server->last_error = nullptr; server->last_error = nullptr;

@ -30,7 +30,7 @@ public:
this->size = vt.size; this->size = vt.size;
this->capacity = vt.capacity; this->capacity = vt.capacity;
if (capacity) { if (capacity) {
puts("copy"); // puts("copy");
this->container = (_Ty*)malloc(size * sizeof(_Ty)); this->container = (_Ty*)malloc(size * sizeof(_Ty));
memcpy(container, vt.container, sizeof(_Ty) * size); memcpy(container, vt.container, sizeof(_Ty) * size);
} }
@ -44,7 +44,7 @@ public:
this->size = vt.size; this->size = vt.size;
this->capacity = vt.capacity; this->capacity = vt.capacity;
this->container = vt.container; this->container = vt.container;
puts("move"); // puts("move");
vt.size = vt.capacity = 0; vt.size = vt.capacity = 0;
vt.container = 0; vt.container = 0;
} }

Loading…
Cancel
Save