added user module

dev
Bill 2 years ago
parent 44ccc0b835
commit 4321422b4f

2
.gitignore vendored

@ -54,6 +54,6 @@ saves
*.exe
out*.cpp
udf*.hpp
*.ipynb

@ -38,7 +38,7 @@ server.so:
# $(CXX) server/server.cpp server/monetdb_conn.cpp -fPIC -shared $(OS_SUPPORT) monetdb/msvc/monetdbe.dll --std=c++1z -O3 -march=native -o server.so -I./monetdb/msvc
$(CXX) $(SHAREDFLAGS) server/server.cpp server/io.cpp server/table.cpp $(OS_SUPPORT) server/monetdb_conn.cpp $(Threading) $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o server.so
snippet:
$(CXX) $(SHAREDFLAGS) $(PCHFLAGS) out.cpp server/monetdb_conn.cpp server/table.cpp server/io.cpp $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o dll.so
$(CXX) $(SHAREDFLAGS) $(PCHFLAGS) out.cpp server/server.cpp server/monetdb_conn.cpp server/table.cpp server/io.cpp $(MonetDB_LIB) $(OPTFLAGS) $(CXXFLAGS) -o dll.so
docker:
docker build -t aquery .

@ -1,3 +1,4 @@
# AQuery++ Database
## Introduction
@ -39,10 +40,13 @@ AQuery++ Database is a cross-platform, In-Memory Column-Store Database that inco
## TODO:
- [ ] User Module load syntax parsing (fn definition/registration)
- [x] User Module load syntax parsing (fn definition/registration)
- [ ] User Module initialize location
- [ ] User Module test
- [ ] Interval based triggers
- [ ] Bug fixes: type deduction misaligned in Hybrid Engine
- [ ] Optimize Compilation Process, using static libraries, hot reloading server binary
- [x] Bug fixes: type deduction misaligned in Hybrid Engine
- [ ] Investigation: Using postproc only for q1 in Hybrid Engine (make is_special always on)
- [ ] Limitation: putting ColRefs back to monetdb.
- [ ] C++ Meta-Programming: Eliminate template recursions as much as possible.

@ -1,7 +1,7 @@
# put environment specific configuration here
## GLOBAL CONFIGURATION FLAGS
version_string = '0.4.0a'
version_string = '0.4.1a'
add_path_to_ldpath = True
rebuild_backend = True
run_backend = True

@ -351,6 +351,12 @@ def prompt(running = lambda:True, next = input, state = None):
elif q.startswith('xexec'): # generate build and run (MonetDB Engine)
state.cfg.backend_type = Backend_Type.BACKEND_MonetDB.value
cxt = xengine.exec(state.stmts, cxt, keep)
this_udf = cxt.finalize_udf()
if this_udf:
with open('udf.hpp', 'wb') as outfile:
outfile.write(this_udf.encode('utf-8'))
if state.server_mode == RunType.Threaded:
# assignment to avoid auto gc
# sqls = [s.strip() for s in cxt.sql.split(';')]
@ -362,11 +368,7 @@ def prompt(running = lambda:True, next = input, state = None):
state.send(sz, payload)
except TypeError as e:
print(e)
this_udf = cxt.finalize_udf()
if this_udf:
with open('udf.hpp', 'wb') as outfile:
outfile.write(this_udf.encode('utf-8'))
qs = re.split(r'[ \t]', q)
build_this = not(len(qs) > 1 and qs[1].startswith('n'))
if cxt.has_dll:
@ -472,11 +474,14 @@ def prompt(running = lambda:True, next = input, state = None):
raise
except BaseException as e:
import code, traceback
raise_exception = True
sh = code.InteractiveConsole({**globals(), **locals()})
sh.interact(banner = traceback.format_exc(), exitmsg = 'debugging session ended.')
save('', cxt)
rm(state)
raise e
# control whether to raise exception in interactive console
if raise_exception:
raise e
rm(state)
## FUNCTIONS END

@ -659,7 +659,8 @@ class load(ast_node):
self.context.module_map[fname] = cpp_stub
#registration for parser
self.functions[fname] = user_module_function(fname, nargs, ret_type, self.context)
self.context.module_init_loc = len(self.context.queries)
def produce_aq(self, node):
node = node['load']
s1 = 'LOAD DATA INFILE '

