read complex data from csv

dev
Bill 2 years ago
parent 818ab3b2e5
commit 6478deb7da

@ -147,8 +147,7 @@ See files in ./tests/ for more examples.
## Execution Engines ## Execution Engines
- AQuery++ supports different execution engines thanks to the decoupled compiler structure. - AQuery++ supports different execution engines thanks to the decoupled compiler structure.
- Hybrid Execution Engine: decouples the query into two parts. The sql-compliant part is executed by an Embedded version of Monetdb and everything else is executed by a post-process module which is generated by AQuery++ Compiler in C++ and then compiled and executed. - Hybrid Execution Engine: decouples the query into two parts. The sql-compliant part is executed by an Embedded version of Monetdb and everything else is executed by a post-process module which is generated by AQuery++ Compiler in C++ and then compiled and executed.
- AQuery Execution Engine: executes queries by compiling the query plan to C++ code. Doesn't support joins and udf functions. - AQuery Library: A set of header based libraries that provide column arithmetic and operations inspired by array programming languages like kdb. This library is used by C++ post-processor code which can significantly reduce the complexity of generated code, reducing compile time while maintaining the best performance. The set of libraries can also be used by UDFs as well as User modules which makes it easier for users to write simple but powerful extensions.
- K9 Execution Engine: (discontinued).
# Roadmap # Roadmap
- [x] SQL Parser -> AQuery Parser (Front End) - [x] SQL Parser -> AQuery Parser (Front End)
@ -156,14 +155,21 @@ See files in ./tests/ for more examples.
- [x] Schema and Data Model - [x] Schema and Data Model
- [x] Data acquisition/output from/to csv file - [x] Data acquisition/output from/to csv file
- [ ] Execution Engine - [ ] Execution Engine
- [x] Projections and single-group Aggregations - [x] Single Query
- [x] Group by Aggregations - [x] Projections and single-group Aggregations
- [x] Filters - [x] Group by Aggregations
- [x] Order by - [x] Filters
- [x] Assumption - [x] Order by
- [x] Flatten - [x] Assumption
- [x] Join (Hybrid Engine only) - [x] Flatten
- [ ] Subqueries - [x] Join (Hybrid Engine only)
- [ ] Subquery
- [ ] With Clause
- [ ] From subquery
- [ ] Select subquery
- [ ] Where subquery
- [ ] Subquery in group by
- [ ] Subquery in order by
- [x] Query Optimization - [x] Query Optimization
- [x] Selection/Order by push-down - [x] Selection/Order by push-down
- [x] Join Optimization (Only in Hybrid Engine) - [x] Join Optimization (Only in Hybrid Engine)

