WIP: sdk, bug fixes, os support regression fix

dev
Bill 2 years ago
parent 751a442554
commit eaae297b49

1
.gitignore vendored

@ -1,3 +1,4 @@
a.out.*
*.log
*.pyc
*.tab

@ -1,37 +1,59 @@
# put environment specific configuration here
import os
# os.environ['CXX'] = 'C:/Program Files/LLVM/bin/clang.exe'
os.environ['THREADING'] = '0'
## GLOBAL CONFIGURATION FLAGS
add_path_to_ldpath = True
rebuild_backend = True
run_backend = True
have_hge = False
os_platform = 'unkown'
import sys
if os.name == 'nt':
if sys.platform == 'win32':
os_platform = 'win'
elif sys.platform == 'cygwin' or sys.platform == 'msys':
os_platform = 'cygwin'
elif os.name == 'posix':
if sys.platform == 'darwin':
os_platform = 'mac'
elif 'linux' in sys.platform:
os_platform = 'linux'
elif 'bsd' in sys.platform:
os_platform = 'bsd'
# deal with msys dependencies:
if os_platform == 'win':
os.add_dll_directory('c:/msys64/usr/bin')
os.add_dll_directory(os.path.abspath('./msc-plugin'))
print("adding path")
else:
import readline
cygroot = 'c:/msys64/usr/bin'
msbuildroot = 'd:/gg/vs22/MSBuild/Current/Bin'
__config_initialized__ = False
class Build_Config:
def __init__(self) -> None:
self.OptimizationLv = '4' # [O0, O1, O2, O3, Ofast]
self.Platform = 'x64'
self.fLTO = True
self.fmarchnative = True
def configure(self):
pass
def init_config():
global __config_initialized__, os_platform
## SETUP ENVIRONMENT VARIABLES
import os
from engine.utils import add_dll_dir
# os.environ['CXX'] = 'C:/Program Files/LLVM/bin/clang.exe'
os.environ['THREADING'] = '1'
if not __config_initialized__:
import sys
if os.name == 'nt':
if sys.platform == 'win32':
os_platform = 'win'
elif sys.platform == 'cygwin' or sys.platform == 'msys':
os_platform = 'cygwin'
elif os.name == 'posix':
if sys.platform == 'darwin':
os_platform = 'mac'
elif 'linux' in sys.platform:
os_platform = 'linux'
elif 'bsd' in sys.platform:
os_platform = 'bsd'
elif sys.platform == 'cygwin' or sys.platform == 'msys':
os_platform = 'cygwin'
# deal with msys dependencies:
if os_platform == 'win':
add_dll_dir(cygroot)
add_dll_dir(os.path.abspath('./msc-plugin'))
# print("adding path")
else:
import readline
if os_platform == 'cygwin':
add_dll_dir('./lib')
__config_initialized__ = True

@ -8,8 +8,7 @@
- os.add_dll_directory(os.path.abspath('./monetdb/msvc'))
- gcc-mingw (link w/ mingw monetdb, can only load under mingw python):
- $(CXX) server/server.cpp server/monetdb_conn.cpp -fPIC -shared $(OS_SUPPORT) --std=c++1z -O3 -march=native -o server.so -I./monetdb/msys64 -L./lib -lmonetdbe
- os.add_dll_directory('c:/msys64/usr/bin')
- os.add_dll_directory(os.path.abspath('./lib'))
- add_dll_dir(os.path.abspath('./lib'))
- msvc:
- D:\gg\vs22\MSBuild\Current\Bin\msbuild "d:\gg\AQuery++\server\server.vcxproj" /p:configuration=Release /p:platform=x64
- os.add_dll_directory(os.path.abspath('./monetdb/msvc'))

@ -0,0 +1 @@
create table strtest(a: string, b: string);

@ -71,3 +71,21 @@ def remove_last(pattern : str, string : str, escape : Set[str] = set()) -> str:
else:
return string[:idx] + string[idx+1:]
class _Counter:
def __init__(self, cnt):
self.cnt = cnt
def inc(self, cnt = 1):
self.cnt += cnt
cnt = self.cnt - cnt
return cnt
import re
ws = re.compile(r'\s+')
def add_dll_dir(dll: str):
import sys, os
if sys.version_info.major >= 3 and sys.version_info.minor >7 and os.name == 'nt':
os.add_dll_directory(dll)
else:
os.environ['PATH'] = os.path.abspath(dll) + os.pathsep + os.environ['PATH']

@ -1,60 +0,0 @@
#include "./server/libaquery.h"
#include <unordered_map>
#include "./server/hasher.h"
#include "csv.h"
#include "./server/aggregations.h"
extern "C" int __DLLEXPORT__ dllmain(Context* cxt) {
using namespace std;
using namespace types;
auto sale = new TableInfo<int,int>("sale", 2);
cxt->tables.insert({"sale", sale});
auto& sale_Month = *(ColRef<int> *)(&sale->colrefs[0]);
auto& sale_sales = *(ColRef<int> *)(&sale->colrefs[1]);
sale_Month.init();
sale_sales.init();
io::CSVReader<2> csv_reader_53ychC("moving_avg.csv");
csv_reader_53ychC.read_header(io::ignore_extra_column, "Month","sales");
int tmp_7ttMnHd3;
int tmp_5nHjeAtP;
while(csv_reader_53ychC.read_row(tmp_7ttMnHd3,tmp_5nHjeAtP)) {
sale_Month.emplace_back(tmp_7ttMnHd3);
sale_sales.emplace_back(tmp_5nHjeAtP);
}
auto out_3Xio = new TableInfo<decays<decltype(sale_Month[0])>,decays<decltype(avgw(3,sale_sales))>>("out_3Xio", 2);
cxt->tables.insert({"out_3Xio", out_3Xio});
auto& out_3Xio_Month = *(ColRef<decays<decltype(sale_Month[0])>> *)(&out_3Xio->colrefs[0]);
auto& out_3Xio_avgsw3salesales = *(ColRef<decays<decltype(avgw(3,sale_sales))>> *)(&out_3Xio->colrefs[1]);
out_3Xio_Month.init();
out_3Xio_Month = sale_Month;
out_3Xio_avgsw3salesales.init();
out_3Xio_avgsw3salesales = avgw(3,sale_sales);
// print(*out_3Xio);
FILE* fp_4nKGhD = fopen("moving_avg_output.csv", "w");
out_3Xio->printall(",", "\n", nullptr, fp_4nKGhD);
fclose(fp_4nKGhD);
typedef record<decltype(sale_sales[0])> record_type1H2vDGL;
unordered_map<record_type1H2vDGL, vector_type<uint32_t>, transTypes<record_type1H2vDGL, hasher>> g6Mjxfk5;
for (uint32_t i7u = 0; i7u < sale_sales.size; ++i7u){
g6Mjxfk5[forward_as_tuple(sale_sales[i7u])].emplace_back(i7u);
}
auto out_2IU2 = new TableInfo<decays<decltype(sale_sales[0])>,decays<decltype(minw(2,sale_Month))>>("out_2IU2", 2);
cxt->tables.insert({"out_2IU2", out_2IU2});
auto& out_2IU2_sales = *(ColRef<decays<decltype(sale_sales[0])>> *)(&out_2IU2->colrefs[0]);
auto& out_2IU2_minsw2saleMonth = *(ColRef<decays<decltype(minw(2,sale_Month))>> *)(&out_2IU2->colrefs[1]);
out_2IU2_sales.init();
out_2IU2_minsw2saleMonth.init();
for(auto& i5J : g6Mjxfk5) {
auto &key_4jl5toH = i5J.first;
auto &val_VJGwVwH = i5J.second;
out_2IU2_sales.emplace_back(get<0>(key_4jl5toH));
out_2IU2_minsw2saleMonth.emplace_back(minw(2,sale_Month[val_VJGwVwH]));
}
// print(*out_2IU2);
FILE* fp_18R4fY = fopen("flatten.csv", "w");
out_2IU2->printall(",","\n", nullptr, fp_18R4fY);
fclose(fp_18R4fY);
return 0;
}

