From eaae297b4934322338d426b1f78af777e403e7ce Mon Sep 17 00:00:00 2001 From: Bill Date: Tue, 23 Aug 2022 21:36:13 +0800 Subject: [PATCH] WIP: sdk, bug fixes, os support regression fix --- .gitignore | 1 + aquery_config.py | 74 +++-- build_instructions.md | 3 +- engine/str.sql | 1 + engine/utils.py | 20 +- out_attempt1.cpp | 60 ---- prompt.py | 620 ++++++++++++++++++++--------------- q1.sql | 6 +- requirements.txt | 4 +- sdk/aquery.h | 5 +- sdk/aquery_mem.cpp | 24 +- server/libaquery.h | 15 +- server/mariadb_conn.h | 1 + server/monetdb_conn.cpp | 56 +++- server/monetdb_conn.h | 10 +- server/server.cpp | 46 ++- server/server.vcxproj | 2 +- server/table.cpp | 2 +- server/tests/thread_pool.hpp | 45 ++- server/threading.cpp | 65 ++-- server/threading2.cpp | 167 ++++++++++ server/types.h | 2 +- server/utils.cpp | 2 +- test.aquery | 14 + udf_style1.hpp | 19 -- 25 files changed, 811 insertions(+), 453 deletions(-) create mode 100644 engine/str.sql delete mode 100644 out_attempt1.cpp create mode 100644 server/threading2.cpp create mode 100644 test.aquery delete mode 100644 udf_style1.hpp diff --git a/.gitignore b/.gitignore index 01c62b9..f606d26 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +a.out.* *.log *.pyc *.tab diff --git a/aquery_config.py b/aquery_config.py index 26a9f51..7a29c59 100644 --- a/aquery_config.py +++ b/aquery_config.py @@ -1,37 +1,59 @@ # put environment specific configuration here -import os -# os.environ['CXX'] = 'C:/Program Files/LLVM/bin/clang.exe' -os.environ['THREADING'] = '0' - +## GLOBAL CONFIGURATION FLAGS add_path_to_ldpath = True rebuild_backend = True run_backend = True have_hge = False - os_platform = 'unkown' +cygroot = 'c:/msys64/usr/bin' +msbuildroot = 'd:/gg/vs22/MSBuild/Current/Bin' +__config_initialized__ = False + +class Build_Config: + def __init__(self) -> None: + self.OptimizationLv = '4' # [O0, O1, O2, O3, Ofast] + self.Platform = 'x64' + self.fLTO = True + self.fmarchnative = True + + def configure(self): + pass -import sys -if os.name == 'nt': - if sys.platform == 'win32': - os_platform = 'win' - elif sys.platform == 'cygwin' or sys.platform == 'msys': - os_platform = 'cygwin' -elif os.name == 'posix': - if sys.platform == 'darwin': - os_platform = 'mac' - elif 'linux' in sys.platform: - os_platform = 'linux' - elif 'bsd' in sys.platform: - os_platform = 'bsd' +def init_config(): + global __config_initialized__, os_platform +## SETUP ENVIRONMENT VARIABLES + import os + from engine.utils import add_dll_dir + # os.environ['CXX'] = 'C:/Program Files/LLVM/bin/clang.exe' + os.environ['THREADING'] = '1' + if not __config_initialized__: + import sys + if os.name == 'nt': + if sys.platform == 'win32': + os_platform = 'win' + elif sys.platform == 'cygwin' or sys.platform == 'msys': + os_platform = 'cygwin' + elif os.name == 'posix': + if sys.platform == 'darwin': + os_platform = 'mac' + elif 'linux' in sys.platform: + os_platform = 'linux' + elif 'bsd' in sys.platform: + os_platform = 'bsd' + elif sys.platform == 'cygwin' or sys.platform == 'msys': + os_platform = 'cygwin' + # deal with msys dependencies: + if os_platform == 'win': -# deal with msys dependencies: -if os_platform == 'win': - os.add_dll_directory('c:/msys64/usr/bin') - os.add_dll_directory(os.path.abspath('./msc-plugin')) - print("adding path") -else: - import readline - \ No newline at end of file + add_dll_dir(cygroot) + add_dll_dir(os.path.abspath('./msc-plugin')) + # print("adding path") + else: + import readline + if os_platform == 'cygwin': + add_dll_dir('./lib') + __config_initialized__ = True + \ No newline at end of file diff --git a/build_instructions.md b/build_instructions.md index 6d06dd6..d5d8297 100644 --- a/build_instructions.md +++ b/build_instructions.md @@ -8,8 +8,7 @@ - os.add_dll_directory(os.path.abspath('./monetdb/msvc')) - gcc-mingw (link w/ mingw monetdb, can only load under mingw python): - $(CXX) server/server.cpp server/monetdb_conn.cpp -fPIC -shared $(OS_SUPPORT) --std=c++1z -O3 -march=native -o server.so -I./monetdb/msys64 -L./lib -lmonetdbe - - os.add_dll_directory('c:/msys64/usr/bin') - - os.add_dll_directory(os.path.abspath('./lib')) + - add_dll_dir(os.path.abspath('./lib')) - msvc: - D:\gg\vs22\MSBuild\Current\Bin\msbuild "d:\gg\AQuery++\server\server.vcxproj" /p:configuration=Release /p:platform=x64 - os.add_dll_directory(os.path.abspath('./monetdb/msvc')) diff --git a/engine/str.sql b/engine/str.sql new file mode 100644 index 0000000..96b97c2 --- /dev/null +++ b/engine/str.sql @@ -0,0 +1 @@ +create table strtest(a: string, b: string); diff --git a/engine/utils.py b/engine/utils.py index 20ede54..2337765 100644 --- a/engine/utils.py +++ b/engine/utils.py @@ -70,4 +70,22 @@ def remove_last(pattern : str, string : str, escape : Set[str] = set()) -> str: return string else: return string[:idx] + string[idx+1:] - \ No newline at end of file + +class _Counter: + def __init__(self, cnt): + self.cnt = cnt + def inc(self, cnt = 1): + self.cnt += cnt + cnt = self.cnt - cnt + return cnt + +import re +ws = re.compile(r'\s+') + +def add_dll_dir(dll: str): + import sys, os + if sys.version_info.major >= 3 and sys.version_info.minor >7 and os.name == 'nt': + os.add_dll_directory(dll) + else: + os.environ['PATH'] = os.path.abspath(dll) + os.pathsep + os.environ['PATH'] + \ No newline at end of file diff --git a/out_attempt1.cpp b/out_attempt1.cpp deleted file mode 100644 index de6ffb3..0000000 --- a/out_attempt1.cpp +++ /dev/null @@ -1,60 +0,0 @@ -#include "./server/libaquery.h" -#include -#include "./server/hasher.h" -#include "csv.h" -#include "./server/aggregations.h" - - extern "C" int __DLLEXPORT__ dllmain(Context* cxt) { - using namespace std; - using namespace types; - - auto sale = new TableInfo("sale", 2); -cxt->tables.insert({"sale", sale}); -auto& sale_Month = *(ColRef *)(&sale->colrefs[0]); -auto& sale_sales = *(ColRef *)(&sale->colrefs[1]); -sale_Month.init(); -sale_sales.init(); -io::CSVReader<2> csv_reader_53ychC("moving_avg.csv"); -csv_reader_53ychC.read_header(io::ignore_extra_column, "Month","sales"); -int tmp_7ttMnHd3; -int tmp_5nHjeAtP; -while(csv_reader_53ychC.read_row(tmp_7ttMnHd3,tmp_5nHjeAtP)) { - -sale_Month.emplace_back(tmp_7ttMnHd3); -sale_sales.emplace_back(tmp_5nHjeAtP); -} -auto out_3Xio = new TableInfo,decays>("out_3Xio", 2); -cxt->tables.insert({"out_3Xio", out_3Xio}); -auto& out_3Xio_Month = *(ColRef> *)(&out_3Xio->colrefs[0]); -auto& out_3Xio_avgsw3salesales = *(ColRef> *)(&out_3Xio->colrefs[1]); -out_3Xio_Month.init(); -out_3Xio_Month = sale_Month; -out_3Xio_avgsw3salesales.init(); -out_3Xio_avgsw3salesales = avgw(3,sale_sales); -// print(*out_3Xio); -FILE* fp_4nKGhD = fopen("moving_avg_output.csv", "w"); -out_3Xio->printall(",", "\n", nullptr, fp_4nKGhD); -fclose(fp_4nKGhD); -typedef record record_type1H2vDGL; -unordered_map, transTypes> g6Mjxfk5; -for (uint32_t i7u = 0; i7u < sale_sales.size; ++i7u){ -g6Mjxfk5[forward_as_tuple(sale_sales[i7u])].emplace_back(i7u); -} -auto out_2IU2 = new TableInfo,decays>("out_2IU2", 2); -cxt->tables.insert({"out_2IU2", out_2IU2}); -auto& out_2IU2_sales = *(ColRef> *)(&out_2IU2->colrefs[0]); -auto& out_2IU2_minsw2saleMonth = *(ColRef> *)(&out_2IU2->colrefs[1]); -out_2IU2_sales.init(); -out_2IU2_minsw2saleMonth.init(); -for(auto& i5J : g6Mjxfk5) { -auto &key_4jl5toH = i5J.first; -auto &val_VJGwVwH = i5J.second; -out_2IU2_sales.emplace_back(get<0>(key_4jl5toH)); -out_2IU2_minsw2saleMonth.emplace_back(minw(2,sale_Month[val_VJGwVwH])); -} -// print(*out_2IU2); -FILE* fp_18R4fY = fopen("flatten.csv", "w"); -out_2IU2->printall(",","\n", nullptr, fp_18R4fY); -fclose(fp_18R4fY); -return 0; -} \ No newline at end of file diff --git a/prompt.py b/prompt.py index 83436cd..348b8bb 100644 --- a/prompt.py +++ b/prompt.py @@ -1,26 +1,56 @@ +import os + +from engine.ast import Context +if __name__ == '__main__': + import mimetypes + mimetypes._winreg = None + +from dataclasses import dataclass import enum -import re +from tabnanny import check import time # import dbconn +import re +from typing import Callable, List, Optional from mo_parsing import ParseException import aquery_parser as parser import engine import engine.projection import engine.ddl - import reconstruct as xengine import subprocess import mmap import sys -import os from engine.utils import base62uuid import atexit - import threading import ctypes - import aquery_config +import numpy as np +from engine.utils import ws +from engine.utils import add_dll_dir + +## GLOBALS BEGIN +nullstream = open(os.devnull, 'w') +help_message = '''\ +Run prompt.py without supplying with any arguments to run in interactive mode. + +--help, -h + print out this help message +--version, -v + returns current version of AQuery +--mode, -m Optional[threaded|IPC] + execution engine run mode: + threaded or 1 (default): run the execution engine and compiler/prompt in separate threads + IPC, standalong or 2: run the execution engine in a new process which uses shared memory to communicate with the compiler process +--script, -s [SCRIPT_FILE] + script mode: run the aquery script file +--parse-only, -p + parse only: parse the file and print out the AST +''' +## GLOBALS END +## CLASSES BEGIN class RunType(enum.Enum): Threaded = 0 IPC = 1 @@ -30,75 +60,19 @@ class Backend_Type(enum.Enum): BACKEND_MonetDB = 1 BACKEND_MariaDB = 2 -server_mode = RunType.Threaded - -server_bin = 'server.bin' if server_mode == RunType.IPC else 'server.so' - - - -nullstream = open(os.devnull, 'w') - -if aquery_config.rebuild_backend: - try: - os.remove(server_bin) - except Exception as e: - print(type(e), e) - subprocess.call(['make', "info"]) - subprocess.call(['make', server_bin], stdout=nullstream) - -cleanup = True - -def rm(): - global cleanup - if cleanup: - mm.seek(0,os.SEEK_SET) - mm.write(b'\x00\x00') - mm.flush() - - try: - time.sleep(.001) - server.kill() - time.sleep(.001) - server.terminate() - except OSError: - pass - - files = os.listdir('.') - for f in files: - if f.endswith('.shm'): - os.remove(f) - mm.close() - cleanup = False - nullstream.close() - -def init_ipc(): - global shm, server, basecmd, mm - shm = base62uuid() - if sys.platform != 'win32': - shm += '.shm' - basecmd = ['bash', '-c', 'rlwrap k'] - mm = None - if not os.path.isfile(shm): - # create initial file - with open(shm, "w+b") as handle: - handle.write(b'\x01\x00') # [running, new job] - handle.flush() - mm = mmap.mmap(handle.fileno(), 2, access=mmap.ACCESS_WRITE, offset=0) - if mm is None: - exit(1) - else: - basecmd = ['bash.exe', '-c', 'rlwrap ./k'] - mm = mmap.mmap(0, 2, shm) - mm.write(b'\x01\x00') - mm.flush() - server = subprocess.Popen(["./server.bin", shm]) - -import numpy as np - -c = lambda _ba: ctypes.cast((ctypes.c_char * len(_ba)).from_buffer(_ba), ctypes.c_char_p) - class Config: - def __init__(self, nq = 0, mode = server_mode, n_bufs = 0, bf_szs = []) -> None: + __all_attrs__ = ['running', 'new_query', 'server_mode', 'backend_type', 'has_dll', 'n_buffers'] + __init_attributes__ = False + def __init_self__(): + if not Config.__init_attributes__: + from functools import partial + for _i, attr in enumerate(Config.__all_attrs__): + if not hasattr(Config, attr): + setattr(Config, attr, property(partial(Config.getter, i = _i), partial(Config.setter, i = _i))) + __init_attributes__ = True + + def __init__(self, mode, nq = 0, n_bufs = 0, bf_szs = []) -> None: + Config.__init_self__() self.int_size = 4 self.n_attrib = 6 self.buf = bytearray((self.n_attrib + n_bufs) * self.int_size) @@ -121,29 +95,83 @@ class Config: @property def c(self): - return c(self.buf) - -def binder(cls, attr, _i): - from functools import partial - setattr(cls, attr, property(partial(cls.getter, i = _i), partial(cls.setter, i = _i))) + return ctypes.cast(\ + (ctypes.c_char * len(self.buf)).from_buffer(self.buf), ctypes.c_char_p) -binder(Config, 'running', 0) -binder(Config, 'new_query', 1) -binder(Config, 'server_mode', 2) -binder(Config, 'backend_type', 3) -binder(Config, 'has_dll', 4) -binder(Config, 'n_buffers', 5) +@dataclass +class PromptState(): + cleanup = True + th = None + send = None + test_parser = True + server_mode : RunType = RunType.Threaded + server_bin = 'server.bin' if server_mode == RunType.IPC else 'server.so' + set_ready = lambda: None + get_ready = lambda: None + server_status = lambda: False + cfg : Config = None + shm : str = '' + server : subprocess.Popen = None + basecmd : List[str] = None + mm : mmap = None + th : threading.Thread + send : Callable = lambda *_:None + init : Callable[['PromptState'], None] = lambda _:None + stmts = [''] + payloads = {} +## CLASSES END -cfg = Config() -th = None -send = None +## FUNCTIONS BEGIN +def rm(state:PromptState): + if state.cleanup: + state.mm.seek(0,os.SEEK_SET) + state.mm.write(b'\x00\x00') + state.mm.flush() + + try: + time.sleep(.001) + state.server.kill() + time.sleep(.001) + state.server.terminate() + except OSError: + pass -def init_threaded(): + files = os.listdir('.') + for f in files: + if f.endswith('.shm'): + os.remove(f) + state.mm.close() + state.cleanup = False + nullstream.close() + +def init_ipc(state: PromptState): + state.shm = base62uuid() + if sys.platform != 'win32': + state.shm += '.shm' + state.basecmd = ['bash', '-c', 'rlwrap k'] + state.mm = None + if not os.path.isfile(shm): + # create initial file + with open(shm, "w+b") as handle: + handle.write(b'\x01\x00') # [running, new job] + handle.flush() + state.mm = mmap.mmap(handle.fileno(), 2, access=mmap.ACCESS_WRITE, offset=0) + if state.mm is None: + exit(1) + else: + state.basecmd = ['bash.exe', '-c', 'rlwrap ./k'] + state.mm = mmap.mmap(0, 2, state.shm) + state.mm.write(b'\x01\x00') + state.mm.flush() + state.server = subprocess.Popen(["./server.bin", state.shm]) + +def init_threaded(state : PromptState): + state.cleanup = False if os.name == 'nt' and aquery_config.add_path_to_ldpath: t = os.environ['PATH'].lower().split(';') vars = re.compile('%.*%') - os.add_dll_directory(os.path.abspath('.')) - os.add_dll_directory(os.path.abspath('./lib')) + add_dll_dir(os.path.abspath('.')) + add_dll_dir(os.path.abspath('./lib')) for e in t: if(len(e) != 0): if '%' in e: @@ -154,7 +182,7 @@ def init_threaded(): except Exception: continue try: - os.add_dll_directory(e) + add_dll_dir(e) except Exception: continue @@ -162,184 +190,258 @@ def init_threaded(): os.environ['PATH'] = os.environ['PATH'] + os.pathsep + os.path.abspath('.') os.environ['PATH'] = os.environ['PATH'] + os.pathsep + os.path.abspath('./lib') if aquery_config.run_backend: - server_so = ctypes.CDLL('./'+server_bin) - global cfg, th, send - send = server_so['receive_args'] + server_so = ctypes.CDLL('./'+state.server_bin) + state.send = server_so['receive_args'] aquery_config.have_hge = server_so['have_hge']() if aquery_config.have_hge: from engine.types import get_int128_support get_int128_support() - th = threading.Thread(target=server_so['main'], args=(-1, ctypes.POINTER(ctypes.c_char_p)(cfg.c)), daemon=True) - th.start() + state.th = threading.Thread(target=server_so['main'], args=(-1, ctypes.POINTER(ctypes.c_char_p)(state.cfg.c)), daemon=True) + state.th.start() + +def init_prompt() -> PromptState: + aquery_config.init_config() + + state = PromptState() + if aquery_config.rebuild_backend: + try: + os.remove(state.server_bin) + except Exception as e: + print(type(e), e) + subprocess.call(['make', "info"]) + subprocess.call(['make', state.server_bin], stdout=nullstream) -if server_mode == RunType.IPC: - atexit.register(rm) - init = init_ipc - set_ready = lambda : mm.seek(0,os.SEEK_SET) or mm.write(b'\x01\x01') - def __get_ready(): - mm.seek(0,os.SEEK_SET) - return mm.read(2)[1] - get_ready = __get_ready - server_status = lambda : server.poll() is not None -else: - init = init_threaded - rm = lambda: None - def __set_ready(): - global cfg - cfg.new_query = 1 - set_ready = __set_ready - get_ready = lambda: aquery_config.run_backend and cfg.new_query - if aquery_config.run_backend: - server_status = lambda : not th.is_alive() + state.cfg = Config(state.server_mode) + + if state.server_mode == RunType.IPC: + atexit.register(lambda: rm(state)) + state.init = init_ipc + state.set_ready = lambda : state.mm.seek(0,os.SEEK_SET) or state.mm.write(b'\x01\x01') + def __get_ready(): + state.mm.seek(0,os.SEEK_SET) + return state.mm.read(2)[1] + state.get_ready = __get_ready + state.server_status = lambda : state.server.poll() is not None else: - server_status = lambda : True -init() - -test_parser = True - -# code to test parser -ws = re.compile(r'\s+') + state.init = init_threaded + rm = lambda: None + def __set_ready(): + state.cfg.new_query = 1 + state.set_ready = __set_ready + state.get_ready = lambda: aquery_config.run_backend and state.cfg.new_query + if aquery_config.run_backend: + state.server_status = lambda : not state.th.is_alive() + else: + state.server_status = lambda : True + state.init(state) + return state -q = 'SELECT p.Name, v.Name FROM Production.Product p JOIN Purchasing.ProductVendor pv ON p.ProductID = pv.ProductID JOIN Purchasing.Vendor v ON pv.BusinessEntityID = v.BusinessEntityID WHERE ProductSubcategoryID = 15 ORDER BY v.Name;' - -res = parser.parse(q) +def save(q:str, cxt: xengine.Context): + savecmd = re.split(r'[ \t]', q) + if len(savecmd) > 1: + fname = savecmd[1] + else: + tm = time.gmtime() + fname = f'{tm.tm_year}{tm.tm_mon}_{tm.tm_mday}_{tm.tm_hour}:{tm.tm_min}:{tm.tm_sec}' + if cxt: + from typing import Optional + def savefile(attr:str, desc:str, ext:Optional[str] = None): + if hasattr(cxt, attr): + attr : str = getattr(cxt, attr) + if attr: + ext = ext if ext else '.' + desc + name = fname if fname.endswith(ext) else fname + ext + with open('saves/' + name, 'wb') as cfile: + cfile.write(attr.encode('utf-8')) + print(f'saved {desc} code as {name}') + savefile('ccode', 'cpp') + savefile('udf', 'udf', '.hpp') + savefile('sql', 'sql') -payload = None -keep = True -cxt = engine.initialize() -cxt.Info(res) -while test_parser: - try: - if server_status(): - init() - while get_ready(): - time.sleep(.00001) - print("> ", end="") - q = input().lower() - if q == 'exec': # generate build and run (AQuery Engine) - cfg.backend_type = Backend_Type.BACKEND_AQuery.value - cxt = engine.exec(stmts, cxt, keep) - if subprocess.call(['make', 'snippet'], stdout = nullstream) == 0: - set_ready() - continue - - elif q == 'xexec': # generate build and run (MonetDB Engine) - cfg.backend_type = Backend_Type.BACKEND_MonetDB.value - cxt = xengine.exec(stmts, cxt, keep) - if server_mode == RunType.Threaded: - # assignment to avoid auto gc - # sqls = [s.strip() for s in cxt.sql.split(';')] - qs = [ctypes.c_char_p(bytes(q, 'utf-8')) for q in cxt.queries if len(q)] - sz = len(qs) - payload = (ctypes.c_char_p*sz)(*qs) +def main(running = lambda:True, next = input, state = None): + if state is None: + state = init_prompt() + q = '' + payload = None + keep = True + cxt = engine.initialize() + while running(): + try: + if state.server_status(): + state.init() + while state.get_ready(): + time.sleep(.00001) + print("> ", end="") + q = next().lower() + if q == 'exec': # generate build and run (AQuery Engine) + state.cfg.backend_type = Backend_Type.BACKEND_AQuery.value + cxt = engine.exec(state.stmts, cxt, keep) + if subprocess.call(['make', 'snippet'], stdout = nullstream) == 0: + state.set_ready() + continue + + elif q == 'xexec': # generate build and run (MonetDB Engine) + state.cfg.backend_type = Backend_Type.BACKEND_MonetDB.value + cxt = xengine.exec(state.stmts, cxt, keep) + if state.server_mode == RunType.Threaded: + # assignment to avoid auto gc + # sqls = [s.strip() for s in cxt.sql.split(';')] + qs = [ctypes.c_char_p(bytes(q, 'utf-8')) for q in cxt.queries if len(q)] + sz = len(qs) + payload = (ctypes.c_char_p*sz)(*qs) + state.payload = payload + try: + state.send(sz, payload) + except TypeError as e: + print(e) + if cxt.udf is not None: + with open('udf.hpp', 'wb') as outfile: + outfile.write(cxt.udf.encode('utf-8')) + + if cxt.has_dll: + with open('out.cpp', 'wb') as outfile: + outfile.write((cxt.finalize()).encode('utf-8')) + subprocess.call(['make', 'snippet'], stdout = nullstream) + state.cfg.has_dll = 1 + else: + state.cfg.has_dll = 0 + state.set_ready() + + continue + + elif q == 'dbg': + import code + from copy import deepcopy + var = {**globals(), **locals()} + sh = code.InteractiveConsole(var) try: - send(sz, payload) - except TypeError as e: + sh.interact(banner = 'debugging session began.', exitmsg = 'debugging session ended.') + except BaseException as e: + # don't care about anything happened in interactive console print(e) - if cxt.udf is not None: - with open('udf.hpp', 'wb') as outfile: - outfile.write(cxt.udf.encode('utf-8')) - - if cxt.has_dll: - with open('out.cpp', 'wb') as outfile: + continue + elif q.startswith('log'): + qs = re.split(r'[ \t]', q) + if len(qs) > 1: + cxt.log_level = qs[1] + else: + cxt.print(cxt.log_level) + continue + elif q == 'k': + subprocess.call(state.basecmd) + continue + elif q == 'print': + cxt.print(state.stmts) + continue + elif q.startswith('save'): + save(q, cxt) + continue + elif q == 'keep': + keep = not keep + continue + elif q == 'format' or q == 'fmt': + subprocess.call(['clang-format', 'out.cpp']) + elif q == 'exit': + rm(state) + exit() + elif q == 'r': # build and run + if subprocess.call(['make', 'snippet']) == 0: + state.set_ready() + continue + elif q == 'rr': # run + state.set_ready() + continue + elif q == 'script': + # TODO: script mode + pass + elif q.startswith('save2'): + filename = re.split(r'[ \t]', q) + if (len(filename) > 1): + filename = filename[1] + else: + filename = f'out_{base62uuid(4)}.cpp' + with open(filename, 'wb') as outfile: outfile.write((cxt.finalize()).encode('utf-8')) - subprocess.call(['make', 'snippet'], stdout = nullstream) - cfg.has_dll = 1 - else: - cfg.has_dll = 0 - set_ready() - - continue - - elif q == 'dbg': - import code - from copy import deepcopy - var = {**globals(), **locals()} - sh = code.InteractiveConsole(var) - try: - sh.interact(banner = 'debugging session began.', exitmsg = 'debugging session ended.') - except BaseException as e: - # don't care about anything happened in interactive console - print(e) - continue - elif q.startswith('log'): - qs = re.split(r'[ \t]', q) - if len(qs) > 1: - cxt.log_level = qs[1] - else: - cxt.print(cxt.log_level) - continue - elif q == 'k': - subprocess.call(basecmd) - continue - elif q == 'print': - cxt.print(stmts) + continue + trimed = ws.sub(' ', q.lower()).split(' ') + if trimed[0].startswith('f'): + fn = 'stock.a' if len(trimed) <= 1 or len(trimed[1]) == 0 \ + else trimed[1] + + with open(fn, 'r') as file: + contents = file.read()#.lower() + state.stmts = parser.parse(contents) + continue + state.stmts = parser.parse(q) + cxt.Info(state.stmts) + except ParseException as e: + print(e) continue - elif q.startswith('save'): - savecmd = re.split(r'[ \t]', q) - if len(savecmd) > 1: - fname = savecmd[1] - else: - tm = time.gmtime() - fname = f'{tm.tm_year}{tm.tm_mon}_{tm.tm_mday}_{tm.tm_hour}:{tm.tm_min}:{tm.tm_sec}' - if cxt: - from typing import Optional - def savefile(attr:str, desc:str, ext:Optional[str] = None): - if hasattr(cxt, attr): - attr : str = getattr(cxt, attr) - if attr: - ext = ext if ext else '.' + desc - name = fname if fname.endswith(ext) else fname + ext - with open('saves/' + name, 'wb') as cfile: - cfile.write(attr.encode('utf-8')) - print(f'saved {desc} code as {name}') - savefile('ccode', 'cpp') - savefile('udf', 'udf', '.hpp') - savefile('sql', 'sql') - - continue - elif q == 'keep': - keep = not keep - continue - elif q == 'format' or q == 'fmt': - subprocess.call(['clang-format', 'out.cpp']) - elif q == 'exit': + except (ValueError, FileNotFoundError) as e: + print(e) + except (KeyboardInterrupt): break - elif q == 'r': # build and run - if subprocess.call(['make', 'snippet']) == 0: - set_ready() - continue - elif q == 'rr': # run - set_ready() - continue - elif q.startswith('save2'): - filename = re.split(r'[ \t]', q) - if (len(filename) > 1): - filename = filename[1] - else: - filename = f'out_{base62uuid(4)}.cpp' - with open(filename, 'wb') as outfile: - outfile.write((cxt.finalize()).encode('utf-8')) - continue - trimed = ws.sub(' ', q.lower()).split(' ') - if trimed[0].startswith('f'): - fn = 'stock.a' if len(trimed) <= 1 or len(trimed[1]) == 0 \ - else trimed[1] - - with open(fn, 'r') as file: - contents = file.read()#.lower() - stmts = parser.parse(contents) - continue - stmts = parser.parse(q) - cxt.Info(stmts) - except ParseException as e: - print(e) - continue - except (ValueError, FileNotFoundError) as e: - print(e) - except (KeyboardInterrupt): - break - except: - rm() - raise -rm() + except: + import code, traceback + sh = code.InteractiveConsole({**globals(), **locals()}) + sh.interact(banner = traceback.format_exc(), exitmsg = 'debugging session ended.') + save('', cxt) + rm(state) + raise + rm(state) +## FUNCTIONS END + +## MAIN +if __name__ == '__main__': + state = None + nextcmd = '' + def check_param(param:List[str], args = False) -> bool: + global nextcmd + for p in param: + if p in sys.argv: + if args: + return True + pos = sys.argv.index(p) + if len(sys.argv) > pos + 1: + nextcmd = sys.argv[pos + 1] + return True + return False + if check_param(['-h', '--help'], True): + print(help_message) + exit() + + if len(sys.argv) == 2: + nextcmd = sys.argv[1] + if nextcmd.startswith('-'): + nextcmd = '' + + nextcmd = 'test.aquery' + if nextcmd or check_param(['-s', '--script']): + with open(nextcmd) as file: + nextcmd = file.readline() + from engine.utils import _Counter + if file.name.endswith('aquery') or nextcmd.strip() == '#!aquery': + state = init_prompt() + while(nextcmd): + while(not ws.sub('', nextcmd) or nextcmd.strip().startswith('#')): + nextcmd = file.readline() + cnt = _Counter(1) + main(lambda : cnt.inc(-1) > 0, lambda:nextcmd.strip(), state) + nextcmd = file.readline() + + if check_param(['-p', '--parse']): + with open(nextcmd, 'r') as file: + contents = file.read() + print(parser.parse(contents)) + + if check_param(['-m', '--mode', '--run-type']): + nextcmd = nextcmd.lower() + ipc_string = ['ipc', 'proc', '2', 'standalong'] + thread_string = ['thread', '1'] + if any([s in nextcmd for s in ipc_string]): + server_mode = RunType.IPC + elif any([s in nextcmd for s in thread_string]): + server_mode = RunType.Threaded + + main(state=state) + \ No newline at end of file diff --git a/q1.sql b/q1.sql index 360dc2f..0e182d4 100644 --- a/q1.sql +++ b/q1.sql @@ -1,10 +1,10 @@ -CREATE TABLE test(a INT, b INT, c INT, d INT) +CREATE TABLE testq1(a INT, b INT, c INT, d INT) LOAD DATA INFILE "test.csv" -INTO TABLE test +INTO TABLE testq1 FIELDS TERMINATED BY "," SELECT sum(c), b, d -FROM test +FROM testq1 group by a,b,d order by d DESC, b ASC diff --git a/requirements.txt b/requirements.txt index 356f166..d48c2ed 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,7 @@ mo-future mo-dots==8.20.21357 mo-parsing mo-imports -readline; sys_platform != 'win32' +dataclasses; python_version < '3.7' +readline; sys_platform == 'linux' numpy + diff --git a/sdk/aquery.h b/sdk/aquery.h index e541c82..31fe785 100644 --- a/sdk/aquery.h +++ b/sdk/aquery.h @@ -1,9 +1,10 @@ #include "../server/libaquery.h" +typedef void (*dealloctor_t) (void*); extern void* Aalloc(size_t sz); -extern int Afree(void * mem); -extern size_t register_memory(void* ptr, void(dealloc)(void*)); +extern void Afree(void * mem); +extern size_t register_memory(void* ptr, dealloctor_t deallocator); struct Session{ struct Statistic{ diff --git a/sdk/aquery_mem.cpp b/sdk/aquery_mem.cpp index ac66277..bed5949 100644 --- a/sdk/aquery_mem.cpp +++ b/sdk/aquery_mem.cpp @@ -5,29 +5,23 @@ #include Session* session; -void init_session(){ -} - -void end_session(){ - -} -void* Aalloc(size_t sz){ +void* Aalloc(size_t sz, deallocator_t deallocator){ void* mem = malloc(sz); - auto memmap = (std::unordered_map*) session->memory_map; - memmap->insert(mem); + auto memmap = (std::unordered_map*) session->memory_map; + memmap[mem] = deallocator; return mem; } -int Afree(void* mem){ - auto memmap = (std::unordered_map*) session->memory_map; +void Afree(void* mem){ + auto memmap = (std::unordered_map*) session->memory_map; + memmap[mem](mem); memmap->erase(mem); - return free(mem); } -void register_memory(void* ptr, void(dealloc)(void*)){ - auto memmap = (std::unordered_map*) session->memory_map; - memmap->insert(ptr); +void register_memory(void* ptr, deallocator_t deallocator){ + auto memmap = (std::unordered_map*) session->memory_map; + memmap[ptr] = deallocator; } diff --git a/server/libaquery.h b/server/libaquery.h index 796f140..b103fcf 100644 --- a/server/libaquery.h +++ b/server/libaquery.h @@ -21,6 +21,15 @@ struct Config{ int buffer_sizes[]; }; +struct Session{ + struct Statistic{ + size_t total_active; + size_t cnt_object; + size_t total_alloc; + }; + void* memory_map; +}; + struct Context{ typedef int (*printf_type) (const char *format, ...); std::unordered_map tables; @@ -34,6 +43,8 @@ struct Context{ void* alt_server; Log_level log_level = LOG_INFO; + Session current; + #ifdef THREADING void* thread_pool; #endif @@ -49,6 +60,8 @@ struct Context{ if (log_level <= LOG_ERROR) print(args...); } + void init_session(); + void end_session(); }; #ifdef _WIN32 @@ -58,4 +71,4 @@ struct Context{ #endif #define __AQEXPORT__(_Ty) extern "C" _Ty __DLLEXPORT__ -#endif \ No newline at end of file +#endif diff --git a/server/mariadb_conn.h b/server/mariadb_conn.h index 888b79c..170a7d8 100644 --- a/server/mariadb_conn.h +++ b/server/mariadb_conn.h @@ -9,6 +9,7 @@ struct Server{ MYSQL *server = 0; Context *cxt = 0; bool status = 0; + bool has_error = false; char* query = 0; int type = 0; diff --git a/server/monetdb_conn.cpp b/server/monetdb_conn.cpp index be0f043..0cfc911 100644 --- a/server/monetdb_conn.cpp +++ b/server/monetdb_conn.cpp @@ -1,6 +1,8 @@ #include "libaquery.h" #include #include "monetdb_conn.h" +#include "monetdbe.h" +#undef static_assert const char* monetdbe_type_str[] = { "monetdbe_bool", "monetdbe_int8_t", "monetdbe_int16_t", "monetdbe_int32_t", "monetdbe_int64_t", @@ -37,6 +39,7 @@ Server::Server(Context* cxt){ } void Server::connect(Context *cxt){ + auto server = static_cast(this->server); if (cxt){ cxt->alt_server = this; this->cxt = cxt; @@ -47,14 +50,14 @@ void Server::connect(Context *cxt){ } if (server){ - printf("Error: Server %llx already connected. Restart? (Y/n). \n", server); + printf("Error: Server %p already connected. Restart? (Y/n). \n", server); char c[50]; std::cin.getline(c, 49); for(int i = 0; i < 50; ++i){ if (!c[i] || c[i] == 'y' || c[i] == 'Y'){ monetdbe_close(*server); free(*server); - server = 0; + this->server = 0; break; } else if(c[i]&&!(c[i] == ' ' || c[i] == '\t')) @@ -64,36 +67,65 @@ void Server::connect(Context *cxt){ server = (monetdbe_database*)malloc(sizeof(monetdbe_database)); auto ret = monetdbe_open(server, nullptr, nullptr); + if (ret == 0){ + status = true; + this->server = server; + } + else{ + if(server) + free(server); + this->server = 0; + status = false; + puts(ret == -1 ? "Allocation Error." : "Internal Database Error."); + } } void Server::exec(const char* q){ - auto qresult = monetdbe_query(*server, const_cast(q), &res, &cnt); - if (res != 0) - this->cnt = res->nrows; + auto server = static_cast(this->server); + auto _res = static_cast(this->res); + monetdbe_cnt _cnt = 0; + auto qresult = monetdbe_query(*server, const_cast(q), &_res, &_cnt); + if (_res != 0){ + this->cnt = _res->nrows; + this->res = _res; + } if (qresult != nullptr){ printf("Execution Failed. %s\n", qresult); last_error = qresult; } } +bool Server::haserror(){ + if (last_error){ + last_error = 0; + return true; + } + else{ + return false; + } +} + void Server::close(){ if(this->server){ - monetdbe_close(*(this->server)); - free(this->server); + auto server = static_cast(this->server); + monetdbe_close(*(server)); + free(server); this->server = 0; } } void* Server::getCol(int col_idx){ if(res){ - res->ncols; - auto err_msg = monetdbe_result_fetch(res, &ret_col, col_idx); + auto _res = static_cast(this->res); + auto err_msg = monetdbe_result_fetch(_res, + reinterpret_cast(&ret_col), col_idx); if(err_msg == nullptr) { - cnt = ret_col->count; + auto _ret_col = static_cast(this->ret_col); + cnt = _ret_col->count; printf("Dbg: Getting col %s, type: %s\n", - ret_col->name, monetdbe_type_str[ret_col->type]); - return ret_col->data; + _ret_col->name, monetdbe_type_str[_ret_col->type]); + return _ret_col->data; } else{ printf("Error fetching result: %s\n", err_msg); diff --git a/server/monetdb_conn.h b/server/monetdb_conn.h index 4be30a0..b550476 100644 --- a/server/monetdb_conn.h +++ b/server/monetdb_conn.h @@ -1,17 +1,16 @@ -#include "monetdbe.h" struct Context; struct Server{ - monetdbe_database *server = 0; + void *server = 0; Context *cxt = 0; bool status = 0; char* query = 0; int type = 1; - monetdbe_result* res = 0; - monetdbe_column* ret_col = 0; - monetdbe_cnt cnt = 0; + void* res = 0; + void* ret_col = 0; + long long cnt = 0; char* last_error = 0; Server(Context* cxt = nullptr); @@ -19,5 +18,6 @@ struct Server{ void exec(const char* q); void *getCol(int col_idx); void close(); + bool haserror(); ~Server(); }; diff --git a/server/server.cpp b/server/server.cpp index 50a383e..522df1f 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -72,6 +72,12 @@ __AQEXPORT__(bool) have_hge(){ #endif } +void Context::init_session(){ + if (log_level == LOG_INFO){ + Session::Statistic stats; + } +} + int dll_main(int argc, char** argv, Context* cxt){ Config *cfg = reinterpret_cast(argv[0]); @@ -99,15 +105,20 @@ int dll_main(int argc, char** argv, Context* cxt){ } for(int i = 0; i < n_recv; ++i) { + printf("%s, %d\n", n_recvd[i], n_recvd[i][0] == 'Q'); if (n_recvd[i][0] == 'Q'){ server->exec(n_recvd[i] + 1); printf("Exec Q%d: %s\n", i, n_recvd[i]); } - else if (n_recvd[i][0] == 'P' && handle) { + else if (n_recvd[i][0] == 'P' && handle && !server->haserror()) { code_snippet c = reinterpret_cast(dlsym(handle, n_recvd[i]+1)); c(cxt); } } + if(handle) { + dlclose(handle); + handle = 0; + } n_recv = 0; } if(server->last_error == nullptr){ @@ -166,11 +177,11 @@ extern "C" int __DLLEXPORT__ main(int argc, char** argv) { cxt->log("running: %s\n", running? "true":"false"); cxt->log("ready: %s\n", ready? "true":"false"); void* handle = dlopen("./dll.so", RTLD_LAZY); - cxt->log("handle: %lx\n", handle); + cxt->log("handle: %p\n", handle); if (handle) { cxt->log("inner\n"); code_snippet c = reinterpret_cast(dlsym(handle, "dllmain")); - cxt->log("routine: %lx\n", c); + cxt->log("routine: %p\n", c); if (c) { cxt->log("inner\n"); cxt->err("return: %d\n", c(cxt)); @@ -195,11 +206,24 @@ int test_main() cxt->alt_server = new Server(cxt); Server* server = reinterpret_cast(cxt->alt_server); const char* qs[]= { - "CREATE TABLE tt(a INT, b INT, c INT, d INT);", - "COPY OFFSET 2 INTO tt FROM 'D:/gg/AQuery++/test.csv' ON SERVER USING DELIMITERS ',';", - "CREATE TABLE sale(Mont INT, sales INT);", - "COPY OFFSET 2 INTO sale FROM 'D:/gg/AQuery++/moving_avg.csv' ON SERVER USING DELIMITERS ',';", - "SELECT a FROM tt, sale WHERE a = Mont ;" + "CREATE TABLE stocks(timestamp INT, price INT);", + "INSERT INTO stocks VALUES(1, 15);;", + "INSERT INTO stocks VALUES(2,19); ", + "INSERT INTO stocks VALUES(3,16);", + "INSERT INTO stocks VALUES(4,17);", + "INSERT INTO stocks VALUES(5,15);", + "INSERT INTO stocks VALUES(6,13);", + "INSERT INTO stocks VALUES(7,5);", + "INSERT INTO stocks VALUES(8,8);", + "INSERT INTO stocks VALUES(9,7);", + "INSERT INTO stocks VALUES(10,13);", + "INSERT INTO stocks VALUES(11,11);", + "INSERT INTO stocks VALUES(12,14);", + "INSERT INTO stocks VALUES(13,10);", + "INSERT INTO stocks VALUES(14,5);", + "INSERT INTO stocks VALUES(15,2);", + "INSERT INTO stocks VALUES(16,5);", + "SELECT price, timestamp FROM stocks WHERE (((price - timestamp) > 1) AND (NOT ((price * timestamp) < 100))) ;", }; n_recv = sizeof(qs)/(sizeof (char*)); n_recvd = const_cast(qs); @@ -215,11 +239,11 @@ int test_main() cxt->log_level = LOG_INFO; puts(cpp_17 ?"true":"false"); void* handle = dlopen("./dll.so", RTLD_LAZY); - cxt->log("handle: %llx\n", handle); + cxt->log("handle: %p\n", handle); if (handle) { cxt->log("inner\n"); - code_snippet c = reinterpret_cast(dlsym(handle, "dllmain")); - cxt->log("routine: %llx\n", c); + code_snippet c = reinterpret_cast(dlsym(handle, "dll_6EgnKh")); + cxt->log("routine: %p\n", c); if (c) { cxt->log("inner\n"); cxt->log("return: %d\n", c(cxt)); diff --git a/server/server.vcxproj b/server/server.vcxproj index f7f02db..bc613e7 100644 --- a/server/server.vcxproj +++ b/server/server.vcxproj @@ -196,7 +196,7 @@ true true stdcpplatest - stdc17 + stdc11 $(ProjectDir)\..\monetdb\msvc diff --git a/server/table.cpp b/server/table.cpp index 2ba2f3d..0eafe73 100644 --- a/server/table.cpp +++ b/server/table.cpp @@ -25,4 +25,4 @@ std::ostream& operator<<(std::ostream& os, __uint128_t & v) print(v); return os; } -#endif \ No newline at end of file +#endif diff --git a/server/tests/thread_pool.hpp b/server/tests/thread_pool.hpp index d7b8f62..b43bd8a 100644 --- a/server/tests/thread_pool.hpp +++ b/server/tests/thread_pool.hpp @@ -1,3 +1,5 @@ +#pragma once + #include "../threading.h" #include #include @@ -5,8 +7,8 @@ using namespace std; FILE *fp; -int testing_throughput(uint32_t n_jobs){ - printf("Threadpool througput test with %u jobs.\n", n_jobs); +long long testing_throughput(uint32_t n_jobs, bool prompt = true){ + printf("Threadpool througput test with %u jobs. Press any key to start.\n", n_jobs); auto tp = ThreadPool(thread::hardware_concurrency()); getchar(); @@ -19,22 +21,26 @@ int testing_throughput(uint32_t n_jobs){ auto t = (chrono::high_resolution_clock::now() - time).count(); printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n", i, t, i/(double)(t)); //this_thread::sleep_for(2s); - return 0; + fclose(fp); + return t; } -int testing_transaction(uint32_t n_burst, uint32_t n_batch, - uint32_t base_time, uint32_t var_time){ +long long testing_transaction(uint32_t n_burst, uint32_t n_batch, + uint32_t base_time, uint32_t var_time, bool prompt = true, FILE* _fp = stdout){ printf("Threadpool transaction test: burst: %u, batch: %u, time: [%u, %u].\n" , n_burst, n_batch, base_time, var_time + base_time); - + if (prompt) { + puts("Press any key to start."); + getchar(); + } auto tp = ThreadPool(thread::hardware_concurrency()); - getchar(); + fp = _fp; auto i = 0u, j = 0u; auto time = chrono::high_resolution_clock::now(); while(j++ < n_batch){ i = 0u; while(i++ < n_burst) - tp.enqueue_task({ [](void* f) { printf( "%d ", *(int*)f); free(f); }, new int(i) }); + tp.enqueue_task({ [](void* f) { fprintf(fp, "%d ", *(int*)f); free(f); }, new int(j) }); fflush(stdout); this_thread::sleep_for(chrono::microseconds(rand()%var_time + base_time)); } @@ -42,6 +48,27 @@ int testing_transaction(uint32_t n_burst, uint32_t n_batch, while (tp.busy()) this_thread::sleep_for(1s); auto t = (chrono::high_resolution_clock::now() - time).count(); printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n", j*i, t, j*i/(double)(t)); - return 0; + return t; + +} +long long testing_destruction(bool prompt = true){ + fp = fopen("tmp.tmp", "w"); + if (prompt) { + puts("Press any key to start."); + getchar(); + } + auto time = chrono::high_resolution_clock::now(); + for(int i = 0; i < 8; ++i) + testing_transaction(0xfff, 0xff, 400, 100, false, fp); + for(int i = 0; i < 64; ++i) + testing_transaction(0xff, 0xf, 60, 20, false, fp); + for(int i = 0; i < 1024; ++i) { + auto tp = new ThreadPool(256); + delete tp; + } + return 0; + auto t = (chrono::high_resolution_clock::now() - time).count(); + fclose(fp); + return t; } diff --git a/server/threading.cpp b/server/threading.cpp index 734ae36..64f0022 100644 --- a/server/threading.cpp +++ b/server/threading.cpp @@ -18,9 +18,9 @@ void ThreadPool::daemon_proc(uint32_t id){ for(; tf[id]; this_thread::sleep_for(*ticking? 0ns:100ms)) { if (A_TP_HAVE_PAYLOAD(tf[id])) { - A_TP_SET_PAYLOAD(tf[id]); + // A_TP_SET_PAYLOAD(tf[id]); current_payload[id](); - current_payload[id].empty(); + //current_payload[id].empty(); A_TP_UNSET_PAYLOAD(tf[id]); } } @@ -32,31 +32,46 @@ void ThreadPool::tick(){ auto tf = static_cast*>(this->thread_flags); auto ticking = static_cast*>(this->ticking); auto th = static_cast(this->thread_handles); - for(; !this->terminate; this_thread::sleep_for(50ms)){ + + auto n_threads = (uint8_t) this->n_threads; + for(; !this->terminate; this_thread::sleep_for(5ms)){ if(*ticking) { - bool quit = false; - for(; !quit; ){ - for(uint32_t i = 0; i < n_threads; ++i){ - if(!A_TP_HAVE_PAYLOAD(tf[i])){ - pq_lock->lock(); - payload_t& p = pq->front(); - current_payload[i] = p; - A_TP_SET_PAYLOAD(tf[i]); - pq->pop_front(); - quit = !pq->size(); - pq_lock->unlock(); - if (quit) break; + size_t sz = pq->size(); + while(!pq->empty()){ + while(!pq->empty() && sz < (n_threads<<10)){ + for(uint8_t i = 0; i < n_threads; ++i){ + if(!A_TP_HAVE_PAYLOAD(tf[i])){ + pq_lock->lock(); + current_payload[i] = pq->front(); + A_TP_SET_PAYLOAD(tf[i]); + pq->pop_front(); + sz = pq->size(); + pq_lock->unlock(); + if (sz>=(n_threads<<10) || sz == 0) break; + } + } + } + pq_lock->lock(); + while(!pq->empty()){ + for(uint8_t i = 0; i < n_threads; ++i){ + if(!A_TP_HAVE_PAYLOAD(tf[i])){ + current_payload[i] = pq->front(); + A_TP_SET_PAYLOAD(tf[i]); + pq->pop_front(); + if(pq->empty()) break; + } } } + pq_lock->unlock(); } - puts("done"); + // puts("done"); *ticking = false; } } - for (uint32_t i = 0; i < n_threads; ++i) + for (uint8_t i = 0; i < n_threads; ++i) tf[i] &= 0xfd; - for (uint32_t i = 0; i < n_threads; ++i) + for (uint8_t i = 0; i < n_threads; ++i) th[i].join(); delete[] th; @@ -70,7 +85,11 @@ void ThreadPool::tick(){ ThreadPool::ThreadPool(uint32_t n_threads) : n_threads(n_threads) { - printf("Thread pool started with %u threads;", n_threads); + if (n_threads <= 0){ + n_threads = thread::hardware_concurrency(); + this->n_threads = n_threads; + } + printf("Thread pool started with %u threads;\n", n_threads); fflush(stdout); this->terminate = false; payload_queue = new deque; @@ -81,14 +100,15 @@ ThreadPool::ThreadPool(uint32_t n_threads) thread_flags = tf; ticking = static_cast(new atomic(false)); + payload_queue_lock = new mutex(); + tick_handle = new thread(&ThreadPool::tick, this); + current_payload = new payload_t[n_threads]; + for (uint32_t i = 0; i < n_threads; ++i){ atomic_init(tf + i, 0b10); th[i] = thread(&ThreadPool::daemon_proc, this, i); } - payload_queue_lock = new mutex(); - tick_handle = new thread(&ThreadPool::tick, this); - current_payload = new payload_t[n_threads]; } @@ -131,4 +151,3 @@ bool ThreadPool::busy(){ } return true; } - diff --git a/server/threading2.cpp b/server/threading2.cpp new file mode 100644 index 0000000..0fef15d --- /dev/null +++ b/server/threading2.cpp @@ -0,0 +1,167 @@ +#include "threading.h" +#include +#include +#include +#include + +using namespace std; +using namespace chrono_literals; + +#define A_TP_HAVE_PAYLOAD(x) ((x) & 0b1) +#define A_TP_SET_PAYLOAD(x) ((x) |= 0b1) +#define A_TP_UNSET_PAYLOAD(x) ((x) &= 0xfe) +#define A_TP_IS_RUNNING(x) ((x) & 0b10) + +void ThreadPool::daemon_proc(uint32_t id){ + decltype(auto) tfid = static_cast*>(this->thread_flags)[id]; + auto ticking = static_cast*>(this->ticking); + auto& currentpayload = this->current_payload[id]; + auto pq_lock = static_cast(payload_queue_lock); + auto pq = static_cast *>(payload_queue); + + bool idle = true; + uint32_t timer = 0; + for(; tfid; ) { + if (A_TP_HAVE_PAYLOAD(tfid)) { + //A_TP_SET_PAYLOAD(tfid); + currentpayload(); + // currentpayload.empty(); + A_TP_UNSET_PAYLOAD(tfid); + if (idle) { + idle = false; + *ticking -= 1; + timer = 1; + } + } + + else if (!pq->empty()) { + pq_lock->lock(); + if (!pq->empty()) { + pq->front()(); + pq->pop_front(); + } + pq_lock->unlock(); + if (idle) { + idle = false; + *ticking -= 1; + timer = 1; + } + } + else if (!idle) { + idle = true; + *ticking += 1; + timer = 1; + } + else if (*ticking == n_threads ) { + if (timer > 1000000u) + this_thread::sleep_for(chrono::nanoseconds(timer/100u)); + timer = timer > 4200000000u ? 4200000000u : timer*1.0000001 + 1; + } + + } +} + +void ThreadPool::tick(){ + auto pq_lock = static_cast(payload_queue_lock); + auto pq = static_cast *>(payload_queue); + auto tf = static_cast*>(this->thread_flags); + auto ticking = static_cast*>(this->ticking); + auto th = static_cast(this->thread_handles); + //uint32_t threshold = (n_threads << 10u); + for(; !this->terminate; this_thread::sleep_for(50ms)){ + // if(*ticking) { + // while(pq->size() > 0) + // { + // bool pqsize = false; + // for(; !pqsize; ){ + // for(uint32_t i = 0; i < n_threads; ++i){ + // if(!A_TP_HAVE_PAYLOAD(tf[i])){ + // pq_lock->lock(); + // current_payload[i] = pq->front(); + // A_TP_SET_PAYLOAD(tf[i]); + // pq->pop_front(); + // pqsize =! pq->size(); + // pq_lock->unlock(); + // if (pqsize) break; + // } + // } + // } + // } + // // puts("done"); + // *ticking = false; + // } + } + + for (uint32_t i = 0; i < n_threads; ++i) + tf[i] &= 0xfd; + for (uint32_t i = 0; i < n_threads; ++i) + th[i].join(); + + delete[] th; + delete[] tf; + delete pq; + delete pq_lock; + delete ticking; + auto cp = static_cast(current_payload); + delete[] cp; +} + +ThreadPool::ThreadPool(uint32_t n_threads) + : n_threads(n_threads) { + + printf("Thread pool started with %u threads;\n", n_threads); + fflush(stdout); + this->terminate = false; + payload_queue = new deque; + auto th = new thread[n_threads]; + auto tf = new atomic[n_threads]; + + thread_handles = th; + thread_flags = tf; + ticking = static_cast(new atomic(n_threads)); + + payload_queue_lock = new mutex(); + tick_handle = new thread(&ThreadPool::tick, this); + current_payload = new payload_t[n_threads]; + + for (uint32_t i = 0; i < n_threads; ++i){ + atomic_init(tf + i, 0b10); + th[i] = thread(&ThreadPool::daemon_proc, this, i); + } + +} + + +void ThreadPool::enqueue_task(const payload_t& payload){ + auto pq_lock = static_cast(payload_queue_lock); + auto pq = static_cast *>(payload_queue); + auto tf = static_cast*>(this->thread_flags); + auto& ticking = *static_cast*>(this->ticking); + + if (ticking > 0){ + for (uint32_t i = 0; i < n_threads; ++i){ + if(!A_TP_HAVE_PAYLOAD(tf[i])){ + current_payload[i] = payload; + A_TP_SET_PAYLOAD(tf[i]); + return; + } + } + } + + pq_lock->lock(); + pq->push_back(payload); + pq_lock->unlock(); +} + +ThreadPool::~ThreadPool() { + this->terminate = true; + auto tick = static_cast (tick_handle); + tick->join(); + delete tick; + puts("Thread pool terminated."); +} + +bool ThreadPool::busy(){ + return !(*(atomic*)ticking == n_threads); +} + diff --git a/server/types.h b/server/types.h index 0f5bb97..1df357f 100644 --- a/server/types.h +++ b/server/types.h @@ -49,7 +49,7 @@ namespace types { #define ULL_Type __uint128_t #define LL_Type __int128_t #else -#define F_INT128 +#define F_INT128(__F_) #define ULL_Type unsigned long long #define LL_Type long long #endif diff --git a/server/utils.cpp b/server/utils.cpp index 5ac8bc1..1766c10 100644 --- a/server/utils.cpp +++ b/server/utils.cpp @@ -8,7 +8,7 @@ string base62uuid(int l = 8) { static mt19937_64 engine(chrono::system_clock::now().time_since_epoch().count()); static uniform_int_distribution u(0x10000, 0xfffff); uint64_t uuid = (u(engine) << 32ull) + (chrono::system_clock::now().time_since_epoch().count() & 0xffffffff); - printf("%llx\n", uuid); + printf("%p\n", uuid); string ret; while (uuid && l-- >= 0) { ret = string("") + base62alp[uuid % 62] + ret; diff --git a/test.aquery b/test.aquery new file mode 100644 index 0000000..50cc7c9 --- /dev/null +++ b/test.aquery @@ -0,0 +1,14 @@ +#!aquery + +f stock.a +xexec +f moving_avg.a +xexec +f q1.sql +xexec +f joins.a +xexec +f udf3.a +xexec + +exit diff --git a/udf_style1.hpp b/udf_style1.hpp deleted file mode 100644 index 66b3975..0000000 --- a/udf_style1.hpp +++ /dev/null @@ -1,19 +0,0 @@ -#pragma once -#include "./server/libaquery.h" -#include "./server/aggregations.h" - -auto covariances = [](auto x, auto y, auto w) { - static auto xmeans=0.0;static auto ymeans=0.0; static auto cnt=0; - auto reset = [=]() { xmeans=0.0, ymeans=0.0, cnt=0; }; - auto call = [](decltype(x) x, decltype(y) y, decltype(w) w){ - if((cnt < w)) { - xmeans += x; - ymeans += y; - cnt += 1; - } - y = (x - xmeans); - return avg(((x.subvec((x - w), x) - xmeans) * (y.subvec((y - w), y) - ymeans))); - }; - return std::make_pair(reset, call); -}; -