@ -8,6 +8,7 @@
# #
from sre_parse import WHITESPACE from sre_parse import WHITESPACE
from mo_parsing.helpers import restOfLine from mo_parsing.helpers import restOfLine
from mo_parsing.infix import delimited_list from mo_parsing.infix import delimited_list
from mo_parsing.whitespaces import NO_WHITESPACE, Whitespace from mo_parsing.whitespaces import NO_WHITESPACE, Whitespace
@ -655,7 +656,8 @@ def parser(literal_string, ident, sqlserver=False):
) / to_json_call ) / to_json_call
load_data = ( load_data = (
keyword("data") ("file_type") Optional(keyword("complex")("complex"))
+ keyword("data") ("file_type")
+ keyword("infile")("loc") + keyword("infile")("loc")
+ literal_string ("file") + literal_string ("file")
+ INTO + INTO
@ -667,6 +669,12 @@ def parser(literal_string, ident, sqlserver=False):
+ keyword("by").suppress() + keyword("by").suppress()
+ literal_string ("term") + literal_string ("term")
) )
+ Optional(
keyword("element").suppress()
+ keyword("terminated").suppress()
+ keyword("by").suppress()
+ literal_string ("ele")
)
) )
module_func_def = ( module_func_def = (

59
csv.h

@ -1,4 +1,4 @@
// Copyright: (2012-2015) Ben Strasser <code@ben-strasser.net> // Copyright: (2012-2015) Ben Strasser <code@ben-strasser.net>, 2022 Bill Sun
// License: BSD-3 // License: BSD-3
// //
// All rights reserved. // All rights reserved.
@ -49,6 +49,7 @@
#include <cerrno> #include <cerrno>
#include <istream> #include <istream>
#include <limits> #include <limits>
#include "server/vector_type.hpp"
namespace io{ namespace io{
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
@ -974,8 +975,7 @@ namespace io{
return; return;
} }
x = 10*x+y; x = 10*x+y;
}else }
throw error::no_digit();
++col; ++col;
} }
} }
@ -1005,8 +1005,7 @@ namespace io{
return; return;
} }
x = 10*x-y; x = 10*x-y;
}else }
throw error::no_digit();
++col; ++col;
} }
return; return;
@ -1080,19 +1079,37 @@ namespace io{
} }
x *= base; x *= base;
} }
}else{
if(*col != '\0')
throw error::no_digit();
} }
if(is_neg) if(is_neg)
x = -x; x = -x;
} }
template<class overflow_policy> void parse(char*col, float&x) { parse_float(col, x); } template<class overflow_policy> void parse(char*col, float&x) { parse_float(col, x); }
template<class overflow_policy> void parse(char*col, double&x) { parse_float(col, x); } template<class overflow_policy> void parse(char*col, double&x) { parse_float(col, x); }
template<class overflow_policy> void parse(char*col, long double&x) { parse_float(col, x); } template<class overflow_policy> void parse(char*col, long double&x) { parse_float(col, x); }
// Parse a list-valued csv cell (elements separated by sep2, e.g. "1;2;3")
// into a vector_type<T>.  Each element is handed to the scalar
// ::io::detail::parse<overflow_policy>() routine.
//
// Fix: the original code wrote the temporary '\0' terminator *after*
// skipping the separator and any following whitespace, so the substring
// given to the scalar parser still contained the separator (e.g. "1; "
// instead of "1").  The terminator is now placed exactly at the element
// boundary; the separator and surrounding whitespace are skipped when
// advancing to the next element.
template<class overflow_policy, class T, char sep2 = ';'>
void parse_vector(char* col, vector_type<T>& x) {
        while (*col != '\0') {
                // find the end of the current element
                char* elem_end = col;
                while (*elem_end != sep2 && *elem_end != '\0')
                        ++elem_end;
                // find the start of the next element:
                // skip the separator and any inter-element whitespace
                char* next_col = elem_end;
                while (*next_col == sep2 || *next_col == ' ' ||
                       *next_col == '\t' || *next_col == '\r' ||
                       *next_col == '\n')
                        ++next_col;
                // temporarily terminate the element so the scalar parser sees
                // only this element's characters, then restore the byte.
                char saved = *elem_end;
                *elem_end = '\0';
                T y;
                ::io::detail::parse<overflow_policy>(col, y);
                x.emplace_back(y);
                *elem_end = saved;
                col = next_col;
        }
}
template<class overflow_policy, class T> template<class overflow_policy, class T>
void parse(char*col, T&x){ void parse(char*col, T&x){
// Mute unused variable compiler warning // Mute unused variable compiler warning
@ -1108,6 +1125,7 @@ namespace io{
} }
template<unsigned column_count, template<unsigned column_count,
char sep2 = -2,
class trim_policy = trim_chars<' ', '\t'>, class trim_policy = trim_chars<' ', '\t'>,
class quote_policy = no_quote_escape<','>, class quote_policy = no_quote_escape<','>,
class overflow_policy = throw_on_overflow, class overflow_policy = throw_on_overflow,
@ -1234,7 +1252,23 @@ namespace io{
parse_helper(r+1, cols...); parse_helper(r+1, cols...);
} }
// Recursive column-unpacking step of read_row() for a vector-valued
// ("complex") column: parses cell r of the current row into the
// vector_type<T> output, then recurses on the remaining columns.
// Mirrors the scalar parse_helper overload above: errors thrown by the
// element parser are annotated with the offending column's content and
// name before being rethrown to the caller.
template<class T, class ...ColType>
void parse_helper(std::size_t r, vector_type<T>&t, ColType&...cols){
        if(row[r]){
                try{
                        try{
                                // sep2 (class template parameter) separates the vector elements
                                ::io::detail::parse_vector<overflow_policy, T, sep2>(row[r], t);
                        }catch(error::with_column_content&err){
                                err.set_column_content(row[r]);
                                throw;
                        }
                }catch(error::with_column_name&err){
                        err.set_column_name(column_names[r].c_str());
                        throw;
                }
        }
        // continue with the next column of this row
        parse_helper(r+1, cols...);
}
public: public:
template<class ...ColType> template<class ...ColType>
bool read_row(ColType& ...cols){ bool read_row(ColType& ...cols){
@ -1269,5 +1303,12 @@ namespace io{
} }
}; };
} }
// Convenience alias used by AQuery's generated loader code:
// a CSVReader where sep1 is the csv field separator (via the
// no_quote_escape policy) and sep2 separates the elements inside
// vector-valued ("complex") cells.  (char)32 / (char)9 are space and
// tab, trimmed around each field; numeric overflow is ignored and
// empty lines are treated as comments.
template <unsigned column_count, char sep1 = ',', char sep2 = ';'>
using AQCSVReader = io::CSVReader<column_count, sep2,
        io::trim_chars<(char)32, (char)9>, io::no_quote_escape<sep1>,
        io::ignore_overflow, io::empty_line_comment