@ -1,26 +1,56 @@
import os
from engine.ast import Context
if __name__ == '__main__':
import mimetypes
mimetypes._winreg = None
from dataclasses import dataclass
import enum
import re
from tabnanny import check
import time
# import dbconn
import re
from typing import Callable, List, Optional
from mo_parsing import ParseException
import aquery_parser as parser
import engine
import engine.projection
import engine.ddl
import reconstruct as xengine
import subprocess
import mmap
import sys
import os
from engine.utils import base62uuid
import atexit
import threading
import ctypes
import aquery_config
import numpy as np
from engine.utils import ws
from engine.utils import add_dll_dir
## GLOBALS BEGIN
nullstream = open(os.devnull, 'w')
help_message = '''\
Run prompt.py without supplying with any arguments to run in interactive mode.
--help, -h
print out this help message
--version, -v
returns current version of AQuery
--mode, -m Optional[threaded|IPC]
execution engine run mode:
threaded or 1 (default): run the execution engine and compiler/prompt in separate threads
IPC, standalong or 2: run the execution engine in a new process which uses shared memory to communicate with the compiler process
--script, -s [SCRIPT_FILE]
script mode: run the aquery script file
--parse-only, -p
parse only: parse the file and print out the AST
'''
## GLOBALS END
## CLASSES BEGIN
class RunType(enum.Enum):
Threaded = 0
IPC = 1
@ -30,75 +60,19 @@ class Backend_Type(enum.Enum):
BACKEND_MonetDB = 1
BACKEND_MariaDB = 2
server_mode = RunType.Threaded
server_bin = 'server.bin' if server_mode == RunType.IPC else 'server.so'
nullstream = open(os.devnull, 'w')
if aquery_config.rebuild_backend:
try:
os.remove(server_bin)
except Exception as e:
print(type(e), e)
subprocess.call(['make', "info"])
subprocess.call(['make', server_bin], stdout=nullstream)
cleanup = True
def rm():
global cleanup
if cleanup:
mm.seek(0,os.SEEK_SET)
mm.write(b'\x00\x00')
mm.flush()
try:
time.sleep(.001)
server.kill()
time.sleep(.001)
server.terminate()
except OSError:
pass
files = os.listdir('.')
for f in files:
if f.endswith('.shm'):
os.remove(f)
mm.close()
cleanup = False
nullstream.close()
def init_ipc():
global shm, server, basecmd, mm
shm = base62uuid()
if sys.platform != 'win32':
shm += '.shm'
basecmd = ['bash', '-c', 'rlwrap k']
mm = None
if not os.path.isfile(shm):
# create initial file
with open(shm, "w+b") as handle:
handle.write(b'\x01\x00') # [running, new job]
handle.flush()
mm = mmap.mmap(handle.fileno(), 2, access=mmap.ACCESS_WRITE, offset=0)
if mm is None:
exit(1)
else:
basecmd = ['bash.exe', '-c', 'rlwrap ./k']
mm = mmap.mmap(0, 2, shm)
mm.write(b'\x01\x00')
mm.flush()
server = subprocess.Popen(["./server.bin", shm])
import numpy as np
c = lambda _ba: ctypes.cast((ctypes.c_char * len(_ba)).from_buffer(_ba), ctypes.c_char_p)
class Config:
def __init__(self, nq = 0, mode = server_mode, n_bufs = 0, bf_szs = []) -> None:
__all_attrs__ = ['running', 'new_query', 'server_mode', 'backend_type', 'has_dll', 'n_buffers']
__init_attributes__ = False
def __init_self__():
if not Config.__init_attributes__:
from functools import partial
for _i, attr in enumerate(Config.__all_attrs__):
if not hasattr(Config, attr):
setattr(Config, attr, property(partial(Config.getter, i = _i), partial(Config.setter, i = _i)))
__init_attributes__ = True
def __init__(self, mode, nq = 0, n_bufs = 0, bf_szs = []) -> None:
Config.__init_self__()
self.int_size = 4
self.n_attrib = 6
self.buf = bytearray((self.n_attrib + n_bufs) * self.int_size)
@ -121,29 +95,83 @@ class Config:
@property
def c(self):
return c(self.buf)
return ctypes.cast(\
(ctypes.c_char * len(self.buf)).from_buffer(self.buf), ctypes.c_char_p)
@dataclass
class PromptState():
cleanup = True
th = None
send = None
test_parser = True
server_mode : RunType = RunType.Threaded
server_bin = 'server.bin' if server_mode == RunType.IPC else 'server.so'
set_ready = lambda: None
get_ready = lambda: None
server_status = lambda: False
cfg : Config = None
shm : str = ''
server : subprocess.Popen = None
basecmd : List[str] = None
mm : mmap = None
th : threading.Thread
send : Callable = lambda *_:None
init : Callable[['PromptState'], None] = lambda _:None
stmts = ['']
payloads = {}
## CLASSES END
## FUNCTIONS BEGIN
def rm(state:PromptState):
if state.cleanup:
state.mm.seek(0,os.SEEK_SET)
state.mm.write(b'\x00\x00')
state.mm.flush()
def binder(cls, attr, _i):
from functools import partial
setattr(cls, attr, property(partial(cls.getter, i = _i), partial(cls.setter, i = _i)))
binder(Config, 'running', 0)
binder(Config, 'new_query', 1)
binder(Config, 'server_mode', 2)
binder(Config, 'backend_type', 3)
binder(Config, 'has_dll', 4)
binder(Config, 'n_buffers', 5)
try:
time.sleep(.001)
state.server.kill()
time.sleep(.001)
state.server.terminate()
except OSError:
pass
cfg = Config()
th = None
send = None
files = os.listdir('.')
for f in files:
if f.endswith('.shm'):
os.remove(f)
state.mm.close()
state.cleanup = False
nullstream.close()
def init_threaded():
def init_ipc(state: PromptState):
state.shm = base62uuid()
if sys.platform != 'win32':
state.shm += '.shm'
state.basecmd = ['bash', '-c', 'rlwrap k']
state.mm = None
if not os.path.isfile(shm):
# create initial file
with open(shm, "w+b") as handle:
handle.write(b'\x01\x00') # [running, new job]
handle.flush()
state.mm = mmap.mmap(handle.fileno(), 2, access=mmap.ACCESS_WRITE, offset=0)
if state.mm is None:
exit(1)
else:
state.basecmd = ['bash.exe', '-c', 'rlwrap ./k']
state.mm = mmap.mmap(0, 2, state.shm)
state.mm.write(b'\x01\x00')
state.mm.flush()
state.server = subprocess.Popen(["./server.bin", state.shm])
def init_threaded(state : PromptState):
state.cleanup = False
if os.name == 'nt' and aquery_config.add_path_to_ldpath:
t = os.environ['PATH'].lower().split(';')
vars = re.compile('%.*%')
os.add_dll_directory(os.path.abspath('.'))
os.add_dll_directory(os.path.abspath('./lib'))
add_dll_dir(os.path.abspath('.'))
add_dll_dir(os.path.abspath('./lib'))
for e in t:
if(len(e) != 0):
if '%' in e:
@ -154,7 +182,7 @@ def init_threaded():
except Exception:
continue
try:
os.add_dll_directory(e)
add_dll_dir(e)
except Exception:
continue
@ -162,184 +190,258 @@ def init_threaded():
os.environ['PATH'] = os.environ['PATH'] + os.pathsep + os.path.abspath('.')
os.environ['PATH'] = os.environ['PATH'] + os.pathsep + os.path.abspath('./lib')
if aquery_config.run_backend:
server_so = ctypes.CDLL('./'+server_bin)
global cfg, th, send
send = server_so['receive_args']
server_so = ctypes.CDLL('./'+state.server_bin)
state.send = server_so['receive_args']
aquery_config.have_hge = server_so['have_hge']()
if aquery_config.have_hge:
from engine.types import get_int128_support
get_int128_support()
th = threading.Thread(target=server_so['main'], args=(-1, ctypes.POINTER(ctypes.c_char_p)(cfg.c)), daemon=True)
th.start()
if server_mode == RunType.IPC:
atexit.register(rm)
init = init_ipc
set_ready = lambda : mm.seek(0,os.SEEK_SET) or mm.write(b'\x01\x01')
def __get_ready():
mm.seek(0,os.SEEK_SET)
return mm.read(2)[1]
get_ready = __get_ready
server_status = lambda : server.poll() is not None
else:
init = init_threaded
rm = lambda: None
def __set_ready():
global cfg
cfg.new_query = 1
set_ready = __set_ready
get_ready = lambda: aquery_config.run_backend and cfg.new_query
if aquery_config.run_backend:
server_status = lambda : not th.is_alive()
else:
server_status = lambda : True
init()
test_parser = True
# code to test parser
ws = re.compile(r'\s+')
q = 'SELECT p.Name, v.Name FROM Production.Product p JOIN Purchasing.ProductVendor pv ON p.ProductID = pv.ProductID JOIN Purchasing.Vendor v ON pv.BusinessEntityID = v.BusinessEntityID WHERE ProductSubcategoryID = 15 ORDER BY v.Name;'
res = parser.parse(q)
payload = None
keep = True
cxt = engine.initialize()
cxt.Info(res)
while test_parser:
try:
if server_status():
init()
while get_ready():
time.sleep(.00001)
print("> ", end="")
q = input().lower()
if q == 'exec': # generate build and run (AQuery Engine)
cfg.backend_type = Backend_Type.BACKEND_AQuery.value
cxt = engine.exec(stmts, cxt, keep)
if subprocess.call(['make', 'snippet'], stdout = nullstream) == 0:
set_ready()
continue
state.th = threading.Thread(target=server_so['main'], args=(-1, ctypes.POINTER(ctypes.c_char_p)(state.cfg.c)), daemon=True)
state.th.start()
elif q == 'xexec': # generate build and run (MonetDB Engine)
cfg.backend_type = Backend_Type.BACKEND_MonetDB.value
cxt = xengine.exec(stmts, cxt, keep)
if server_mode == RunType.Threaded:
# assignment to avoid auto gc
# sqls = [s.strip() for s in cxt.sql.split(';')]
qs = [ctypes.c_char_p(bytes(q, 'utf-8')) for q in cxt.queries if len(q)]
sz = len(qs)
payload = (ctypes.c_char_p*sz)(*qs)
def init_prompt() -> PromptState:
aquery_config.init_config()
state = PromptState()
if aquery_config.rebuild_backend:
try:
os.remove(state.server_bin)
except Exception as e:
print(type(e), e)
subprocess.call(['make', "info"])
subprocess.call(['make', state.server_bin], stdout=nullstream)
state.cfg = Config(state.server_mode)
if state.server_mode == RunType.IPC:
atexit.register(lambda: rm(state))
state.init = init_ipc
state.set_ready = lambda : state.mm.seek(0,os.SEEK_SET) or state.mm.write(b'\x01\x01')
def __get_ready():
state.mm.seek(0,os.SEEK_SET)
return state.mm.read(2)[1]
state.get_ready = __get_ready
state.server_status = lambda : state.server.poll() is not None
else:
state.init = init_threaded
rm = lambda: None
def __set_ready():
state.cfg.new_query = 1
state.set_ready = __set_ready
state.get_ready = lambda: aquery_config.run_backend and state.cfg.new_query
if aquery_config.run_backend:
state.server_status = lambda : not state.th.is_alive()
else:
state.server_status = lambda : True
state.init(state)
return state
def save(q:str, cxt: xengine.Context):
savecmd = re.split(r'[ \t]', q)
if len(savecmd) > 1:
fname = savecmd[1]
else:
tm = time.gmtime()
fname = f'{tm.tm_year}{tm.tm_mon}_{tm.tm_mday}_{tm.tm_hour}:{tm.tm_min}:{tm.tm_sec}'
if cxt:
from typing import Optional
def savefile(attr:str, desc:str, ext:Optional[str] = None):
if hasattr(cxt, attr):
attr : str = getattr(cxt, attr)
if attr:
ext = ext if ext else '.' + desc
name = fname if fname.endswith(ext) else fname + ext
with open('saves/' + name, 'wb') as cfile:
cfile.write(attr.encode('utf-8'))
print(f'saved {desc} code as {name}')
savefile('ccode', 'cpp')
savefile('udf', 'udf', '.hpp')
savefile('sql', 'sql')
def main(running = lambda:True, next = input, state = None):
if state is None:
state = init_prompt()
q = ''
payload = None
keep = True
cxt = engine.initialize()
while running():
try:
if state.server_status():
state.init()
while state.get_ready():
time.sleep(.00001)
print("> ", end="")
q = next().lower()
if q == 'exec': # generate build and run (AQuery Engine)
state.cfg.backend_type = Backend_Type.BACKEND_AQuery.value
cxt = engine.exec(state.stmts, cxt, keep)
if subprocess.call(['make', 'snippet'], stdout = nullstream) == 0:
state.set_ready()
continue
elif q == 'xexec': # generate build and run (MonetDB Engine)
state.cfg.backend_type = Backend_Type.BACKEND_MonetDB.value
cxt = xengine.exec(state.stmts, cxt, keep)
if state.server_mode == RunType.Threaded:
# assignment to avoid auto gc
# sqls = [s.strip() for s in cxt.sql.split(';')]
qs = [ctypes.c_char_p(bytes(q, 'utf-8')) for q in cxt.queries if len(q)]
sz = len(qs)
payload = (ctypes.c_char_p*sz)(*qs)
state.payload = payload
try:
state.send(sz, payload)
except TypeError as e:
print(e)
if cxt.udf is not None:
with open('udf.hpp', 'wb') as outfile:
outfile.write(cxt.udf.encode('utf-8'))
if cxt.has_dll:
with open('out.cpp', 'wb') as outfile:
outfile.write((cxt.finalize()).encode('utf-8'))
subprocess.call(['make', 'snippet'], stdout = nullstream)
state.cfg.has_dll = 1
else:
state.cfg.has_dll = 0
state.set_ready()
continue
elif q == 'dbg':
import code
from copy import deepcopy
var = {**globals(), **locals()}
sh = code.InteractiveConsole(var)
try:
send(sz, payload)
except TypeError as e:
sh.interact(banner = 'debugging session began.', exitmsg = 'debugging session ended.')
except BaseException as e:
# don't care about anything happened in interactive console
print(e)
if cxt.udf is not None:
with open('udf.hpp', 'wb') as outfile:
outfile.write(cxt.udf.encode('utf-8'))
if cxt.has_dll:
with open('out.cpp', 'wb') as outfile:
continue
elif q.startswith('log'):
qs = re.split(r'[ \t]', q)
if len(qs) > 1:
cxt.log_level = qs[1]
else:
cxt.print(cxt.log_level)
continue
elif q == 'k':
subprocess.call(state.basecmd)
continue
elif q == 'print':
cxt.print(state.stmts)
continue
elif q.startswith('save'):
save(q, cxt)
continue
elif q == 'keep':
keep = not keep
continue
elif q == 'format' or q == 'fmt':
subprocess.call(['clang-format', 'out.cpp'])
elif q == 'exit':
rm(state)
exit()
elif q == 'r': # build and run
if subprocess.call(['make', 'snippet']) == 0:
state.set_ready()
continue
elif q == 'rr': # run
state.set_ready()
continue
elif q == 'script':
# TODO: script mode
pass
elif q.startswith('save2'):
filename = re.split(r'[ \t]', q)
if (len(filename) > 1):
filename = filename[1]
else:
filename = f'out_{base62uuid(4)}.cpp'
with open(filename, 'wb') as outfile:
outfile.write((cxt.finalize()).encode('utf-8'))
subprocess.call(['make', 'snippet'], stdout = nullstream)
cfg.has_dll = 1
else:
cfg.has_dll = 0
set_ready()
continue
elif q == 'dbg':
import code
from copy import deepcopy
var = {**globals(), **locals()}
sh = code.InteractiveConsole(var)
try:
sh.interact(banner = 'debugging session began.', exitmsg = 'debugging session ended.')
except BaseException as e:
# don't care about anything happened in interactive console
print(e)
continue
elif q.startswith('log'):
qs = re.split(r'[ \t]', q)
if len(qs) > 1:
cxt.log_level = qs[1]
else:
cxt.print(cxt.log_level)
continue
trimed = ws.sub(' ', q.lower()).split(' ')
if trimed[0].startswith('f'):
fn = 'stock.a' if len(trimed) <= 1 or len(trimed[1]) == 0 \
else trimed[1]
with open(fn, 'r') as file:
contents = file.read()#.lower()
state.stmts = parser.parse(contents)
continue
state.stmts = parser.parse(q)
cxt.Info(state.stmts)
except ParseException as e:
print(e)
continue
elif q == 'k':
subprocess.call(basecmd)
continue
elif q == 'print':
cxt.print(stmts)
continue
elif q.startswith('save'):
savecmd = re.split(r'[ \t]', q)
if len(savecmd) > 1:
fname = savecmd[1]
else:
tm = time.gmtime()
fname = f'{tm.tm_year}{tm.tm_mon}_{tm.tm_mday}_{tm.tm_hour}:{tm.tm_min}:{tm.tm_sec}'
if cxt:
from typing import Optional
def savefile(attr:str, desc:str, ext:Optional[str] = None):
if hasattr(cxt, attr):
attr : str = getattr(cxt, attr)
if attr:
ext = ext if ext else '.' + desc
name = fname if fname.endswith(ext) else fname + ext
with open('saves/' + name, 'wb') as cfile:
cfile.write(attr.encode('utf-8'))
print(f'saved {desc} code as {name}')
savefile('ccode', 'cpp')
savefile('udf', 'udf', '.hpp')
savefile('sql', 'sql')
continue
elif q == 'keep':
keep = not keep
continue
elif q == 'format' or q == 'fmt':
subprocess.call(['clang-format', 'out.cpp'])
elif q == 'exit':
except (ValueError, FileNotFoundError) as e:
print(e)
except (KeyboardInterrupt):
break
elif q == 'r': # build and run
if subprocess.call(['make', 'snippet']) == 0:
set_ready()
continue
elif q == 'rr': # run
set_ready()
continue
elif q.startswith('save2'):
filename = re.split(r'[ \t]', q)
if (len(filename) > 1):
filename = filename[1]
else:
filename = f'out_{base62uuid(4)}.cpp'
with open(filename, 'wb') as outfile:
outfile.write((cxt.finalize()).encode('utf-8'))
continue
trimed = ws.sub(' ', q.lower()).split(' ')
if trimed[0].startswith('f'):
fn = 'stock.a' if len(trimed) <= 1 or len(trimed[1]) == 0 \
else trimed[1]
with open(fn, 'r') as file:
contents = file.read()#.lower()
stmts = parser.parse(contents)
continue
stmts = parser.parse(q)
cxt.Info(stmts)
except ParseException as e:
print(e)
continue
except (ValueError, FileNotFoundError) as e:
print(e)
except (KeyboardInterrupt):
break
except:
rm()
raise
rm()
except:
import code, traceback
sh = code.InteractiveConsole({**globals(), **locals()})
sh.interact(banner = traceback.format_exc(), exitmsg = 'debugging session ended.')
save('', cxt)
rm(state)
raise
rm(state)
## FUNCTIONS END
## MAIN
if __name__ == '__main__':
state = None
nextcmd = ''
def check_param(param:List[str], args = False) -> bool:
global nextcmd
for p in param:
if p in sys.argv:
if args:
return True
pos = sys.argv.index(p)
if len(sys.argv) > pos + 1:
nextcmd = sys.argv[pos + 1]
return True
return False
if check_param(['-h', '--help'], True):
print(help_message)
exit()
if len(sys.argv) == 2:
nextcmd = sys.argv[1]
if nextcmd.startswith('-'):
nextcmd = ''
nextcmd = 'test.aquery'
if nextcmd or check_param(['-s', '--script']):
with open(nextcmd) as file:
nextcmd = file.readline()
from engine.utils import _Counter
if file.name.endswith('aquery') or nextcmd.strip() == '#!aquery':
state = init_prompt()
while(nextcmd):
while(not ws.sub('', nextcmd) or nextcmd.strip().startswith('#')):
nextcmd = file.readline()
cnt = _Counter(1)
main(lambda : cnt.inc(-1) > 0, lambda:nextcmd.strip(), state)
nextcmd = file.readline()
if check_param(['-p', '--parse']):
with open(nextcmd, 'r') as file:
contents = file.read()
print(parser.parse(contents))
if check_param(['-m', '--mode', '--run-type']):
nextcmd = nextcmd.lower()
ipc_string = ['ipc', 'proc', '2', 'standalong']
thread_string = ['thread', '1']
if any([s in nextcmd for s in ipc_string]):
server_mode = RunType.IPC
elif any([s in nextcmd for s in thread_string]):
server_mode = RunType.Threaded
main(state=state)