@ -95,6 +95,7 @@ class Context:
self.scans = []
self.procs = []
self.queries = []
self.module_init_loc = 0
def __init__(self):
self.tables_byname = dict()
@ -143,14 +144,18 @@ class Context:
ret = '__AQEXPORT__(void) __builtin_init_user_module(Context* cxt){\n'
for fname in self.module_map.keys():
ret += f'{fname} = (decltype({fname}))(cxt->get_module_function("{fname}"));\n'
self.queries.insert(0, f'P__builtin_init_user_module')
self.queries.insert(self.module_init_loc, 'P__builtin_init_user_module')
return ret + '}\n'
def sql_begin(self):
self.sql = ''
def sql_end(self):
if self.sql.strip():
# eliminate empty queries
s = self.sql.strip()
while(s and s[-1] == ';'):
s = s[:-1].strip()
if s and s.lower() != 'select':
self.queries.append('Q' + self.sql)
self.sql = ''

@ -39,10 +39,6 @@ struct Context{
Session current;
#ifdef THREADING
void* thread_pool;
#endif
printf_type print = printf;
template <class ...Types>
void log(Types... args) {
@ -57,7 +53,7 @@ struct Context{
void init_session();
void end_session();
void* get_module_function(const char*);
char remainder[];
char remainder[];
};
#ifdef _WIN32
@ -68,10 +64,10 @@ struct Context{
#define __AQEXPORT__(_Ty) extern "C" _Ty __DLLEXPORT__
typedef void (*dealloctor_t) (void*);
typedef void (*deallocator_t) (void*);
extern void* Aalloc(size_t sz);
extern void* Aalloc(unsigned long long sz);
extern void Afree(void * mem);
extern size_t register_memory(void* ptr, dealloctor_t deallocator);
extern void register_memory(void* ptr, deallocator_t deallocator);
#endif

@ -1,7 +1,7 @@
#include "aquery.h"
#include <memory>
#include <stdlib>
#include <cstdlib>
#include <unordered_map>
Session* session;
@ -9,19 +9,19 @@ Session* session;
void* Aalloc(size_t sz, deallocator_t deallocator){
void* mem = malloc(sz);
auto memmap = (std::unordered_map<void*, dealloctor_t>*) session->memory_map;
memmap[mem] = deallocator;
auto memmap = (std::unordered_map<void*, deallocator_t>*) session->memory_map;
memmap->operator[](mem) = deallocator;
return mem;
}
void Afree(void* mem){
auto memmap = (std::unordered_map<void*, dealloctor_t>*) session->memory_map;
memmap[mem](mem);
auto memmap = (std::unordered_map<void*, deallocator_t>*) session->memory_map;
memmap->operator[](mem)(mem);
memmap->erase(mem);
}
void register_memory(void* ptr, deallocator_t deallocator){
auto memmap = (std::unordered_map<void*, dealloctor_t>*) session->memory_map;
memmap[ptr] = deallocator;
auto memmap = (std::unordered_map<void*, deallocator_t>*) session->memory_map;
memmap->operator[](ptr) = deallocator;
}

@ -0,0 +1,10 @@
#include "aquery.h"
#include "../server/table.h"
__AQEXPORT__(ColRef<float>) mulvec(int a, ColRef<float> b){
return a * b;
}
__AQEXPORT__(double) mydiv(int a, int b){
return a / (double)b;
}

@ -80,6 +80,10 @@ void Context::init_session(){
void* Context::get_module_function(const char* fname){
auto fmap = static_cast<std::unordered_map<std::string, void*>*>
(this->module_function_maps);
//printf("%p\n", fmap->find("mydiv")->second);
for (const auto& [key, value] : *fmap){
printf("%s %p\n", key.c_str(), value);
}
auto ret = fmap->find(fname);
return ret == fmap->end() ? nullptr : ret->second;
}
@ -141,7 +145,9 @@ int dll_main(int argc, char** argv, Context* cxt){
case 'F':
{
auto fname = n_recvd[i] + 1;
printf("%s: %p, %p\n", fname, user_module_handle, dlsym(user_module_handle, fname));
module_fn_map->insert_or_assign(fname, dlsym(user_module_handle, fname));
printf("%p\n", module_fn_map->find("mydiv") != module_fn_map->end() ? module_fn_map->find("mydiv")->second : nullptr);
}
break;
case 'U':

@ -28,4 +28,8 @@ echo Testing UDF with calls to other UDFs
f funcs.a
xexec
echo Testing User Modules
f modules.a
xexec
exit

Loading…
Cancel
Save