>;
#endif #endif

@ -1,8 +1,9 @@
from copy import deepcopy from copy import deepcopy
from engine.utils import base62uuid, defval
from aquery_config import have_hge
from typing import Dict, List from typing import Dict, List
from aquery_config import have_hge
from engine.utils import base62uuid, defval
type_table: Dict[str, "Types"] = {} type_table: Dict[str, "Types"] = {}
class Types: class Types:
@ -65,10 +66,10 @@ class Types:
return self.sqlname return self.sqlname
@staticmethod @staticmethod
def decode(aquery_type : str, vector_type:str = 'ColRef') -> "Types": def decode(aquery_type : str, vector_type:str = 'vector_type') -> "Types":
if (aquery_type.startswith('vec')): if (aquery_type.lower().startswith('vec')):
return VectorT(Types.decode(aquery_type[3:]), vector_type) return VectorT(Types.decode(aquery_type[3:]), vector_type)
return type_table[aquery_type] return type_table[aquery_type.lower()]
class TypeCollection: class TypeCollection:
def __init__(self, sz, deftype, fptype = None, utype = None, *, collection = None) -> None: def __init__(self, sz, deftype, fptype = None, utype = None, *, collection = None) -> None:
@ -121,7 +122,7 @@ class VectorT(Types):
return f'{self.vector_type}<{self.inner_type.name}>' return f'{self.vector_type}<{self.inner_type.name}>'
@property @property
def sqlname(self) -> str: def sqlname(self) -> str:
return 'BIGINT' return 'HUGEINT' # Store vector_type into 128-bit integers
@property @property
def cname(self) -> str: def cname(self) -> str:
return f'{self.vector_type}<{self.inner_type.cname}>' return f'{self.vector_type}<{self.inner_type.cname}>'
@ -142,7 +143,7 @@ fp_types : Dict[str, Types] = _ty_make_dict('t.sqlname.lower()', FloatT, DoubleT
temporal_types : Dict[str, Types] = _ty_make_dict('t.sqlname.lower()', DateT, TimeT, TimeStampT) temporal_types : Dict[str, Types] = _ty_make_dict('t.sqlname.lower()', DateT, TimeT, TimeStampT)
builtin_types : Dict[str, Types] = { builtin_types : Dict[str, Types] = {
'string' : StrT, 'string' : StrT,
**_ty_make_dict('t.sqlname.lower()', AnyT, TextT, VarcharT), **_ty_make_dict('t.sqlname.lower()', AnyT, TextT, VarcharT, HgeT),
**int_types, **fp_types, **temporal_types} **int_types, **fp_types, **temporal_types}
def get_int128_support(): def get_int128_support():
@ -365,3 +366,5 @@ user_module_func = {}
builtin_operators : Dict[str, OperatorBase] = {**builtin_binary_arith, **builtin_binary_logical, builtin_operators : Dict[str, OperatorBase] = {**builtin_binary_arith, **builtin_binary_logical,
**builtin_unary_arith, **builtin_unary_logical, **builtin_unary_special, **builtin_func, **builtin_cstdlib, **builtin_unary_arith, **builtin_unary_logical, **builtin_unary_special, **builtin_func, **builtin_cstdlib,
**user_module_func} **user_module_func}
type_table = {**builtin_types, **type_table}

@ -1,6 +1,6 @@
from collections import OrderedDict
from collections.abc import MutableMapping, Mapping
import uuid import uuid
from collections import OrderedDict
from collections.abc import Mapping, MutableMapping
lower_alp = 'abcdefghijklmnopqrstuvwxyz' lower_alp = 'abcdefghijklmnopqrstuvwxyz'
upper_alp = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' upper_alp = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
@ -107,6 +107,8 @@ def defval(val, default):
# escape must be readonly # escape must be readonly
from typing import Mapping, Set from typing import Mapping, Set
def remove_last(pattern : str, string : str, escape : Set[str] = set()) -> str: def remove_last(pattern : str, string : str, escape : Set[str] = set()) -> str:
idx = string.rfind(pattern) idx = string.rfind(pattern)
if idx == -1: if idx == -1:
@ -126,9 +128,11 @@ class _Counter:
return cnt return cnt
import re import re
ws = re.compile(r'\s+') ws = re.compile(r'\s+')
import os import os
def add_dll_dir(dll: str): def add_dll_dir(dll: str):
import sys import sys
if sys.version_info.major >= 3 and sys.version_info.minor >7 and os.name == 'nt': if sys.version_info.major >= 3 and sys.version_info.minor >7 and os.name == 'nt':

@ -1,4 +1,5 @@
import aquery_config import aquery_config
help_message = '''\ help_message = '''\
====================================================== ======================================================
AQUERY COMMANDLINE HELP AQUERY COMMANDLINE HELP
@ -82,31 +83,31 @@ if __name__ == '__main__':
import os import atexit
from dataclasses import dataclass import ctypes
import enum import enum
import time import mmap
import os
# import dbconn # import dbconn
import re import re
import subprocess
import sys
import threading
import time
from dataclasses import dataclass
from typing import Callable, List, Optional from typing import Callable, List, Optional
import numpy as np
from mo_parsing import ParseException from mo_parsing import ParseException
import aquery_parser as parser import aquery_parser as parser
import engine import engine
import engine.projection
import engine.ddl import engine.ddl
import engine.projection
import reconstruct as xengine import reconstruct as xengine
import subprocess
import mmap
import sys
from engine.utils import base62uuid
import atexit
import threading
import ctypes
import numpy as np
from engine.utils import ws
from engine.utils import add_dll_dir
from engine.utils import nullstream
from build import build_manager from build import build_manager
from engine.utils import add_dll_dir, base62uuid, nullstream, ws
## CLASSES BEGIN ## CLASSES BEGIN
class RunType(enum.Enum): class RunType(enum.Enum):
@ -407,7 +408,7 @@ def prompt(running = lambda:True, next = lambda:input('> '), state = None):
for t in cxt.tables: for t in cxt.tables:
lst_cols = [] lst_cols = []
for c in t.columns: for c in t.columns:
lst_cols.append(f'{c.name} : {c.type}') lst_cols.append(f'{c.name} : {c.type.name}')
print(f'{t.table_name} ({", ".join(lst_cols)})') print(f'{t.table_name} ({", ".join(lst_cols)})')
continue continue
elif q.startswith('help'): elif q.startswith('help'):
@ -605,7 +606,8 @@ def prompt(running = lambda:True, next = lambda:input('> '), state = None):
print("\nBye.") print("\nBye.")
raise raise
except ValueError as e: except ValueError as e:
import code, traceback import code
import traceback
__stdin = os.dup(0) __stdin = os.dup(0)
raise_exception = True raise_exception = True
sh = code.InteractiveConsole({**globals(), **locals()}) sh = code.InteractiveConsole({**globals(), **locals()})

@ -1,4 +1,5 @@
from reconstruct.ast import Context, ast_node from reconstruct.ast import Context, ast_node
saved_cxt = None saved_cxt = None
def initialize(cxt = None, keep = False): def initialize(cxt = None, keep = False):

@ -1,11 +1,12 @@
from copy import deepcopy from copy import deepcopy
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum, auto from enum import Enum, auto
from typing import Set, Tuple, Dict, Union, List, Optional from typing import Dict, List, Optional, Set, Tuple, Union
from engine.types import * from engine.types import *
from engine.utils import enlist, base62uuid, base62alp, get_legal_name from engine.utils import base62alp, base62uuid, enlist, get_legal_name
from reconstruct.storage import Context, TableInfo, ColRef from reconstruct.storage import ColRef, Context, TableInfo
class ast_node: class ast_node:
header = [] header = []
@ -70,6 +71,10 @@ class projection(ast_node):
elif 'select_distinct' in node: elif 'select_distinct' in node:
p = node['select_distinct'] p = node['select_distinct']
self.distinct = True self.distinct = True
if 'with' in node:
self.with_clause = projection(self, node['value'])
else:
self.with_clause = None
self.projections = p if type(p) is list else [p] self.projections = p if type(p) is list else [p]
if self.parent is None: if self.parent is None:
@ -951,6 +956,9 @@ class load(ast_node):
if node['load']['file_type'] == 'module': if node['load']['file_type'] == 'module':
self.produce = self.produce_module self.produce = self.produce_module
self.module = True self.module = True
elif 'complex' in node['load']:
self.produce = self.produce_cpp
self.consume = lambda *_: None
elif self.context.dialect == 'MonetDB': elif self.context.dialect == 'MonetDB':
self.produce = self.produce_monetdb self.produce = self.produce_monetdb
else: else:
@ -1020,6 +1028,55 @@ class load(ast_node):
if 'term' in node: if 'term' in node:
self.sql += f' {s3} \'{node["term"]["literal"]}\'' self.sql += f' {s3} \'{node["term"]["literal"]}\''
def produce_cpp(self, node):
    """Generate the hybrid-engine loader for ``LOAD COMPLEX DATA INFILE``.

    Emits a C++ post-processing routine that:
      1. selects the target table's columns from the embedded MonetDB server,
      2. mirrors them into a post-processor ``TableInfo``,
      3. reads the csv file with ``AQCSVReader`` (field/element separators
         taken from the FIELDS/ELEMENT TERMINATED BY clauses, defaulting
         to ',' and ';'),
      4. appends every parsed row and writes the table back to MonetDB.
    """
    self.context.has_dll = True
    self.context.headers.add('"csv.h"')
    node = node['load']
    # unique name for the generated post-processing function
    self.postproc_fname = 'ld_' + base62uuid(5)
    self.context.postproc_begin(self.postproc_fname)
    table:TableInfo = self.context.tables_byname[node['table']]
    self.sql = F"SELECT {', '.join([c.name for c in table.columns])} FROM {table.table_name};"
    # NOTE(review): self.sql already ends with ';' so this emits a double
    # semicolon — confirm the sql consumer tolerates/expects that.
    self.emit(self.sql+';\n')
    self.context.sql_end()
    # number of rows currently held by the embedded server
    length_name = 'len_' + base62uuid(6)
    self.context.emitc(f'auto {length_name} = server->cnt;')
    out_typenames = [t.type.cname for t in table.columns]
    outtable_col_nameslist = ', '.join([f'"{c.name}"' for c in table.columns])
    self.outtable_col_names = 'names_' + base62uuid(4)
    self.context.emitc(f'const char* {self.outtable_col_names}[] = {{{outtable_col_nameslist}}};')
    self.out_table = 'tbl_' + base62uuid(4)
    self.context.emitc(f'auto {self.out_table} = new TableInfo<{",".join(out_typenames)}>("{table.table_name}", {self.outtable_col_names});')
    # bind each output column to the server-side column of the same index
    for i, c in enumerate(table.columns):
        c.cxt_name = 'c_' + base62uuid(6)
        self.context.emitc(f'decltype(auto) {c.cxt_name} = {self.out_table}->get_col<{i}>();')
        self.context.emitc(f'{c.cxt_name}.initfrom({length_name}, server->getCol({i}), "{table.columns[i].name}");')
    csv_reader_name = 'csv_reader_' + base62uuid(6)
    col_types = [c.type.cname for c in table.columns]
    col_tmp_names = ['tmp_'+base62uuid(8) for _ in range(len(table.columns))]
    #col_names = ','.join([f'"{c.name}"' for c in table.columns])
    # separators from FIELDS/ELEMENT TERMINATED BY; only the first
    # non-whitespace character of each literal is used.
    term_field = ',' if 'term' not in node else node['term']['literal']
    term_ele = ';' if 'ele' not in node else node['ele']['literal']
    self.context.emitc(f'AQCSVReader<{len(col_types)}, \'{term_field.strip()[0]}\', \'{term_ele.strip()[0]}\'> {csv_reader_name}("{node["file"]["literal"]}");')
    # self.context.emitc(f'{csv_reader_name}.read_header(io::ignore_extra_column, {col_names});')
    # presumably next_line() skips the header row — TODO confirm against csv.h
    self.context.emitc(f'{csv_reader_name}.next_line();')
    for t, n in zip(col_types, col_tmp_names):
        self.context.emitc(f'{t} {n};')
    self.context.emitc(f'while({csv_reader_name}.read_row({",".join(col_tmp_names)})) {{ \n')
    for i, c in enumerate(table.columns):
        # NOTE(review): this print() looks like leftover debug output in the
        # generated loader's inner loop — verify it is intentional.
        self.context.emitc(f'print({col_tmp_names[i]});')
        self.context.emitc(f'{c.cxt_name}.emplace_back({col_tmp_names[i]});')
    self.context.emitc('}')
    self.context.emitc(f'print(*{self.out_table});')
    self.context.emitc(f'{self.out_table}->monetdb_append_table(cxt->alt_server, "{table.table_name}");')
    self.context.postproc_end(self.postproc_fname)
class outfile(ast_node): class outfile(ast_node):
name="_outfile" name="_outfile"
def __init__(self, parent, node, context = None, *, sql = None): def __init__(self, parent, node, context = None, *, sql = None):
@ -1121,7 +1178,7 @@ class udf(ast_node):
def produce(self, node): def produce(self, node):
from engine.utils import get_legal_name, check_legal_name from engine.utils import check_legal_name, get_legal_name
node = node[self.name] node = node[self.name]
# register udf # register udf
self.agg = 'Agg' in node self.agg = 'Agg' in node
@ -1216,7 +1273,7 @@ class udf(ast_node):
def consume(self, node): def consume(self, node):
from engine.utils import get_legal_name, check_legal_name from engine.utils import check_legal_name, get_legal_name
node = node[self.name] node = node[self.name]
if 'params' in node: if 'params' in node:
@ -1339,4 +1396,5 @@ def include(objs):
import sys import sys
include(sys.modules[__name__]) include(sys.modules[__name__])

@ -1,7 +1,8 @@
from typing import Optional, Set from typing import Optional, Set
from engine.types import *
from reconstruct.ast import ast_node from reconstruct.ast import ast_node
from reconstruct.storage import ColRef, Context from reconstruct.storage import ColRef, Context
from engine.types import *
# TODO: Decouple expr and upgrade architecture # TODO: Decouple expr and upgrade architecture
# C_CODE : get ccode/sql code? # C_CODE : get ccode/sql code?
@ -31,6 +32,7 @@ class expr(ast_node):
def __init__(self, parent, node, *, c_code = None, supress_undefined = False): def __init__(self, parent, node, *, c_code = None, supress_undefined = False):
from reconstruct.ast import projection, udf from reconstruct.ast import projection, udf
# gen2 expr have multi-passes # gen2 expr have multi-passes
# first pass parse json into expr tree # first pass parse json into expr tree
# generate target code in later passes upon need # generate target code in later passes upon need
@ -78,7 +80,7 @@ class expr(ast_node):
ast_node.__init__(self, parent, node, None) ast_node.__init__(self, parent, node, None)
def init(self, _): def init(self, _):
from reconstruct.ast import projection, _tmp_join_union from reconstruct.ast import _tmp_join_union, projection
parent = self.parent parent = self.parent
self.is_compound = parent.is_compound if type(parent) is expr else False self.is_compound = parent.is_compound if type(parent) is expr else False
if type(parent) in [projection, expr, _tmp_join_union]: if type(parent) in [projection, expr, _tmp_join_union]:

@ -1,12 +1,14 @@
from typing import Dict, List, Set
from engine.types import * from engine.types import *
from engine.utils import CaseInsensitiveDict, base62uuid, enlist from engine.utils import CaseInsensitiveDict, base62uuid, enlist
from typing import List, Dict, Set
class ColRef: class ColRef:
def __init__(self, _ty, cobj, table:'TableInfo', name, id, compound = False, _ty_args = None): def __init__(self, _ty, cobj, table:'TableInfo', name, id, compound = False, _ty_args = None):
self.type : Types = AnyT self.type : Types = AnyT
if type(_ty) is str: if type(_ty) is str:
self.type = builtin_types[_ty.lower()] self.type = Types.decode(_ty)
if _ty_args: if _ty_args:
self.type = self.type(enlist(_ty_args)) self.type = self.type(enlist(_ty_args))
elif type(_ty) is Types: elif type(_ty) is Types:
@ -17,6 +19,7 @@ class ColRef:
self.alias = set() self.alias = set()
self.id = id # position in table self.id = id # position in table
self.compound = compound # compound field (list as a field) self.compound = compound # compound field (list as a field)
self.cxt_name = ''
# e.g. order by, group by, filter by expressions # e.g. order by, group by, filter by expressions
self.__arr__ = (_ty, cobj, table, name, id) self.__arr__ = (_ty, cobj, table, name, id)

@ -159,6 +159,10 @@ public:
grow(); grow();
container[size++] = _val; container[size++] = _val;
} }
// Append a copy of _val.
// Fix: the original implementation of this lvalue-reference overload did
// std::move(_val), silently gutting the caller's object on a plain
// v.emplace_back(x) call — moving from an lvalue must be opt-in.  An
// lvalue argument is now copied; callers that want the move pass
// std::move(x), which selects the _Ty&& overload below.
void emplace_back(_Ty& _val) {
    grow();
    container[size++] = _val;
}
void emplace_back(_Ty&& _val) { void emplace_back(_Ty&& _val) {
grow(); grow();
container[size++] = std::move(_val); container[size++] = std::move(_val);

@ -0,0 +1,3 @@
create table f (a float, b vecfloat, c int)
load complex data infile 'data/test_complex.csv' into table f fields terminated by ',' element terminated by ';'
select * from f

@ -18,3 +18,9 @@ SELECT max(endofdayprice/prev(endofdayprice)) as Max_Ratio
FROM ticks FROM ticks
ASSUMING ASC date ASSUMING ASC date
WHERE ID = "3001" WHERE ID = "3001"
CREATE TABLE ticks2(ID VARCHAR(20), max REAL, min REAL)
INSERT INTO ticks2 SELECT ID AS ID, max(ratios(endofdayprice)) AS max, min(ratios(endofdayprice)) AS min from ticks group by ID;
SELECT ID, max, min
FROM ticks2;
Loading…
Cancel
Save