@ -1,10 +1,10 @@
CREATE TABLE test(a INT, b INT, c INT, d INT)
CREATE TABLE testq1(a INT, b INT, c INT, d INT)
LOAD DATA INFILE "test.csv"
INTO TABLE test
INTO TABLE testq1
FIELDS TERMINATED BY ","
SELECT sum(c), b, d
FROM test
FROM testq1
group by a,b,d
order by d DESC, b ASC

@ -2,5 +2,7 @@ mo-future
mo-dots==8.20.21357
mo-parsing
mo-imports
readline; sys_platform != 'win32'
dataclasses; python_version < '3.7'
readline; sys_platform == 'linux'
numpy

@ -1,9 +1,10 @@
#include "../server/libaquery.h"
typedef void (*dealloctor_t) (void*);
extern void* Aalloc(size_t sz);
extern int Afree(void * mem);
extern size_t register_memory(void* ptr, void(dealloc)(void*));
extern void Afree(void * mem);
extern size_t register_memory(void* ptr, dealloctor_t deallocator);
struct Session{
struct Statistic{

@ -5,29 +5,23 @@
#include <unordered_map>
Session* session;
void init_session(){
}
void end_session(){
}
void* Aalloc(size_t sz){
void* Aalloc(size_t sz, deallocator_t deallocator){
void* mem = malloc(sz);
auto memmap = (std::unordered_map<void*>*) session->memory_map;
memmap->insert(mem);
auto memmap = (std::unordered_map<void*, dealloctor_t>*) session->memory_map;
memmap[mem] = deallocator;
return mem;
}
int Afree(void* mem){
auto memmap = (std::unordered_map<void*>*) session->memory_map;
void Afree(void* mem){
auto memmap = (std::unordered_map<void*, dealloctor_t>*) session->memory_map;
memmap[mem](mem);
memmap->erase(mem);
return free(mem);
}
void register_memory(void* ptr, void(dealloc)(void*)){
auto memmap = (std::unordered_map<void*>*) session->memory_map;
memmap->insert(ptr);
void register_memory(void* ptr, deallocator_t deallocator){
auto memmap = (std::unordered_map<void*, dealloctor_t>*) session->memory_map;
memmap[ptr] = deallocator;
}

@ -21,6 +21,15 @@ struct Config{
int buffer_sizes[];
};
struct Session{
struct Statistic{
size_t total_active;
size_t cnt_object;
size_t total_alloc;
};
void* memory_map;
};
struct Context{
typedef int (*printf_type) (const char *format, ...);
std::unordered_map<const char*, void*> tables;
@ -34,6 +43,8 @@ struct Context{
void* alt_server;
Log_level log_level = LOG_INFO;
Session current;
#ifdef THREADING
void* thread_pool;
#endif
@ -49,6 +60,8 @@ struct Context{
if (log_level <= LOG_ERROR)
print(args...);
}
void init_session();
void end_session();
};
#ifdef _WIN32

@ -9,6 +9,7 @@ struct Server{
MYSQL *server = 0;
Context *cxt = 0;
bool status = 0;
bool has_error = false;
char* query = 0;
int type = 0;

@ -1,6 +1,8 @@
#include "libaquery.h"
#include <cstdio>
#include "monetdb_conn.h"
#include "monetdbe.h"
#undef static_assert
const char* monetdbe_type_str[] = {
"monetdbe_bool", "monetdbe_int8_t", "monetdbe_int16_t", "monetdbe_int32_t", "monetdbe_int64_t",
@ -37,6 +39,7 @@ Server::Server(Context* cxt){
}
void Server::connect(Context *cxt){
auto server = static_cast<monetdbe_database*>(this->server);
if (cxt){
cxt->alt_server = this;
this->cxt = cxt;
@ -47,14 +50,14 @@ void Server::connect(Context *cxt){
}
if (server){
printf("Error: Server %llx already connected. Restart? (Y/n). \n", server);
printf("Error: Server %p already connected. Restart? (Y/n). \n", server);
char c[50];
std::cin.getline(c, 49);
for(int i = 0; i < 50; ++i){
if (!c[i] || c[i] == 'y' || c[i] == 'Y'){
monetdbe_close(*server);
free(*server);
server = 0;
this->server = 0;
break;
}
else if(c[i]&&!(c[i] == ' ' || c[i] == '\t'))
@ -64,36 +67,65 @@ void Server::connect(Context *cxt){
server = (monetdbe_database*)malloc(sizeof(monetdbe_database));
auto ret = monetdbe_open(server, nullptr, nullptr);
if (ret == 0){
status = true;
this->server = server;
}
else{
if(server)
free(server);
this->server = 0;
status = false;
puts(ret == -1 ? "Allocation Error." : "Internal Database Error.");
}
}
void Server::exec(const char* q){
auto qresult = monetdbe_query(*server, const_cast<char*>(q), &res, &cnt);
if (res != 0)
this->cnt = res->nrows;
auto server = static_cast<monetdbe_database*>(this->server);
auto _res = static_cast<monetdbe_result*>(this->res);
monetdbe_cnt _cnt = 0;
auto qresult = monetdbe_query(*server, const_cast<char*>(q), &_res, &_cnt);
if (_res != 0){
this->cnt = _res->nrows;
this->res = _res;
}
if (qresult != nullptr){
printf("Execution Failed. %s\n", qresult);
last_error = qresult;
}
}
bool Server::haserror(){
if (last_error){
last_error = 0;
return true;
}
else{
return false;
}
}
void Server::close(){
if(this->server){
monetdbe_close(*(this->server));
free(this->server);
auto server = static_cast<monetdbe_database*>(this->server);
monetdbe_close(*(server));
free(server);
this->server = 0;
}
}
void* Server::getCol(int col_idx){
if(res){
res->ncols;
auto err_msg = monetdbe_result_fetch(res, &ret_col, col_idx);
auto _res = static_cast<monetdbe_result*>(this->res);
auto err_msg = monetdbe_result_fetch(_res,
reinterpret_cast<monetdbe_column**>(&ret_col), col_idx);
if(err_msg == nullptr)
{
cnt = ret_col->count;
auto _ret_col = static_cast<monetdbe_column*>(this->ret_col);
cnt = _ret_col->count;
printf("Dbg: Getting col %s, type: %s\n",
ret_col->name, monetdbe_type_str[ret_col->type]);
return ret_col->data;
_ret_col->name, monetdbe_type_str[_ret_col->type]);
return _ret_col->data;
}
else{
printf("Error fetching result: %s\n", err_msg);

@ -1,17 +1,16 @@
#include "monetdbe.h"
struct Context;
struct Server{
monetdbe_database *server = 0;
void *server = 0;
Context *cxt = 0;
bool status = 0;
char* query = 0;
int type = 1;
monetdbe_result* res = 0;
monetdbe_column* ret_col = 0;
monetdbe_cnt cnt = 0;
void* res = 0;
void* ret_col = 0;
long long cnt = 0;
char* last_error = 0;
Server(Context* cxt = nullptr);
@ -19,5 +18,6 @@ struct Server{
void exec(const char* q);
void *getCol(int col_idx);
void close();
bool haserror();
~Server();
};

@ -72,6 +72,12 @@ __AQEXPORT__(bool) have_hge(){
#endif
}
void Context::init_session(){
if (log_level == LOG_INFO){
Session::Statistic stats;
}
}
int dll_main(int argc, char** argv, Context* cxt){
Config *cfg = reinterpret_cast<Config *>(argv[0]);
@ -99,15 +105,20 @@ int dll_main(int argc, char** argv, Context* cxt){
}
for(int i = 0; i < n_recv; ++i)
{
printf("%s, %d\n", n_recvd[i], n_recvd[i][0] == 'Q');
if (n_recvd[i][0] == 'Q'){
server->exec(n_recvd[i] + 1);
printf("Exec Q%d: %s\n", i, n_recvd[i]);
}
else if (n_recvd[i][0] == 'P' && handle) {
else if (n_recvd[i][0] == 'P' && handle && !server->haserror()) {
code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, n_recvd[i]+1));
c(cxt);
}
}
if(handle) {
dlclose(handle);
handle = 0;
}
n_recv = 0;
}
if(server->last_error == nullptr){
@ -166,11 +177,11 @@ extern "C" int __DLLEXPORT__ main(int argc, char** argv) {
cxt->log("running: %s\n", running? "true":"false");
cxt->log("ready: %s\n", ready? "true":"false");
void* handle = dlopen("./dll.so", RTLD_LAZY);
cxt->log("handle: %lx\n", handle);
cxt->log("handle: %p\n", handle);
if (handle) {
cxt->log("inner\n");
code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, "dllmain"));
cxt->log("routine: %lx\n", c);
cxt->log("routine: %p\n", c);
if (c) {
cxt->log("inner\n");
cxt->err("return: %d\n", c(cxt));
@ -195,11 +206,24 @@ int test_main()
cxt->alt_server = new Server(cxt);
Server* server = reinterpret_cast<Server*>(cxt->alt_server);
const char* qs[]= {
"CREATE TABLE tt(a INT, b INT, c INT, d INT);",
"COPY OFFSET 2 INTO tt FROM 'D:/gg/AQuery++/test.csv' ON SERVER USING DELIMITERS ',';",
"CREATE TABLE sale(Mont INT, sales INT);",
"COPY OFFSET 2 INTO sale FROM 'D:/gg/AQuery++/moving_avg.csv' ON SERVER USING DELIMITERS ',';",
"SELECT a FROM tt, sale WHERE a = Mont ;"
"CREATE TABLE stocks(timestamp INT, price INT);",
"INSERT INTO stocks VALUES(1, 15);;",
"INSERT INTO stocks VALUES(2,19); ",
"INSERT INTO stocks VALUES(3,16);",
"INSERT INTO stocks VALUES(4,17);",
"INSERT INTO stocks VALUES(5,15);",
"INSERT INTO stocks VALUES(6,13);",
"INSERT INTO stocks VALUES(7,5);",
"INSERT INTO stocks VALUES(8,8);",
"INSERT INTO stocks VALUES(9,7);",
"INSERT INTO stocks VALUES(10,13);",
"INSERT INTO stocks VALUES(11,11);",
"INSERT INTO stocks VALUES(12,14);",
"INSERT INTO stocks VALUES(13,10);",
"INSERT INTO stocks VALUES(14,5);",
"INSERT INTO stocks VALUES(15,2);",
"INSERT INTO stocks VALUES(16,5);",
"SELECT price, timestamp FROM stocks WHERE (((price - timestamp) > 1) AND (NOT ((price * timestamp) < 100))) ;",
};
n_recv = sizeof(qs)/(sizeof (char*));
n_recvd = const_cast<char**>(qs);
@ -215,11 +239,11 @@ int test_main()
cxt->log_level = LOG_INFO;
puts(cpp_17 ?"true":"false");
void* handle = dlopen("./dll.so", RTLD_LAZY);
cxt->log("handle: %llx\n", handle);
cxt->log("handle: %p\n", handle);
if (handle) {
cxt->log("inner\n");
code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, "dllmain"));
cxt->log("routine: %llx\n", c);
code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, "dll_6EgnKh"));
cxt->log("routine: %p\n", c);
if (c) {
cxt->log("inner\n");
cxt->log("return: %d\n", c(cxt));

@ -196,7 +196,7 @@
<MultiProcessorCompilation>true</MultiProcessorCompilation>
<EnableParallelCodeGeneration>true</EnableParallelCodeGeneration>
<LanguageStandard>stdcpplatest</LanguageStandard>
<LanguageStandard_C>stdc17</LanguageStandard_C>
<LanguageStandard_C>stdc11</LanguageStandard_C>
<AdditionalIncludeDirectories>$(ProjectDir)\..\monetdb\msvc</AdditionalIncludeDirectories>
</ClCompile>
<Link>

@ -1,3 +1,5 @@
#pragma once
#include "../threading.h"
#include <thread>
#include <cstdio>
@ -5,8 +7,8 @@
using namespace std;
FILE *fp;
int testing_throughput(uint32_t n_jobs){
printf("Threadpool througput test with %u jobs.\n", n_jobs);
long long testing_throughput(uint32_t n_jobs, bool prompt = true){
printf("Threadpool througput test with %u jobs. Press any key to start.\n", n_jobs);
auto tp = ThreadPool(thread::hardware_concurrency());
getchar();
@ -19,22 +21,26 @@ int testing_throughput(uint32_t n_jobs){
auto t = (chrono::high_resolution_clock::now() - time).count();
printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n", i, t, i/(double)(t));
//this_thread::sleep_for(2s);
return 0;
fclose(fp);
return t;
}
int testing_transaction(uint32_t n_burst, uint32_t n_batch,
uint32_t base_time, uint32_t var_time){
long long testing_transaction(uint32_t n_burst, uint32_t n_batch,
uint32_t base_time, uint32_t var_time, bool prompt = true, FILE* _fp = stdout){
printf("Threadpool transaction test: burst: %u, batch: %u, time: [%u, %u].\n"
, n_burst, n_batch, base_time, var_time + base_time);
if (prompt) {
puts("Press any key to start.");
getchar();
}
auto tp = ThreadPool(thread::hardware_concurrency());
getchar();
fp = _fp;
auto i = 0u, j = 0u;
auto time = chrono::high_resolution_clock::now();
while(j++ < n_batch){
i = 0u;
while(i++ < n_burst)
tp.enqueue_task({ [](void* f) { printf( "%d ", *(int*)f); free(f); }, new int(i) });
tp.enqueue_task({ [](void* f) { fprintf(fp, "%d ", *(int*)f); free(f); }, new int(j) });
fflush(stdout);
this_thread::sleep_for(chrono::microseconds(rand()%var_time + base_time));
}
@ -42,6 +48,27 @@ int testing_transaction(uint32_t n_burst, uint32_t n_batch,
while (tp.busy()) this_thread::sleep_for(1s);
auto t = (chrono::high_resolution_clock::now() - time).count();
printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n", j*i, t, j*i/(double)(t));
return 0;
return t;
}
long long testing_destruction(bool prompt = true){
fp = fopen("tmp.tmp", "w");
if (prompt) {
puts("Press any key to start.");
getchar();
}
auto time = chrono::high_resolution_clock::now();
for(int i = 0; i < 8; ++i)
testing_transaction(0xfff, 0xff, 400, 100, false, fp);
for(int i = 0; i < 64; ++i)
testing_transaction(0xff, 0xf, 60, 20, false, fp);
for(int i = 0; i < 1024; ++i) {
auto tp = new ThreadPool(256);
delete tp;
}
return 0;
auto t = (chrono::high_resolution_clock::now() - time).count();
fclose(fp);
return t;
}

@ -18,9 +18,9 @@ void ThreadPool::daemon_proc(uint32_t id){
for(; tf[id]; this_thread::sleep_for(*ticking? 0ns:100ms)) {
if (A_TP_HAVE_PAYLOAD(tf[id])) {
A_TP_SET_PAYLOAD(tf[id]);
// A_TP_SET_PAYLOAD(tf[id]);
current_payload[id]();
current_payload[id].empty();
//current_payload[id].empty();
A_TP_UNSET_PAYLOAD(tf[id]);
}
}
@ -32,31 +32,46 @@ void ThreadPool::tick(){
auto tf = static_cast<atomic<uint8_t>*>(this->thread_flags);
auto ticking = static_cast<atomic<bool>*>(this->ticking);
auto th = static_cast<thread*>(this->thread_handles);
for(; !this->terminate; this_thread::sleep_for(50ms)){
auto n_threads = (uint8_t) this->n_threads;
for(; !this->terminate; this_thread::sleep_for(5ms)){
if(*ticking) {
bool quit = false;
for(; !quit; ){
for(uint32_t i = 0; i < n_threads; ++i){
if(!A_TP_HAVE_PAYLOAD(tf[i])){
pq_lock->lock();
payload_t& p = pq->front();
current_payload[i] = p;
A_TP_SET_PAYLOAD(tf[i]);
pq->pop_front();
quit = !pq->size();
pq_lock->unlock();
if (quit) break;
size_t sz = pq->size();
while(!pq->empty()){
while(!pq->empty() && sz < (n_threads<<10)){
for(uint8_t i = 0; i < n_threads; ++i){
if(!A_TP_HAVE_PAYLOAD(tf[i])){
pq_lock->lock();
current_payload[i] = pq->front();
A_TP_SET_PAYLOAD(tf[i]);
pq->pop_front();
sz = pq->size();
pq_lock->unlock();
if (sz>=(n_threads<<10) || sz == 0) break;
}
}
}
pq_lock->lock();
while(!pq->empty()){
for(uint8_t i = 0; i < n_threads; ++i){
if(!A_TP_HAVE_PAYLOAD(tf[i])){
current_payload[i] = pq->front();
A_TP_SET_PAYLOAD(tf[i]);
pq->pop_front();
if(pq->empty()) break;
}
}
}
pq_lock->unlock();
}
puts("done");
// puts("done");
*ticking = false;
}
}
for (uint32_t i = 0; i < n_threads; ++i)
for (uint8_t i = 0; i < n_threads; ++i)
tf[i] &= 0xfd;
for (uint32_t i = 0; i < n_threads; ++i)
for (uint8_t i = 0; i < n_threads; ++i)
th[i].join();
delete[] th;
@ -70,7 +85,11 @@ void ThreadPool::tick(){
ThreadPool::ThreadPool(uint32_t n_threads)
: n_threads(n_threads) {
printf("Thread pool started with %u threads;", n_threads);
if (n_threads <= 0){
n_threads = thread::hardware_concurrency();
this->n_threads = n_threads;
}
printf("Thread pool started with %u threads;\n", n_threads);
fflush(stdout);
this->terminate = false;
payload_queue = new deque<payload_t>;
@ -81,14 +100,15 @@ ThreadPool::ThreadPool(uint32_t n_threads)
thread_flags = tf;
ticking = static_cast<void*>(new atomic<bool>(false));
payload_queue_lock = new mutex();
tick_handle = new thread(&ThreadPool::tick, this);
current_payload = new payload_t[n_threads];
for (uint32_t i = 0; i < n_threads; ++i){
atomic_init(tf + i, 0b10);
th[i] = thread(&ThreadPool::daemon_proc, this, i);
}
payload_queue_lock = new mutex();
tick_handle = new thread(&ThreadPool::tick, this);
current_payload = new payload_t[n_threads];
}
@ -131,4 +151,3 @@ bool ThreadPool::busy(){
}
return true;
}

@ -0,0 +1,167 @@
#include "threading.h"
#include <thread>
#include <atomic>
#include <mutex>
#include <deque>
using namespace std;
using namespace chrono_literals;
#define A_TP_HAVE_PAYLOAD(x) ((x) & 0b1)
#define A_TP_SET_PAYLOAD(x) ((x) |= 0b1)
#define A_TP_UNSET_PAYLOAD(x) ((x) &= 0xfe)
#define A_TP_IS_RUNNING(x) ((x) & 0b10)
void ThreadPool::daemon_proc(uint32_t id){
decltype(auto) tfid = static_cast<atomic<uint8_t>*>(this->thread_flags)[id];
auto ticking = static_cast<atomic<uint16_t>*>(this->ticking);
auto& currentpayload = this->current_payload[id];
auto pq_lock = static_cast<mutex*>(payload_queue_lock);
auto pq = static_cast<deque<payload_t> *>(payload_queue);
bool idle = true;
uint32_t timer = 0;
for(; tfid; ) {
if (A_TP_HAVE_PAYLOAD(tfid)) {
//A_TP_SET_PAYLOAD(tfid);
currentpayload();
// currentpayload.empty();
A_TP_UNSET_PAYLOAD(tfid);
if (idle) {
idle = false;
*ticking -= 1;
timer = 1;
}
}
else if (!pq->empty()) {
pq_lock->lock();
if (!pq->empty()) {
pq->front()();
pq->pop_front();
}
pq_lock->unlock();
if (idle) {
idle = false;
*ticking -= 1;
timer = 1;
}
}
else if (!idle) {
idle = true;
*ticking += 1;
timer = 1;
}
else if (*ticking == n_threads ) {
if (timer > 1000000u)
this_thread::sleep_for(chrono::nanoseconds(timer/100u));
timer = timer > 4200000000u ? 4200000000u : timer*1.0000001 + 1;
}
}
}
void ThreadPool::tick(){
auto pq_lock = static_cast<mutex*>(payload_queue_lock);
auto pq = static_cast<deque<payload_t> *>(payload_queue);
auto tf = static_cast<atomic<uint8_t>*>(this->thread_flags);
auto ticking = static_cast<atomic<uint16_t>*>(this->ticking);
auto th = static_cast<thread*>(this->thread_handles);
//uint32_t threshold = (n_threads << 10u);
for(; !this->terminate; this_thread::sleep_for(50ms)){
// if(*ticking) {
// while(pq->size() > 0)
// {
// bool pqsize = false;
// for(; !pqsize; ){
// for(uint32_t i = 0; i < n_threads; ++i){
// if(!A_TP_HAVE_PAYLOAD(tf[i])){
// pq_lock->lock();
// current_payload[i] = pq->front();
// A_TP_SET_PAYLOAD(tf[i]);
// pq->pop_front();
// pqsize =! pq->size();
// pq_lock->unlock();
// if (pqsize) break;
// }
// }
// }
// }
// // puts("done");
// *ticking = false;
// }
}
for (uint32_t i = 0; i < n_threads; ++i)
tf[i] &= 0xfd;
for (uint32_t i = 0; i < n_threads; ++i)
th[i].join();
delete[] th;
delete[] tf;
delete pq;
delete pq_lock;
delete ticking;
auto cp = static_cast<payload_t*>(current_payload);
delete[] cp;
}
ThreadPool::ThreadPool(uint32_t n_threads)
: n_threads(n_threads) {
printf("Thread pool started with %u threads;\n", n_threads);
fflush(stdout);
this->terminate = false;
payload_queue = new deque<payload_t>;
auto th = new thread[n_threads];
auto tf = new atomic<uint8_t>[n_threads];
thread_handles = th;
thread_flags = tf;
ticking = static_cast<void*>(new atomic<uint16_t>(n_threads));
payload_queue_lock = new mutex();
tick_handle = new thread(&ThreadPool::tick, this);
current_payload = new payload_t[n_threads];
for (uint32_t i = 0; i < n_threads; ++i){
atomic_init(tf + i, 0b10);
th[i] = thread(&ThreadPool::daemon_proc, this, i);
}
}
void ThreadPool::enqueue_task(const payload_t& payload){
auto pq_lock = static_cast<mutex*>(payload_queue_lock);
auto pq = static_cast<deque<payload_t> *>(payload_queue);
auto tf = static_cast<atomic<uint8_t>*>(this->thread_flags);
auto& ticking = *static_cast<atomic<uint16_t>*>(this->ticking);
if (ticking > 0){
for (uint32_t i = 0; i < n_threads; ++i){
if(!A_TP_HAVE_PAYLOAD(tf[i])){
current_payload[i] = payload;
A_TP_SET_PAYLOAD(tf[i]);
return;
}
}
}
pq_lock->lock();
pq->push_back(payload);
pq_lock->unlock();
}
ThreadPool::~ThreadPool() {
this->terminate = true;
auto tick = static_cast<thread*> (tick_handle);
tick->join();
delete tick;
puts("Thread pool terminated.");
}
bool ThreadPool::busy(){
return !(*(atomic<int16_t>*)ticking == n_threads);
}

@ -49,7 +49,7 @@ namespace types {
#define ULL_Type __uint128_t
#define LL_Type __int128_t
#else
#define F_INT128
#define F_INT128(__F_)
#define ULL_Type unsigned long long
#define LL_Type long long
#endif

@ -8,7 +8,7 @@ string base62uuid(int l = 8) {
static mt19937_64 engine(chrono::system_clock::now().time_since_epoch().count());
static uniform_int_distribution<uint64_t> u(0x10000, 0xfffff);
uint64_t uuid = (u(engine) << 32ull) + (chrono::system_clock::now().time_since_epoch().count() & 0xffffffff);
printf("%llx\n", uuid);
printf("%p\n", uuid);
string ret;
while (uuid && l-- >= 0) {
ret = string("") + base62alp[uuid % 62] + ret;

@ -0,0 +1,14 @@
#!aquery
f stock.a
xexec
f moving_avg.a
xexec
f q1.sql
xexec
f joins.a
xexec
f udf3.a
xexec
exit

@ -1,19 +0,0 @@
#pragma once
#include "./server/libaquery.h"
#include "./server/aggregations.h"
auto covariances = [](auto x, auto y, auto w) {
static auto xmeans=0.0;static auto ymeans=0.0; static auto cnt=0;
auto reset = [=]() { xmeans=0.0, ymeans=0.0, cnt=0; };
auto call = [](decltype(x) x, decltype(y) y, decltype(w) w){
if((cnt < w)) {
xmeans += x;
ymeans += y;
cnt += 1;
}
y = (x - xmeans);
return avg(((x.subvec((x - w), x) - xmeans) * (y.subvec((y - w), y) - ymeans)));
};
return std::make_pair(reset, call);
};
Loading…
Cancel
Save