From 7c5440c4fb2a7539d40503b089b7694c4f3cbcba Mon Sep 17 00:00:00 2001 From: Bill Date: Fri, 24 Mar 2023 04:57:25 +0800 Subject: [PATCH] make ext_engine: duckdb to work --- Makefile | 14 +++--- aquery_config.py | 4 +- build.py | 13 +++--- common/ast.py | 4 ++ common/types.py | 4 +- demo/action.cpp | 2 +- demo/putdata.cpp | 2 +- demo/query.cpp | 2 +- {reconstruct => engine}/TODO.md | 0 {reconstruct => engine}/__init__.py | 2 +- {reconstruct => engine}/ast.py | 32 ++++++++++---- {reconstruct => engine}/expr.py | 12 ++--- {reconstruct => engine}/new_expr.py | 6 +-- {reconstruct => engine}/storage.py | 16 ++++--- mem_opt.cpp | 2 +- msc-plugin/libaquery.vcxproj | 1 + prompt.py | 68 ++++++++++++++++++++++++----- server/DataSource_conn.h | 30 +++++++++---- server/duckdb_conn.cpp | 4 +- server/duckdb_conn.h | 2 +- server/libaquery.cpp | 2 + server/libaquery.h | 19 ++++++-- server/mariadb_conn.cpp | 6 +-- server/mariadb_conn.h | 4 +- server/monetdb_conn.cpp | 29 ++++++------ server/monetdb_conn.h | 30 ++++--------- server/server.cpp | 42 +++++++++++++----- server/table_ext_monetdb.hpp | 2 +- server/vector_type.hpp | 7 +-- 29 files changed, 237 insertions(+), 124 deletions(-) rename {reconstruct => engine}/TODO.md (100%) rename {reconstruct => engine}/__init__.py (91%) rename {reconstruct => engine}/ast.py (96%) rename {reconstruct => engine}/expr.py (96%) rename {reconstruct => engine}/new_expr.py (96%) rename {reconstruct => engine}/storage.py (92%) diff --git a/Makefile b/Makefile index 6b0e14f..a1b1a13 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ Defines = CC = $(CXX) -xc CXXFLAGS = --std=c++2a ifeq ($(AQ_DEBUG), 1) - OPTFLAGS = -g3 #-static-libasan -fsanitize=address + OPTFLAGS = -g3 #-static-libsan -fsanitize=address LINKFLAGS = else OPTFLAGS = -Ofast -DNDEBUG -fno-stack-protector @@ -17,12 +17,15 @@ _COMPILER = $(shell $(CXX) --version | grep -q clang && echo clang|| echo gcc) COMPILER = $(strip $(_COMPILER)) LIBTOOL = ar rcs USELIB_FLAG = -Wl,--whole-archive,libaquery.a -Wl,-no-whole-archive -LIBAQ_SRC = server/monetdb_conn.cpp server/libaquery.cpp -LIBAQ_OBJ = monetdb_conn.o libaquery.o monetdb_ext.o +LIBAQ_SRC = server/monetdb_conn.cpp server/duckdb_conn.cpp server/libaquery.cpp +LIBAQ_OBJ = monetdb_conn.o duckdb_conn.o libaquery.o monetdb_ext.o SEMANTIC_INTERPOSITION = -fno-semantic-interposition RANLIB = ranlib _LINKER_BINARY = $(shell `$(CXX) -print-prog-name=ld` -v 2>&1 | grep -q LLVM && echo lld || echo ld) LINKER_BINARY = $(strip $(_LINKER_BINARY)) +DuckDB_LIB = -Ldeps -lduckdb +DuckDB_INC = -Ideps + ifeq ($(LINKER_BINARY), ld) LINKER_FLAGS = -Wl,--allow-multiple-definition else @@ -58,6 +61,7 @@ ifeq ($(OS),Windows_NT) LIBAQ_OBJ += winhelper.o MonetDB_LIB += msc-plugin/monetdbe.dll MonetDB_INC += -Imonetdb/msvc + LIBTOOL = gcc-ar rcs ifeq ($(COMPILER), clang) FPIC = @@ -96,8 +100,8 @@ ifeq ($(AQUERY_ITC_USE_SEMPH), 1) Defines += -D__AQUERY_ITC_USE_SEMPH__ endif -CXXFLAGS += $(OPTFLAGS) $(Defines) $(MonetDB_INC) -BINARYFLAGS = $(CXXFLAGS) $(LINKFLAGS) $(MonetDB_LIB) +CXXFLAGS += $(OPTFLAGS) $(Defines) $(MonetDB_INC) $(DuckDB_INC) +BINARYFLAGS = $(CXXFLAGS) $(LINKFLAGS) $(MonetDB_LIB) $(DuckDB_LIB) SHAREDFLAGS += $(FPIC) $(BINARYFLAGS) info: diff --git a/aquery_config.py b/aquery_config.py index 87c2062..fe6b3b5 100644 --- a/aquery_config.py +++ b/aquery_config.py @@ -2,7 +2,7 @@ ## GLOBAL CONFIGURATION FLAGS -version_string = '0.7.5a' +version_string = '0.7.6a' add_path_to_ldpath = True rebuild_backend = False run_backend = True @@ -12,6 +12,8 @@ msbuildroot = '' os_platform = 'unknown' build_driver = 'Auto' compilation_output = True +compile_use_gc = True +compile_use_threading = True ## END GLOBAL CONFIGURATION FLAGS diff --git a/build.py b/build.py index d03921c..bdd81b8 100644 --- a/build.py +++ b/build.py @@ -75,14 +75,15 @@ class build_manager: sourcefiles = [ 'build.py', 'Makefile', 'server/server.cpp', 'server/libaquery.cpp', - 'server/monetdb_conn.cpp', 'server/threading.cpp', - 'server/winhelper.cpp', 'server/monetdb_ext.c' + 'server/monetdb_conn.cpp', 'server/duckdb_conn.cpp', + 'server/threading.cpp', 'server/winhelper.cpp', + 'server/monetdb_ext.c' ] headerfiles = ['server/aggregations.h', 'server/hasher.h', 'server/io.h', - 'server/libaquery.h', 'server/monetdb_conn.h', 'server/pch.hpp', - 'server/table.h', 'server/threading.h', 'server/types.h', 'server/utils.h', - 'server/winhelper.h', 'server/gc.h', 'server/vector_type.hpp', - 'server/table_ext_monetdb.hpp' + 'server/libaquery.h', 'server/monetdb_conn.h', 'server/duckdb_conn.h', + 'server/pch.hpp', 'server/table.h', 'server/threading.h', + 'server/types.h', 'server/utils.h', 'server/winhelper.h', + 'server/gc.h', 'server/vector_type.hpp', 'server/table_ext_monetdb.hpp' ] class DriverBase: diff --git a/common/ast.py b/common/ast.py index d534b2b..034a2a3 100644 --- a/common/ast.py +++ b/common/ast.py @@ -229,6 +229,7 @@ class Context: self.removing_scan = False def __init__(self): + from prompt import PromptState self.tables:list[TableInfo] = [] self.tables_byname = dict() self.ccols_byname = dict() @@ -252,6 +253,9 @@ class Context: self.ds_stack = [] self.scans = [] self.removing_scan = False + self.force_compiled = True + self.system_state: Optional[PromptState] = None + def add_table(self, table_name, cols): tbl = TableInfo(table_name, cols, self) self.tables.append(tbl) diff --git a/common/types.py b/common/types.py index 1ec4b73..0f4e022 100644 --- a/common/types.py +++ b/common/types.py @@ -31,7 +31,7 @@ class Types: self.name = name self.cname = defval(cname, name.lower() + '_t') self.sqlname = defval(sqlname, name.upper()) - self.ctype_name = defval(ctype_name, f'types::{name.upper()}') + self.ctype_name = defval(ctype_name, f'types::A{name.upper()}') self.null_value = defval(null_value, 0) self.cast_to_dict = defval(cast_to, dict()) self.cast_from_dict = defval(cast_from, dict()) @@ -102,7 +102,7 @@ LongT = Types(4, name = 'int64', sqlname = 'BIGINT', fp_type = DoubleT) BoolT = Types(0, name = 'bool', cname='bool', sqlname = 'BOOL', long_type=LongT, fp_type=FloatT) ByteT = Types(1, name = 'int8', sqlname = 'TINYINT', long_type=LongT, fp_type=FloatT) ShortT = Types(2, name = 'int16', sqlname='SMALLINT', long_type=LongT, fp_type=FloatT) -IntT = Types(3, name = 'int', cname = 'int', long_type=LongT, fp_type=FloatT) +IntT = Types(3, name = 'int', cname = 'int', long_type=LongT, ctype_name = 'types::AINT32', fp_type=FloatT) ULongT = Types(8, name = 'uint64', sqlname = 'UINT64', fp_type=DoubleT) UIntT = Types(7, name = 'uint32', sqlname = 'UINT32', long_type=ULongT, fp_type=FloatT) UShortT = Types(6, name = 'uint16', sqlname = 'UINT16', long_type=ULongT, fp_type=FloatT) diff --git a/demo/action.cpp b/demo/action.cpp index a45db2b..e4ef271 100644 --- a/demo/action.cpp +++ b/demo/action.cpp @@ -20,7 +20,7 @@ __AQEXPORT__(int) action(Context* cxt) { if (fit_inc == nullptr) fit_inc = (decltype(fit_inc))(cxt->get_module_function("fit_inc")); - auto server = static_cast(cxt->alt_server); + auto server = static_cast(cxt->alt_server); auto len = uint32_t(monetdbe_get_size(*((void**)server->server), "source")); auto x_1bN = ColRef>(len, monetdbe_get_col(*((void**)(server->server)), "source", 0)); auto y_6uX = ColRef(len, monetdbe_get_col(*((void**)(server->server)), "source", 1)); diff --git a/demo/putdata.cpp b/demo/putdata.cpp index 27403f0..d0aa622 100644 --- a/demo/putdata.cpp +++ b/demo/putdata.cpp @@ -23,7 +23,7 @@ __AQEXPORT__(int) ld(Context* cxt) { else ++cnt; char data_name[] = "data/electricity/electricity "; - auto server = static_cast(cxt->alt_server); + auto server = static_cast(cxt->alt_server); const char* names_fZrv[] = {"x", "y"}; auto tbl_6erF = new TableInfo,int64_t>("source", names_fZrv); decltype(auto) c_31ju0e = tbl_6erF->get_col<0>(); diff --git a/demo/query.cpp b/demo/query.cpp index bc91ba5..157b775 100644 --- a/demo/query.cpp +++ b/demo/query.cpp @@ -16,7 +16,7 @@ __AQEXPORT__(void) __AQ_Init_GC__(Context* cxt) { __AQEXPORT__(int) query(Context* cxt) { using namespace std; using namespace types; - auto server = static_cast(cxt->alt_server); + auto server = static_cast(cxt->alt_server); static uint32_t old_sz = 0; constexpr static uint32_t min_delta = 200; auto newsz = monetdbe_get_size(*(void**) server->server, "source"); diff --git a/reconstruct/TODO.md b/engine/TODO.md similarity index 100% rename from reconstruct/TODO.md rename to engine/TODO.md diff --git a/reconstruct/__init__.py b/engine/__init__.py similarity index 91% rename from reconstruct/__init__.py rename to engine/__init__.py index 4247989..995d1c8 100644 --- a/reconstruct/__init__.py +++ b/engine/__init__.py @@ -1,4 +1,4 @@ -from reconstruct.ast import Context, ast_node +from engine.ast import Context, ast_node saved_cxt = None diff --git a/reconstruct/ast.py b/engine/ast.py similarity index 96% rename from reconstruct/ast.py rename to engine/ast.py index 2600a84..5ebbf8d 100644 --- a/reconstruct/ast.py +++ b/engine/ast.py @@ -6,7 +6,7 @@ from typing import Dict, List, Optional, Set, Tuple, Union from common.types import * from common.utils import (base62alp, base62uuid, enlist, get_innermost, get_legal_name) -from reconstruct.storage import ColRef, Context, TableInfo +from engine.storage import ColRef, Context, TableInfo class ast_node: header = [] @@ -51,7 +51,7 @@ class ast_node: self.emit(self.sql+';\n') self.context.sql_end() -from reconstruct.expr import expr, fastscan +from engine.expr import expr, fastscan class SubqType(Enum): WITH = auto() FROM = auto() @@ -328,7 +328,7 @@ class projection(ast_node): for v, idx in self.var_table.items(): vname = get_legal_name(v) + '_' + base62uuid(3) self.pyname2cname[v] = vname - self.context.emitc(f'auto {vname} = ColRef<{typenames[idx].cname}>({length_name}, server->getCol({idx}));') + self.context.emitc(f'auto {vname} = ColRef<{typenames[idx].cname}>({length_name}, server->getCol({idx}, {typenames[idx].ctype_name}));') vid2cname[idx] = vname # Create table into context out_typenames = [None] * len(proj_map) @@ -463,7 +463,7 @@ class select_into(ast_node): raise Exception('No out_table found.') else: self.context.headers.add('"./server/table_ext_monetdb.hpp"') - self.ccode = f'{self.parent.out_table.contextname_cpp}->monetdb_append_table(cxt->alt_server, \"{node.lower()}\");' + self.ccode = f'{self.parent.out_table.contextname_cpp}->monetdb_append_table(cxt->curr_server, \"{node.lower()}\");' def produce_sql(self, node): self.context.sql = self.context.sql.replace( @@ -1252,6 +1252,7 @@ class load(ast_node): name="load" first_order = name def init(self, node): + from prompt import Backend_Type self.module = False if node['load']['file_type'] == 'module': self.produce = self.produce_module @@ -1259,8 +1260,10 @@ class load(ast_node): elif 'complex' in node['load']: self.produce = self.produce_cpp self.consume = lambda *_: None - elif self.context.dialect == 'MonetDB': + elif self.context.system_state.cfg.backend_type == Backend_Type.BACKEND_MonetDB.value: self.produce = self.produce_monetdb + elif self.context.system_state.cfg.backend_type == Backend_Type.BACKEND_DuckDB.value: + self.produce = self.produce_duckdb else: self.produce = self.produce_aq if self.parent is None: @@ -1327,7 +1330,16 @@ class load(ast_node): self.sql = f'{s1} \'{p}\' {s2} ' if 'term' in node: self.sql += f' {s3} \'{node["term"]["literal"]}\'' - + + def produce_duckdb(self, node): + node = node['load'] + s1 = f'COPY {node["table"]} FROM ' + import os + p = os.path.abspath(node['file']['literal']).replace('\\', '/') + s2 = f" DELIMITER '{node['term']['literal']}', " if 'term' in node else '' + self.sql = f'{s1} \'{p}\' ( {s2}HEADER )' + + def produce_cpp(self, node): self.context.has_dll = True self.context.headers.add('"csv.h"') @@ -1374,7 +1386,7 @@ class load(ast_node): self.context.emitc('}') # self.context.emitc(f'print(*{self.out_table});') - self.context.emitc(f'{self.out_table}->monetdb_append_table(cxt->alt_server, "{table.table_name}");') + self.context.emitc(f'{self.out_table}->monetdb_append_table(cxt->curr_server, "{table.table_name}");') self.context.postproc_end(self.postproc_fname) @@ -1424,7 +1436,11 @@ class outfile(ast_node): file_pointer = 'fp_' + base62uuid(6) self.addc(f'FILE* {file_pointer} = fopen("{filename}", "wb");') self.addc(f'{self.parent.out_table.contextname_cpp}->printall("{sep}", "\\n", nullptr, {file_pointer});') - self.addc(f'fclose({file_pointer});') + if self.context.use_gc: + self.addc(f'GC::gc_handle->reg({file_pointer}, 65536, [](void* fp){{fclose((FILE*)fp);}});') + else: + self.addc(f'fclose({file_pointer});') + self.context.ccode += self.ccode class udf(ast_node): diff --git a/reconstruct/expr.py b/engine/expr.py similarity index 96% rename from reconstruct/expr.py rename to engine/expr.py index 9b93dea..ea62d99 100644 --- a/reconstruct/expr.py +++ b/engine/expr.py @@ -1,8 +1,8 @@ from typing import Optional, Set from common.types import * -from reconstruct.ast import ast_node -from reconstruct.storage import ColRef, Context +from engine.ast import ast_node +from engine.storage import ColRef, Context # TODO: Decouple expr and upgrade architecture # C_CODE : get ccode/sql code? @@ -31,7 +31,7 @@ class expr(ast_node): return self._udf_decltypecall is not None def __init__(self, parent, node, *, c_code = None, supress_undefined = False): - from reconstruct.ast import projection, udf + from engine.ast import projection, udf # gen2 expr have multi-passes # first pass parse json into expr tree @@ -80,7 +80,7 @@ class expr(ast_node): ast_node.__init__(self, parent, node, None) def init(self, _): - from reconstruct.ast import _tmp_join_union, projection + from engine.ast import _tmp_join_union, projection parent = self.parent self.is_compound = parent.is_compound if type(parent) is expr else False if type(parent) in [projection, expr, _tmp_join_union]: @@ -96,7 +96,7 @@ class expr(ast_node): def produce(self, node): from common.utils import enlist - from reconstruct.ast import udf, projection + from engine.ast import udf, projection if type(node) is dict: if 'literal' in node: @@ -349,7 +349,7 @@ class expr(ast_node): self.sql = f'{{"CAST({node} AS DOUBLE)" if not c_code else "{node}f"}}' def finalize(self, override = False): - from reconstruct.ast import udf + from engine.ast import udf if self.codebuf is None or override: self.codebuf = '' for c in self.codlets: diff --git a/reconstruct/new_expr.py b/engine/new_expr.py similarity index 96% rename from reconstruct/new_expr.py rename to engine/new_expr.py index 17519af..fe554c1 100644 --- a/reconstruct/new_expr.py +++ b/engine/new_expr.py @@ -1,7 +1,7 @@ import abc -from reconstruct.ast import ast_node +from engine.ast import ast_node from typing import Optional -from reconstruct.storage import Context, ColRef +from engine.storage import Context, ColRef from common.utils import enlist from common.types import builtin_func, user_module_func, builtin_operators @@ -47,7 +47,7 @@ class expr_base(ast_node, metaclass = abc.ABCMeta): pass def produce(self, node): - from reconstruct.ast import udf + from engine.ast import udf if node and type(node) is dict: if 'litral' in node: self.get_literal(node['literal']) diff --git a/reconstruct/storage.py b/engine/storage.py similarity index 92% rename from reconstruct/storage.py rename to engine/storage.py index d6b1164..79113c0 100644 --- a/reconstruct/storage.py +++ b/engine/storage.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Set +from typing import Dict, List, Optional, Set from common.types import * from common.utils import CaseInsensitiveDict, base62uuid, enlist @@ -64,7 +64,7 @@ class ColRef: class TableInfo: def __init__(self, table_name, cols, cxt:'Context'): - from reconstruct.ast import create_trigger + from engine.ast import create_trigger # statics self.table_name : str = table_name self.contextname_cpp : str = '' @@ -161,8 +161,10 @@ class Context: self.has_dll = False self.triggers_active.clear() - def __init__(self): + def __init__(self, state = None): + from prompt import PromptState from .ast import create_trigger + from aquery_config import compile_use_gc self.tables_byname : Dict[str, TableInfo] = dict() self.col_byname = dict() self.tables : Set[TableInfo] = set() @@ -181,6 +183,10 @@ class Context: self.triggers : Dict[str, create_trigger] = dict() self.triggers_active = set() self.stored_proceudres = dict() + self.force_compiled = False + self.use_gc = compile_use_gc + self.system_state: Optional[PromptState] = state + # self.new() called everytime new query batch is started def get_scan_var(self): @@ -206,7 +212,7 @@ class Context: function_head = ('(Context* cxt) {\n' + '\tusing namespace std;\n' + '\tusing namespace types;\n' + - '\tauto server = static_cast(cxt->alt_server);\n') + '\tauto server = static_cast(cxt->curr_server);\n') udf_head = ('#pragma once\n' '#include \"./server/libaquery.h\"\n' @@ -265,7 +271,7 @@ class Context: 'O' + limit + sep + end) def remove_trigger(self, name : str): - from reconstruct.ast import create_trigger + from engine.ast import create_trigger val = self.triggers.pop(name, None) if val.type == create_trigger.Type.Callback: val.table.triggers.remove(val) diff --git a/mem_opt.cpp b/mem_opt.cpp index 452fedd..ce4c277 100644 --- a/mem_opt.cpp +++ b/mem_opt.cpp @@ -17,7 +17,7 @@ __AQEXPORT__(void) __AQ_Init_GC__(Context* cxt) { __AQEXPORT__(int) dll_2Cxoox(Context* cxt) { using namespace std; using namespace types; - auto server = static_cast(cxt->alt_server); + auto server = static_cast(cxt->alt_server); auto len_4ycjiV = server->cnt; auto mont_8AE = ColRef(len_4ycjiV, server->getCol(0)); auto sales_2RB = ColRef(len_4ycjiV, server->getCol(1)); diff --git a/msc-plugin/libaquery.vcxproj b/msc-plugin/libaquery.vcxproj index ee0fd5f..f2a7de2 100644 --- a/msc-plugin/libaquery.vcxproj +++ b/msc-plugin/libaquery.vcxproj @@ -345,6 +345,7 @@ + diff --git a/prompt.py b/prompt.py index 741a589..c7810d6 100644 --- a/prompt.py +++ b/prompt.py @@ -80,7 +80,6 @@ if __name__ == '__main__': if check_param(['-h', '--help'], True): print(help_message) exit() - import atexit @@ -95,7 +94,7 @@ import sys import threading import time from dataclasses import dataclass -from typing import Callable, List, Optional +from typing import Callable, List, Optional, Union import numpy as np from mo_parsing import ParseException @@ -104,10 +103,10 @@ import aquery_parser as parser import common import common.ddl import common.projection -import reconstruct as xengine +import engine as xengine from build import build_manager from common.utils import add_dll_dir, base62uuid, nullstream, ws - +from enum import auto ## CLASSES BEGIN class RunType(enum.Enum): @@ -115,9 +114,19 @@ class RunType(enum.Enum): IPC = 1 class Backend_Type(enum.Enum): - BACKEND_AQuery = 0 - BACKEND_MonetDB = 1 - BACKEND_MariaDB = 2 + BACKEND_AQuery = 0 + BACKEND_MonetDB = 1 + BACKEND_MariaDB = 2 + BACKEND_DuckDB = 3 + BACKEND_SQLite = 4 + BACKEND_TOTAL = 5 +backend_strings = { + 'aquery': Backend_Type.BACKEND_AQuery, + 'monetdb': Backend_Type.BACKEND_MonetDB, + 'mariadb': Backend_Type.BACKEND_MariaDB, + 'duckdb': Backend_Type.BACKEND_DuckDB, + 'sqlite': Backend_Type.BACKEND_SQLite, +} class StoredProcedure(ctypes.Structure): _fields_ = [ @@ -242,7 +251,7 @@ class PromptState(): set_ready = lambda: None get_ready = lambda: None server_status = lambda: False - cfg : Config = None + cfg : Optional[Config] = None shm : str = '' server : subprocess.Popen = None basecmd : List[str] = None @@ -257,6 +266,26 @@ class PromptState(): currstats : Optional[QueryStats] = None buildmgr : Optional[build_manager]= None current_procedure : Optional[str] = None + _force_compiled : bool = False + _cxt : Optional[Union[xengine.Context, common.Context]] = None + @property + def force_compiled(self): + return self._force_compiled + + @force_compiled.setter + def force_compiled(self, new_val): + self.cxt.force_compiled = new_val + self._force_compiled = new_val + + @property + def cxt(self): + return self._cxt + + @cxt.setter + def cxt(self, cxt): + cxt.force_compiled = self.force_compiled + self._cxt = cxt + self._cxt.system_state = self ## CLASSES END ## FUNCTIONS BEGIN @@ -412,7 +441,8 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr q = '' payload = None keep = True - cxt = common.initialize() + + state.cxt = cxt = xengine.initialize() parser.parse('SELECT "**** WELCOME TO AQUERY++! ****";') # state.currstats = QueryStats() @@ -442,7 +472,7 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr continue if False and q == 'exec': # generate build and run (AQuery Engine) state.cfg.backend_type = Backend_Type.BACKEND_AQuery.value - cxt = common.exec(state.stmts, cxt, keep) + state.cxt = cxt = common.exec(state.stmts, cxt, keep) if state.buildmgr.build_dll() == 0: state.set_ready() continue @@ -466,8 +496,8 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr print(prompt_help) continue elif q.startswith('xexec') or q.startswith('exec'): # generate build and run (MonetDB Engine) - state.cfg.backend_type = Backend_Type.BACKEND_MonetDB.value - cxt = xengine.exec(state.stmts, cxt, keep, parser.parse) + #state.cfg.backend_type = Backend_Type.BACKEND_MonetDB.value + state.cxt = cxt = xengine.exec(state.stmts, cxt, keep, parser.parse) this_udf = cxt.finalize_udf() if this_udf: @@ -659,6 +689,20 @@ def prompt(running = lambda:True, next = lambda:input('> '), state : Optional[Pr else: print(procedure_help) continue + elif q.startswith('force'): + splits = q.split() + if len(splits > 1) and splits[1] == 'compiled': + state.force_compiled = True + cxt.force_compiled = True + continue + elif q.startswith('backend'): + splits = q.split() + if len(splits) > 1 and splits[1] in backend_strings: + state.cfg.backend_type = backend_strings[splits[1]].value + else: + cxt.Error('Not a valid backend type.') + print('External Engine is set to', Backend_Type(state.cfg.backend_type).name) + continue trimed = ws.sub(' ', og_q).split(' ') if len(trimed) > 1 and trimed[0].lower().startswith('fi') or trimed[0].lower() == 'f': fn = 'stock.a' if len(trimed) <= 1 or len(trimed[1]) == 0 \ diff --git a/server/DataSource_conn.h b/server/DataSource_conn.h index ff66606..93ec233 100644 --- a/server/DataSource_conn.h +++ b/server/DataSource_conn.h @@ -2,24 +2,32 @@ #define __DATASOURCE_CONN_H__ struct Context; +#ifndef __AQQueryResult__ +#define __AQQueryResult__ 1 struct AQQueryResult { void* res; unsigned ref; }; -enum DataSourceType { - Invalid, - MonetDB, - MariaDB, - DuckDB, - SQLite +#endif + +#ifndef __AQBACKEND_TYPE__ +#define __AQBACKEND_TYPE__ 1 +enum Backend_Type { + BACKEND_AQuery, + BACKEND_MonetDB, + BACKEND_MariaDB, + BACKEND_DuckDB, + BACKEND_SQLite, + BACKEND_TOTAL }; +#endif struct DataSource { void* server = nullptr; Context* cxt = nullptr; bool status = false; char* query = nullptr; - DataSourceType type = Invalid; + Backend_Type DataSourceType = BACKEND_AQuery; void* res = nullptr; void* ret_col = nullptr; @@ -29,7 +37,7 @@ struct DataSource { void* handle; DataSource() = default; - explicit DataSource(Context* cxt = nullptr) = delete; + explicit DataSource(Context* cxt) = delete; virtual void connect(Context* cxt) = 0; virtual void exec(const char* q) = 0; @@ -38,6 +46,10 @@ struct DataSource { virtual void close() = 0; virtual bool haserror() = 0; // virtual void print_results(const char* sep = " ", const char* end = "\n"); - virtual ~DataSource() = 0; + virtual ~DataSource() {}; }; +// TODO: replace with super class +//typedef DataSource* (*create_server_t)(Context* cxt); +typedef void* (*create_server_t)(Context* cxt); +void* CreateNULLServer(Context*); #endif //__DATASOURCE_CONN_H__ \ No newline at end of file diff --git a/server/duckdb_conn.cpp b/server/duckdb_conn.cpp index b395340..a4eb2ce 100644 --- a/server/duckdb_conn.cpp +++ b/server/duckdb_conn.cpp @@ -11,7 +11,8 @@ void DuckdbServer::connect(Context* cxt) { static_cast(malloc(sizeof(duckdb_database))); this->handle = db_handle; bool status = duckdb_open(nullptr, db_handle); - duckdb_connection* conn_handle; + duckdb_connection* conn_handle = + static_cast(malloc(sizeof(duckdb_connection)));; status = status || duckdb_connect(*db_handle, conn_handle); this->server = conn_handle; if (status != 0) { @@ -20,6 +21,7 @@ void DuckdbServer::connect(Context* cxt) { } DuckdbServer::DuckdbServer(Context* cxt) { + this->DataSourceType = BACKEND_DuckDB; this->cxt = cxt; connect(cxt); } diff --git a/server/duckdb_conn.h b/server/duckdb_conn.h index 96f27c7..4c35ecf 100644 --- a/server/duckdb_conn.h +++ b/server/duckdb_conn.h @@ -3,7 +3,7 @@ #include "DataSource_conn.h" struct DuckdbServer : DataSource { - explicit DuckdbServer(Context* cxt = nullptr); + explicit DuckdbServer(Context* cxt); void connect(Context* cxt); void exec(const char* q); void* getCol(int col_idx, int type); diff --git a/server/libaquery.cpp b/server/libaquery.cpp index 041b7d6..858e921 100644 --- a/server/libaquery.cpp +++ b/server/libaquery.cpp @@ -633,3 +633,5 @@ get_procedure(Context* cxt, const char* name) { }; return res->second; } + +void* CreateNULLServer(Context*) { return nullptr; } diff --git a/server/libaquery.h b/server/libaquery.h index cd3ce40..7ae22a4 100644 --- a/server/libaquery.h +++ b/server/libaquery.h @@ -63,11 +63,17 @@ enum Log_level { LOG_SILENT }; +#ifndef __AQBACKEND_TYPE__ +#define __AQBACKEND_TYPE__ 1 enum Backend_Type { BACKEND_AQuery, BACKEND_MonetDB, - BACKEND_MariaDB + BACKEND_MariaDB, + BACKEND_DuckDB, + BACKEND_SQLite, + BACKEND_TOTAL }; +#endif struct QueryStats{ long long monet_time; @@ -81,10 +87,14 @@ struct Config{ int buffer_sizes[]; }; +#ifndef __AQQueryResult__ +#define __AQQueryResult__ 1 struct AQQueryResult { - void* res; - uint32_t ref; + void* res; + unsigned ref; }; +#endif + struct Session{ struct Statistic{ @@ -114,7 +124,8 @@ struct Context { int n_buffers, *sz_bufs; void **buffers; - void* alt_server = nullptr; + void* curr_server; + void* alt_server[BACKEND_TOTAL] = {nullptr}; Log_level log_level = LOG_INFO; Session current; diff --git a/server/mariadb_conn.cpp b/server/mariadb_conn.cpp index 4e226e4..1630b19 100644 --- a/server/mariadb_conn.cpp +++ b/server/mariadb_conn.cpp @@ -9,7 +9,7 @@ inline size_t my_strlen(const char* str){ return ret; } -void Server::connect( +void MariadbServer::connect( Context* cxt, const char* host, const char* user, const char* passwd, const char* db_name, const unsigned int port, const char* unix_socket, const unsigned long client_flag @@ -35,12 +35,12 @@ void Server::connect( this->status = true; } -void Server::exec(const char*q){ +void MariadbServer::exec(const char*q){ auto res = mysql_real_query(server, q, my_strlen(q)); if(res) printf("Execution Error: %d, %s\n", res, mysql_error(server)); } -void Server::close(){ +void MariadbServer::close(){ if(this->status && this->server){ mysql_close(server); server = 0; diff --git a/server/mariadb_conn.h b/server/mariadb_conn.h index a17e92b..b035486 100644 --- a/server/mariadb_conn.h +++ b/server/mariadb_conn.h @@ -5,7 +5,7 @@ #endif struct Context; -struct Server{ +struct MariadbServer{ MYSQL *server = nullptr; Context *cxt = nullptr; bool status = false; @@ -20,5 +20,5 @@ struct Server{ ); void exec(const char* q); void close(); - ~Server(); + ~MariadbServer(); }; \ No newline at end of file diff --git a/server/monetdb_conn.cpp b/server/monetdb_conn.cpp index e025b3d..2b6c3fd 100644 --- a/server/monetdb_conn.cpp +++ b/server/monetdb_conn.cpp @@ -71,16 +71,17 @@ namespace types{ }; } -Server::Server(Context* cxt){ +MonetdbServer::MonetdbServer(Context* cxt) { + this->DataSourceType = BACKEND_MonetDB; if (cxt){ connect(cxt); } } -void Server::connect(Context *cxt){ +void MonetdbServer::connect(Context *cxt){ auto server = static_cast(this->server); if (cxt){ - cxt->alt_server = this; + cxt->alt_server[DataSourceType] = this; this->cxt = cxt; } else{ @@ -89,7 +90,7 @@ void Server::connect(Context *cxt){ } if (server){ - printf("Error: Server %p already connected. Restart? (Y/n). \n", server); + printf("Error: MonetdbServer %p already connected. Restart? (Y/n). \n", server); char c[50]; std::cin.getline(c, 49); for(int i = 0; i < 50; ++i) { @@ -122,7 +123,7 @@ void Server::connect(Context *cxt){ } } -void Server::exec(const char* q){ +void MonetdbServer::exec(const char* q){ auto server = static_cast(this->server); auto _res = static_cast(this->res); monetdbe_cnt _cnt = 0; @@ -137,7 +138,7 @@ void Server::exec(const char* q){ } } -bool Server::haserror(){ +bool MonetdbServer::haserror(){ if (last_error){ puts(last_error); last_error = nullptr; @@ -149,7 +150,7 @@ bool Server::haserror(){ } -void Server::print_results(const char* sep, const char* end){ +void MonetdbServer::print_results(const char* sep, const char* end){ if (!haserror()){ auto _res = static_cast (res); @@ -190,7 +191,7 @@ void Server::print_results(const char* sep, const char* end){ } } -void Server::close(){ +void MonetdbServer::close(){ if(this->server){ auto server = static_cast(this->server); monetdbe_close(*server); @@ -199,7 +200,7 @@ void Server::close(){ } } -void* Server::getCol(int col_idx){ +void* MonetdbServer::getCol(int col_idx, int){ if(res){ auto _res = static_cast(this->res); auto err_msg = monetdbe_result_fetch(_res, @@ -224,7 +225,7 @@ void* Server::getCol(int col_idx){ #define AQ_MONETDB_FETCH(X) case monetdbe_##X: \ return (long long)((X *)(_ret_col->data))[0]; -long long Server::getFirstElement() { +long long MonetdbServer::getFirstElement() { if(!this->haserror() && res) { auto _res = static_cast(this->res); auto err_msg = monetdbe_result_fetch(_res, @@ -266,11 +267,11 @@ long long Server::getFirstElement() { return 0; } -Server::~Server(){ +MonetdbServer::~MonetdbServer(){ close(); } -bool Server::havehge() { +bool MonetdbServer::havehge() { #if defined(_MONETDBE_LIB_) and defined(HAVE_HGE) // puts("true"); return HAVE_HGE; @@ -299,7 +300,7 @@ constexpr prt_fn_t monetdbe_prtfns[] = { constexpr uint32_t output_buffer_size = 65536; void print_monetdb_results(void* _srv, const char* sep = " ", const char* end = "\n", uint32_t limit = std::numeric_limits::max()) { - auto srv = static_cast(_srv); + auto srv = static_cast(_srv); if (!srv->haserror() && srv->cnt && limit) { char buffer[output_buffer_size]; auto _res = static_cast (srv->res); @@ -360,7 +361,7 @@ cleanup: int ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){ - auto server = static_cast(cxt->alt_server); + auto server = static_cast(cxt->alt_server[BACKEND_MonetDB]); int ret = 0; bool return_from_procedure = false; void* handle = nullptr; diff --git a/server/monetdb_conn.h b/server/monetdb_conn.h index 5a6b228..f614188 100644 --- a/server/monetdb_conn.h +++ b/server/monetdb_conn.h @@ -1,31 +1,19 @@ #ifndef __MONETDB_CONN_H__ #define __MONETDB_CONN_H__ +#include "DataSource_conn.h" -struct Context; - -struct Server{ - void *server = nullptr; - Context *cxt = nullptr; - bool status = false; - char* query = nullptr; - int type = 1; - - void* res = nullptr; - void* ret_col = nullptr; - long long cnt = 0; - char* last_error = nullptr; - - explicit Server(Context* cxt = nullptr); - void connect(Context* cxt); - void exec(const char* q); - void *getCol(int col_idx); +struct MonetdbServer : DataSource { + explicit MonetdbServer(Context* cxt); + void connect(Context* cxt) override; + void exec(const char* q) override; + void *getCol(int col_idx, int) override; long long getFirstElement(); - void close(); - bool haserror(); + void close() override; + bool haserror() override; static bool havehge(); void print_results(const char* sep = " ", const char* end = "\n"); friend void print_monetdb_results(void* _srv, const char* sep, const char* end, int limit); - ~Server(); + ~MonetdbServer() override; }; struct monetdbe_table_data{ diff --git a/server/server.cpp b/server/server.cpp index 608f95b..a8868ba 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -7,6 +7,16 @@ #include "libaquery.h" #include "monetdb_conn.h" +#include "duckdb_conn.h" + +constexpr create_server_t get_server[] = { + CreateNULLServer, + [](Context* cxt) -> void*{ return new MonetdbServer(cxt); }, + CreateNULLServer, + [](Context* cxt) -> void*{ return new DuckdbServer(cxt); }, + CreateNULLServer, +}; + #pragma region misc #ifdef THREADING #include "threading.h" @@ -89,7 +99,7 @@ extern "C" int __DLLEXPORT__ binary_info() { __AQEXPORT__(bool) have_hge() { #if defined(__MONETDB_CONN_H__) - return Server::havehge(); + return MonetdbServer::havehge(); #else return false; #endif @@ -205,13 +215,20 @@ int dll_main(int argc, char** argv, Context* cxt){ cxt->cfg = cfg; cxt->n_buffers = cfg->n_buffers; cxt->sz_bufs = buf_szs; - if (cfg->backend_type == BACKEND_MonetDB && cxt->alt_server == nullptr) - { - auto alt_server = new Server(cxt); - alt_server->exec("SELECT '**** WELCOME TO AQUERY++! ****';"); - puts(*(const char**)(alt_server->getCol(0))); - cxt->alt_server = alt_server; - } + + + const auto& update_backend = [&cxt, &cfg](){ + auto& curr_server = cxt->alt_server[cfg->backend_type]; + if (curr_server == nullptr) { + curr_server = get_server[cfg->backend_type](cxt); + cxt->alt_server[cfg->backend_type] = curr_server; + static_cast(curr_server)->exec("SELECT '**** WELCOME TO AQUERY++! ****';"); + puts(*(const char**)(static_cast(curr_server)->getCol(0, types::Types::getType()))); + } + cxt->curr_server = curr_server; + }; + update_backend(); + while(cfg->running){ ENGINE_ACQUIRE(); if (cfg->new_query) { @@ -221,10 +238,11 @@ start: void *handle = nullptr; void *user_module_handle = nullptr; - if (cfg->backend_type == BACKEND_MonetDB){ - if (cxt->alt_server == nullptr) - cxt->alt_server = new Server(cxt); - Server* server = reinterpret_cast(cxt->alt_server); + if (cfg->backend_type == BACKEND_MonetDB|| + cfg->backend_type == BACKEND_DuckDB + ) { + update_backend(); + auto server = reinterpret_cast(cxt->curr_server); if(n_recv > 0){ if (cfg->backend_type == BACKEND_AQuery || cfg->has_dll) { const char* proc_name = "./dll.so"; diff --git a/server/table_ext_monetdb.hpp b/server/table_ext_monetdb.hpp index f688f08..a2d3a62 100644 --- a/server/table_ext_monetdb.hpp +++ b/server/table_ext_monetdb.hpp @@ -67,7 +67,7 @@ void TableInfo::monetdb_append_table(void* srv, const char* alt_name) { auto last_comma = create_table_str.find_last_of(','); if (last_comma != static_cast(-1)) { create_table_str[last_comma] = ')'; - Server* server = (Server*)srv; + MonetdbServer* server = (MonetdbServer*)srv; // puts("create table..."); // puts(create_table_str.c_str()); server->exec(create_table_str.c_str()); diff --git a/server/vector_type.hpp b/server/vector_type.hpp index c6e2f47..dd5f8b5 100644 --- a/server/vector_type.hpp +++ b/server/vector_type.hpp @@ -169,7 +169,7 @@ public: return distinct_copy(); } // TODO: think of situations where this is a temp!! (copy on write!!!) - template + template inline void grow(uint32_t sz = 0) { if constexpr (_grow) sz = this->size; @@ -192,6 +192,8 @@ public: n_container = (_Ty*)malloc(new_capacity * sizeof(_Ty)); memcpy(n_container, container, sizeof(_Ty) * size); } + if constexpr(_resize) + size = sz; memset(n_container + size, 0, sizeof(_Ty) * (new_capacity - size)); // if (capacity) // free(container); @@ -200,8 +202,7 @@ public: } } inline void resize(const uint32_t sz){ - size = sz; - grow(sz); + grow(sz); } inline void reserve(const uint32_t sz){ grow(sz);