diff --git a/README.md b/README.md index de90bab..fbc1076 100644 --- a/README.md +++ b/README.md @@ -147,8 +147,7 @@ See files in ./tests/ for more examples. ## Execution Engines - AQuery++ supports different execution engines thanks to the decoupled compiler structure. - Hybrid Execution Engine: decouples the query into two parts. The sql-compliant part is executed by an Embedded version of Monetdb and everything else is executed by a post-process module which is generated by AQuery++ Compiler in C++ and then compiled and executed. -- AQuery Execution Engine: executes queries by compiling the query plan to C++ code. Doesn't support joins and udf functions. -- K9 Execution Engine: (discontinued). +- AQuery Library: A set of header based libraries that provide column arithmetic and operations inspired by array programming languages like kdb. This library is used by C++ post-processor code which can significantly reduce the complexity of generated code, reducing compile time while maintaining the best performance. The set of libraries can also be used by UDFs as well as User modules which makes it easier for users to write simple but powerful extensions. # Roadmap - [x] SQL Parser -> AQuery Parser (Front End) @@ -156,14 +155,21 @@ See files in ./tests/ for more examples. 
- [x] Schema and Data Model - [x] Data acquisition/output from/to csv file - [ ] Execution Engine - - [x] Projections and single-group Aggregations - - [x] Group by Aggregations - - [x] Filters - - [x] Order by - - [x] Assumption - - [x] Flatten - - [x] Join (Hybrid Engine only) - - [ ] Subqueries + - [x] Single Query + - [x] Projections and single-group Aggregations + - [x] Group by Aggregations + - [x] Filters + - [x] Order by + - [x] Assumption + - [x] Flatten + - [x] Join (Hybrid Engine only) + - [ ] Subquery + - [ ] With Clause + - [ ] From subquery + - [ ] Select subquery + - [ ] Where subquery + - [ ] Subquery in group by + - [ ] Subquery in order by - [x] Query Optimization - [x] Selection/Order by push-down - [x] Join Optimization (Only in Hybrid Engine) diff --git a/aquery_parser/sql_parser.py b/aquery_parser/sql_parser.py index 9c08db6..5308c2a 100644 --- a/aquery_parser/sql_parser.py +++ b/aquery_parser/sql_parser.py @@ -8,6 +8,7 @@ # from sre_parse import WHITESPACE + from mo_parsing.helpers import restOfLine from mo_parsing.infix import delimited_list from mo_parsing.whitespaces import NO_WHITESPACE, Whitespace @@ -655,7 +656,8 @@ def parser(literal_string, ident, sqlserver=False): ) / to_json_call load_data = ( - keyword("data") ("file_type") + Optional(keyword("complex")("complex")) + + keyword("data") ("file_type") + keyword("infile")("loc") + literal_string ("file") + INTO @@ -667,6 +669,12 @@ def parser(literal_string, ident, sqlserver=False): + keyword("by").suppress() + literal_string ("term") ) + + Optional( + keyword("element").suppress() + + keyword("terminated").suppress() + + keyword("by").suppress() + + literal_string ("ele") + ) ) module_func_def = ( diff --git a/csv.h b/csv.h index c5cb5bc..c0d1762 100644 --- a/csv.h +++ b/csv.h @@ -1,4 +1,4 @@ -// Copyright: (2012-2015) Ben Strasser +// Copyright: (2012-2015) Ben Strasser , 2022 Bill Sun // License: BSD-3 // // All rights reserved. 
@@ -49,6 +49,7 @@ #include #include #include +#include "server/vector_type.hpp" namespace io{ //////////////////////////////////////////////////////////////////////////// @@ -974,8 +975,7 @@ namespace io{ return; } x = 10*x+y; - }else - throw error::no_digit(); + } ++col; } } @@ -1005,8 +1005,7 @@ namespace io{ return; } x = 10*x-y; - }else - throw error::no_digit(); + } ++col; } return; @@ -1080,19 +1079,37 @@ namespace io{ } x *= base; } - }else{ - if(*col != '\0') - throw error::no_digit(); } if(is_neg) x = -x; } + template void parse(char*col, float&x) { parse_float(col, x); } template void parse(char*col, double&x) { parse_float(col, x); } template void parse(char*col, long double&x) { parse_float(col, x); } - + + + template + void parse_vector(char* col, vector_type& x) { + while (*col != '\0') { + char* next_col = col; + while (*next_col != sep2 && *next_col != '\0') + ++next_col; + while (*next_col == ' ' || *next_col == '\t' || + *next_col == sep2 || *next_col == '\r' || + *next_col == '\n') + ++next_col; + char _next_end = *next_col; + *next_col = '\0'; + T y; + ::io::detail::parse(col, y); + x.emplace_back(y); + col = next_col; + *next_col = _next_end; + } + } template void parse(char*col, T&x){ // Mute unused variable compiler warning @@ -1108,6 +1125,7 @@ namespace io{ } template, class quote_policy = no_quote_escape<','>, class overflow_policy = throw_on_overflow, @@ -1234,7 +1252,23 @@ namespace io{ parse_helper(r+1, cols...); } - + template + void parse_helper(std::size_t r, vector_type&t, ColType&...cols){ + if(row[r]){ + try{ + try{ + ::io::detail::parse_vector(row[r], t); + }catch(error::with_column_content&err){ + err.set_column_content(row[r]); + throw; + } + }catch(error::with_column_name&err){ + err.set_column_name(column_names[r].c_str()); + throw; + } + } + parse_helper(r+1, cols...); + } public: template bool read_row(ColType& ...cols){ @@ -1269,5 +1303,12 @@ namespace io{ } }; } + +template +using AQCSVReader = io::CSVReader, 
io::no_quote_escape, + io::ignore_overflow, io::empty_line_comment + >; + #endif diff --git a/engine/types.py b/engine/types.py index 8eac736..5a56e12 100644 --- a/engine/types.py +++ b/engine/types.py @@ -1,8 +1,9 @@ from copy import deepcopy -from engine.utils import base62uuid, defval -from aquery_config import have_hge from typing import Dict, List +from aquery_config import have_hge +from engine.utils import base62uuid, defval + type_table: Dict[str, "Types"] = {} class Types: @@ -65,10 +66,10 @@ class Types: return self.sqlname @staticmethod - def decode(aquery_type : str, vector_type:str = 'ColRef') -> "Types": - if (aquery_type.startswith('vec')): + def decode(aquery_type : str, vector_type:str = 'vector_type') -> "Types": + if (aquery_type.lower().startswith('vec')): return VectorT(Types.decode(aquery_type[3:]), vector_type) - return type_table[aquery_type] + return type_table[aquery_type.lower()] class TypeCollection: def __init__(self, sz, deftype, fptype = None, utype = None, *, collection = None) -> None: @@ -121,7 +122,7 @@ class VectorT(Types): return f'{self.vector_type}<{self.inner_type.name}>' @property def sqlname(self) -> str: - return 'BIGINT' + return 'HUGEINT' # Store vector_type into 16-byte (128-bit) integers @property def cname(self) -> str: return f'{self.vector_type}<{self.inner_type.cname}>' @@ -142,7 +143,7 @@ fp_types : Dict[str, Types] = _ty_make_dict('t.sqlname.lower()', FloatT, DoubleT temporal_types : Dict[str, Types] = _ty_make_dict('t.sqlname.lower()', DateT, TimeT, TimeStampT) builtin_types : Dict[str, Types] = { 'string' : StrT, - **_ty_make_dict('t.sqlname.lower()', AnyT, TextT, VarcharT), + **_ty_make_dict('t.sqlname.lower()', AnyT, TextT, VarcharT, HgeT), **int_types, **fp_types, **temporal_types} def get_int128_support(): @@ -365,3 +366,5 @@ user_module_func = {} builtin_operators : Dict[str, OperatorBase] = {**builtin_binary_arith, **builtin_binary_logical, **builtin_unary_arith, **builtin_unary_logical, **builtin_unary_special, 
**builtin_func, **builtin_cstdlib, **user_module_func} + +type_table = {**builtin_types, **type_table} \ No newline at end of file diff --git a/engine/utils.py b/engine/utils.py index 065f8c8..dc7f2bc 100644 --- a/engine/utils.py +++ b/engine/utils.py @@ -1,6 +1,6 @@ -from collections import OrderedDict -from collections.abc import MutableMapping, Mapping import uuid +from collections import OrderedDict +from collections.abc import Mapping, MutableMapping lower_alp = 'abcdefghijklmnopqrstuvwxyz' upper_alp = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' @@ -107,6 +107,8 @@ def defval(val, default): # escape must be readonly from typing import Mapping, Set + + def remove_last(pattern : str, string : str, escape : Set[str] = set()) -> str: idx = string.rfind(pattern) if idx == -1: @@ -126,9 +128,11 @@ class _Counter: return cnt import re + ws = re.compile(r'\s+') import os + def add_dll_dir(dll: str): import sys if sys.version_info.major >= 3 and sys.version_info.minor >7 and os.name == 'nt': diff --git a/prompt.py b/prompt.py index cd17360..c6a00dd 100644 --- a/prompt.py +++ b/prompt.py @@ -1,4 +1,5 @@ import aquery_config + help_message = '''\ ====================================================== AQUERY COMMANDLINE HELP @@ -82,31 +83,31 @@ if __name__ == '__main__': -import os -from dataclasses import dataclass +import atexit +import ctypes import enum -import time +import mmap +import os # import dbconn import re +import subprocess +import sys +import threading +import time +from dataclasses import dataclass from typing import Callable, List, Optional + +import numpy as np from mo_parsing import ParseException + import aquery_parser as parser import engine -import engine.projection import engine.ddl +import engine.projection import reconstruct as xengine -import subprocess -import mmap -import sys -from engine.utils import base62uuid -import atexit -import threading -import ctypes -import numpy as np -from engine.utils import ws -from engine.utils import add_dll_dir -from 
engine.utils import nullstream from build import build_manager +from engine.utils import add_dll_dir, base62uuid, nullstream, ws + ## CLASSES BEGIN class RunType(enum.Enum): @@ -407,7 +408,7 @@ def prompt(running = lambda:True, next = lambda:input('> '), state = None): for t in cxt.tables: lst_cols = [] for c in t.columns: - lst_cols.append(f'{c.name} : {c.type}') + lst_cols.append(f'{c.name} : {c.type.name}') print(f'{t.table_name} ({", ".join(lst_cols)})') continue elif q.startswith('help'): @@ -605,7 +606,8 @@ def prompt(running = lambda:True, next = lambda:input('> '), state = None): print("\nBye.") raise except ValueError as e: - import code, traceback + import code + import traceback __stdin = os.dup(0) raise_exception = True sh = code.InteractiveConsole({**globals(), **locals()}) diff --git a/reconstruct/__init__.py b/reconstruct/__init__.py index fd02f61..97afaba 100644 --- a/reconstruct/__init__.py +++ b/reconstruct/__init__.py @@ -1,4 +1,5 @@ from reconstruct.ast import Context, ast_node + saved_cxt = None def initialize(cxt = None, keep = False): diff --git a/reconstruct/ast.py b/reconstruct/ast.py index f81083e..90615ac 100644 --- a/reconstruct/ast.py +++ b/reconstruct/ast.py @@ -1,12 +1,13 @@ from copy import deepcopy from dataclasses import dataclass from enum import Enum, auto -from typing import Set, Tuple, Dict, Union, List, Optional +from typing import Dict, List, Optional, Set, Tuple, Union from engine.types import * -from engine.utils import enlist, base62uuid, base62alp, get_legal_name -from reconstruct.storage import Context, TableInfo, ColRef - +from engine.utils import base62alp, base62uuid, enlist, get_legal_name +from reconstruct.storage import ColRef, Context, TableInfo + + class ast_node: header = [] types = dict() @@ -70,7 +71,11 @@ class projection(ast_node): elif 'select_distinct' in node: p = node['select_distinct'] self.distinct = True - + if 'with' in node: + self.with_clause = projection(self, node['value']) + else: + 
self.with_clause = None + self.projections = p if type(p) is list else [p] if self.parent is None: self.context.sql_begin() @@ -951,6 +956,9 @@ class load(ast_node): if node['load']['file_type'] == 'module': self.produce = self.produce_module self.module = True + elif 'complex' in node['load']: + self.produce = self.produce_cpp + self.consume = lambda *_: None elif self.context.dialect == 'MonetDB': self.produce = self.produce_monetdb else: @@ -1019,7 +1027,56 @@ class load(ast_node): self.sql = f'{s1} \'{p}\' {s2} ' if 'term' in node: self.sql += f' {s3} \'{node["term"]["literal"]}\'' - + + def produce_cpp(self, node): + self.context.has_dll = True + self.context.headers.add('"csv.h"') + node = node['load'] + self.postproc_fname = 'ld_' + base62uuid(5) + self.context.postproc_begin(self.postproc_fname) + + table:TableInfo = self.context.tables_byname[node['table']] + self.sql = F"SELECT {', '.join([c.name for c in table.columns])} FROM {table.table_name};" + self.emit(self.sql+';\n') + self.context.sql_end() + length_name = 'len_' + base62uuid(6) + self.context.emitc(f'auto {length_name} = server->cnt;') + + out_typenames = [t.type.cname for t in table.columns] + outtable_col_nameslist = ', '.join([f'"{c.name}"' for c in table.columns]) + + self.outtable_col_names = 'names_' + base62uuid(4) + self.context.emitc(f'const char* {self.outtable_col_names}[] = {{{outtable_col_nameslist}}};') + + self.out_table = 'tbl_' + base62uuid(4) + self.context.emitc(f'auto {self.out_table} = new TableInfo<{",".join(out_typenames)}>("{table.table_name}", {self.outtable_col_names});') + for i, c in enumerate(table.columns): + c.cxt_name = 'c_' + base62uuid(6) + self.context.emitc(f'decltype(auto) {c.cxt_name} = {self.out_table}->get_col<{i}>();') + self.context.emitc(f'{c.cxt_name}.initfrom({length_name}, server->getCol({i}), "{table.columns[i].name}");') + csv_reader_name = 'csv_reader_' + base62uuid(6) + col_types = [c.type.cname for c in table.columns] + col_tmp_names = 
['tmp_'+base62uuid(8) for _ in range(len(table.columns))] + #col_names = ','.join([f'"{c.name}"' for c in table.columns]) + term_field = ',' if 'term' not in node else node['term']['literal'] + term_ele = ';' if 'ele' not in node else node['ele']['literal'] + self.context.emitc(f'AQCSVReader<{len(col_types)}, \'{term_field.strip()[0]}\', \'{term_ele.strip()[0]}\'> {csv_reader_name}("{node["file"]["literal"]}");') + # self.context.emitc(f'{csv_reader_name}.read_header(io::ignore_extra_column, {col_names});') + self.context.emitc(f'{csv_reader_name}.next_line();') + + for t, n in zip(col_types, col_tmp_names): + self.context.emitc(f'{t} {n};') + self.context.emitc(f'while({csv_reader_name}.read_row({",".join(col_tmp_names)})) {{ \n') + for i, c in enumerate(table.columns): + self.context.emitc(f'print({col_tmp_names[i]});') + self.context.emitc(f'{c.cxt_name}.emplace_back({col_tmp_names[i]});') + + self.context.emitc('}') + self.context.emitc(f'print(*{self.out_table});') + self.context.emitc(f'{self.out_table}->monetdb_append_table(cxt->alt_server, "{table.table_name}");') + + self.context.postproc_end(self.postproc_fname) + class outfile(ast_node): name="_outfile" def __init__(self, parent, node, context = None, *, sql = None): @@ -1121,7 +1178,7 @@ class udf(ast_node): def produce(self, node): - from engine.utils import get_legal_name, check_legal_name + from engine.utils import check_legal_name, get_legal_name node = node[self.name] # register udf self.agg = 'Agg' in node @@ -1216,7 +1273,7 @@ class udf(ast_node): def consume(self, node): - from engine.utils import get_legal_name, check_legal_name + from engine.utils import check_legal_name, get_legal_name node = node[self.name] if 'params' in node: @@ -1339,4 +1396,5 @@ def include(objs): import sys + include(sys.modules[__name__]) diff --git a/reconstruct/expr.py b/reconstruct/expr.py index 4fd483b..f1e3d5a 100644 --- a/reconstruct/expr.py +++ b/reconstruct/expr.py @@ -1,7 +1,8 @@ from typing import Optional, 
Set + +from engine.types import * from reconstruct.ast import ast_node from reconstruct.storage import ColRef, Context -from engine.types import * # TODO: Decouple expr and upgrade architecture # C_CODE : get ccode/sql code? @@ -31,6 +32,7 @@ class expr(ast_node): def __init__(self, parent, node, *, c_code = None, supress_undefined = False): from reconstruct.ast import projection, udf + # gen2 expr have multi-passes # first pass parse json into expr tree # generate target code in later passes upon need @@ -78,7 +80,7 @@ class expr(ast_node): ast_node.__init__(self, parent, node, None) def init(self, _): - from reconstruct.ast import projection, _tmp_join_union + from reconstruct.ast import _tmp_join_union, projection parent = self.parent self.is_compound = parent.is_compound if type(parent) is expr else False if type(parent) in [projection, expr, _tmp_join_union]: diff --git a/reconstruct/storage.py b/reconstruct/storage.py index d54db52..2873747 100644 --- a/reconstruct/storage.py +++ b/reconstruct/storage.py @@ -1,12 +1,14 @@ +from typing import Dict, List, Set + from engine.types import * from engine.utils import CaseInsensitiveDict, base62uuid, enlist -from typing import List, Dict, Set + class ColRef: def __init__(self, _ty, cobj, table:'TableInfo', name, id, compound = False, _ty_args = None): self.type : Types = AnyT if type(_ty) is str: - self.type = builtin_types[_ty.lower()] + self.type = Types.decode(_ty) if _ty_args: self.type = self.type(enlist(_ty_args)) elif type(_ty) is Types: @@ -17,6 +19,7 @@ class ColRef: self.alias = set() self.id = id # position in table self.compound = compound # compound field (list as a field) + self.cxt_name = '' # e.g. 
order by, group by, filter by expressions self.__arr__ = (_ty, cobj, table, name, id) diff --git a/server/vector_type.hpp b/server/vector_type.hpp index 9b03e89..f0d4cc6 100644 --- a/server/vector_type.hpp +++ b/server/vector_type.hpp @@ -159,6 +159,10 @@ public: grow(); container[size++] = _val; } + void emplace_back(_Ty& _val) { + grow(); + container[size++] = std::move(_val); + } void emplace_back(_Ty&& _val) { grow(); container[size++] = std::move(_val); diff --git a/tests/complex_data.a b/tests/complex_data.a new file mode 100644 index 0000000..e08da4b --- /dev/null +++ b/tests/complex_data.a @@ -0,0 +1,3 @@ +create table f (a float, b vecfloat, c int) +load complex data infile 'data/test_complex.csv' into table f fields terminated by ',' element terminated by ';' +select * from f \ No newline at end of file diff --git a/tests/q4.a b/tests/q4.a index 4237b16..d38a246 100644 --- a/tests/q4.a +++ b/tests/q4.a @@ -17,4 +17,10 @@ LOAD DATA INFILE "data/ticks.csv" INTO TABLE TICKS FIELDS TERMINATED BY "," SELECT max(endofdayprice/prev(endofdayprice)) as Max_Ratio FROM ticks ASSUMING ASC date -WHERE ID = "3001" \ No newline at end of file +WHERE ID = "3001" + +CREATE TABLE ticks2(ID VARCHAR(20), max REAL, min REAL) +INSERT INTO ticks2 SELECT ID AS ID, max(ratios(endofdayprice)) AS max, min(ratios(endofdayprice)) AS min from ticks group by ID; + +SELECT ID, max, min +FROM ticks2; \ No newline at end of file