From 6478deb7dad89b6864f72a7aa53fbd85ae32a64b Mon Sep 17 00:00:00 2001
From: Bill
Date: Thu, 20 Oct 2022 06:00:57 +0800
Subject: [PATCH 01/30] read complex data from csv

---
 README.md                   | 26 ++++++++-----
 aquery_parser/sql_parser.py | 10 ++++-
 csv.h                       | 61 +++++++++++++++++++++++++-----
 engine/types.py             | 17 +++++----
 engine/utils.py             |  8 +++-
 prompt.py                   | 36 +++++++++---------
 reconstruct/__init__.py     |  1 +
 reconstruct/ast.py          | 74 +++++++++++++++++++++++++++++++++----
 reconstruct/expr.py         |  6 ++-
 reconstruct/storage.py      |  7 +++-
 server/vector_type.hpp      |  4 ++
 tests/complex_data.a        |  3 ++
 tests/q4.a                  |  8 +++-
 13 files changed, 201 insertions(+), 60 deletions(-)
 create mode 100644 tests/complex_data.a

diff --git a/README.md b/README.md
index de90bab..fbc1076 100644
--- a/README.md
+++ b/README.md
@@ -147,8 +147,7 @@ See files in ./tests/ for more examples.
 ## Execution Engines
 - AQuery++ supports different execution engines thanks to the decoupled compiler structure.
 - Hybrid Execution Engine: decouples the query into two parts. The SQL-compliant part is executed by an embedded version of MonetDB and everything else is executed by a post-process module which is generated by the AQuery++ compiler in C++ and then compiled and executed.
-- AQuery Execution Engine: executes queries by compiling the query plan to C++ code. Doesn't support joins and udf functions.
-- K9 Execution Engine: (discontinued).
+- AQuery Library: a set of header-based libraries that provide column arithmetic and operations inspired by array-programming languages like kdb. The generated C++ post-processor code builds on these libraries, which significantly reduces its complexity and shortens compile time while maintaining the best performance. UDFs and user modules can use the same libraries, making it easy to write simple but powerful extensions.
 # Roadmap
 - [x] SQL Parser -> AQuery Parser (Front End)
 - [x] AQuery-C++ Compiler (Back End)
@@ -156,14 +155,21 @@ See files in ./tests/ for more examples.
 - [x] Schema and Data Model
 - [x] Data acquisition/output from/to csv file
 - [ ] Execution Engine
-	- [x] Projections and single-group Aggregations
-	- [x] Group by Aggregations
-	- [x] Filters
-	- [x] Order by
-	- [x] Assumption
-	- [x] Flatten
-	- [x] Join (Hybrid Engine only)
-	- [ ] Subqueries
+	- [x] Single Query
+		- [x] Projections and single-group Aggregations
+		- [x] Group by Aggregations
+		- [x] Filters
+		- [x] Order by
+		- [x] Assumption
+		- [x] Flatten
+		- [x] Join (Hybrid Engine only)
+	- [ ] Subquery
+		- [ ] With Clause
+		- [ ] From subquery
+		- [ ] Select subquery
+		- [ ] Where subquery
+		- [ ] Subquery in group by
+		- [ ] Subquery in order by
 - [x] Query Optimization
 	- [x] Selection/Order by push-down
 	- [x] Join Optimization (Only in Hybrid Engine)
diff --git a/aquery_parser/sql_parser.py b/aquery_parser/sql_parser.py
index 9c08db6..5308c2a 100644
--- a/aquery_parser/sql_parser.py
+++ b/aquery_parser/sql_parser.py
@@ -8,6 +8,7 @@
 # from sre_parse import WHITESPACE
 
+
 from mo_parsing.helpers import restOfLine
 from mo_parsing.infix import delimited_list
 from mo_parsing.whitespaces import NO_WHITESPACE, Whitespace
 
@@ -655,7 +656,8 @@ def parser(literal_string, ident, sqlserver=False):
     ) / to_json_call
 
     load_data = (
-        keyword("data") ("file_type")
+        Optional(keyword("complex")("complex"))
+        + keyword("data") ("file_type")
         + keyword("infile")("loc")
         + literal_string ("file")
         + INTO
@@ -667,6 +669,12 @@ def parser(literal_string, ident, sqlserver=False):
             + keyword("by").suppress()
             + literal_string ("term")
         )
+        + Optional(
+            keyword("element").suppress()
+            + keyword("terminated").suppress()
+            + keyword("by").suppress()
+            + literal_string ("ele")
+        )
     )
 
     module_func_def = (
diff --git a/csv.h b/csv.h
index c5cb5bc..c0d1762 100644
--- a/csv.h
+++ b/csv.h
@@ -1,4 +1,4 @@
-// Copyright: (2012-2015) Ben Strasser
+// Copyright: (2012-2015) Ben Strasser , 2022 Bill Sun
 // License: BSD-3
 //
 // All rights reserved.
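The csv.h hunks below extend Ben Strasser's fast-cpp-csv-parser so that a single CSV field can itself hold a list of values (e.g. `3;4;5e-3;6.32`) split on a second separator, which is what the new `element terminated by` clause feeds into. A minimal self-contained sketch of that splitting loop, assuming ';' as the element separator; split_field is an illustrative name, not the patch's actual API:

#include <cstdio>
#include <cstdlib>
#include <vector>

// Parse one field like "3;4 ;5e-3;6.32" into doubles, skipping separators and padding.
std::vector<double> split_field(const char* col, char sep2 = ';') {
    std::vector<double> out;
    while (*col != '\0') {
        char* end = nullptr;
        double v = std::strtod(col, &end);    // parse one element
        if (end == col) break;                // no digits left: stop
        out.push_back(v);
        col = end;
        while (*col == sep2 || *col == ' ' || *col == '\t' ||
               *col == '\r' || *col == '\n')
            ++col;                            // consume separator and padding
    }
    return out;
}

int main() {
    for (double v : split_field("3;4 ;5e-3;6.32"))
        std::printf("%g ", v);                // prints: 3 4 0.005 6.32
}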
@@ -49,6 +49,7 @@ #include #include #include +#include "server/vector_type.hpp" namespace io{ //////////////////////////////////////////////////////////////////////////// @@ -974,8 +975,7 @@ namespace io{ return; } x = 10*x+y; - }else - throw error::no_digit(); + } ++col; } } @@ -1005,8 +1005,7 @@ namespace io{ return; } x = 10*x-y; - }else - throw error::no_digit(); + } ++col; } return; @@ -1080,19 +1079,37 @@ namespace io{ } x *= base; } - }else{ - if(*col != '\0') - throw error::no_digit(); } if(is_neg) x = -x; } + template void parse(char*col, float&x) { parse_float(col, x); } template void parse(char*col, double&x) { parse_float(col, x); } template void parse(char*col, long double&x) { parse_float(col, x); } - + + + template + void parse_vector(char* col, vector_type& x) { + while (*col != '\0') { + char* next_col = col; + while (*next_col != sep2 && *next_col != '\0') + ++next_col; + while (*next_col == ' ' || *next_col == '\t' || + *next_col == sep2 || *next_col == '\r' || + *next_col == '\n') + ++next_col; + char _next_end = *next_col; + *next_col = '\0'; + T y; + ::io::detail::parse(col, y); + x.emplace_back(y); + col = next_col; + *next_col = _next_end; + } + } template void parse(char*col, T&x){ // Mute unused variable compiler warning @@ -1108,6 +1125,7 @@ namespace io{ } template, class quote_policy = no_quote_escape<','>, class overflow_policy = throw_on_overflow, @@ -1234,7 +1252,23 @@ namespace io{ parse_helper(r+1, cols...); } - + template + void parse_helper(std::size_t r, vector_type&t, ColType&...cols){ + if(row[r]){ + try{ + try{ + ::io::detail::parse_vector(row[r], t); + }catch(error::with_column_content&err){ + err.set_column_content(row[r]); + throw; + } + }catch(error::with_column_name&err){ + err.set_column_name(column_names[r].c_str()); + throw; + } + } + parse_helper(r+1, cols...); + } public: template bool read_row(ColType& ...cols){ @@ -1269,5 +1303,12 @@ namespace io{ } }; } + +template +using AQCSVReader = io::CSVReader, io::no_quote_escape, + io::ignore_overflow, io::empty_line_comment + >; + #endif diff --git a/engine/types.py b/engine/types.py index 8eac736..5a56e12 100644 --- a/engine/types.py +++ b/engine/types.py @@ -1,8 +1,9 @@ from copy import deepcopy -from engine.utils import base62uuid, defval -from aquery_config import have_hge from typing import Dict, List +from aquery_config import have_hge +from engine.utils import base62uuid, defval + type_table: Dict[str, "Types"] = {} class Types: @@ -65,10 +66,10 @@ class Types: return self.sqlname @staticmethod - def decode(aquery_type : str, vector_type:str = 'ColRef') -> "Types": - if (aquery_type.startswith('vec')): + def decode(aquery_type : str, vector_type:str = 'vector_type') -> "Types": + if (aquery_type.lower().startswith('vec')): return VectorT(Types.decode(aquery_type[3:]), vector_type) - return type_table[aquery_type] + return type_table[aquery_type.lower()] class TypeCollection: def __init__(self, sz, deftype, fptype = None, utype = None, *, collection = None) -> None: @@ -121,7 +122,7 @@ class VectorT(Types): return f'{self.vector_type}<{self.inner_type.name}>' @property def sqlname(self) -> str: - return 'BIGINT' + return 'HUGEINT' # Store vector_type into 16 bit integers @property def cname(self) -> str: return f'{self.vector_type}<{self.inner_type.cname}>' @@ -142,7 +143,7 @@ fp_types : Dict[str, Types] = _ty_make_dict('t.sqlname.lower()', FloatT, DoubleT temporal_types : Dict[str, Types] = _ty_make_dict('t.sqlname.lower()', DateT, TimeT, TimeStampT) builtin_types : Dict[str, Types] = { 
'string' : StrT, - **_ty_make_dict('t.sqlname.lower()', AnyT, TextT, VarcharT), + **_ty_make_dict('t.sqlname.lower()', AnyT, TextT, VarcharT, HgeT), **int_types, **fp_types, **temporal_types} def get_int128_support(): @@ -365,3 +366,5 @@ user_module_func = {} builtin_operators : Dict[str, OperatorBase] = {**builtin_binary_arith, **builtin_binary_logical, **builtin_unary_arith, **builtin_unary_logical, **builtin_unary_special, **builtin_func, **builtin_cstdlib, **user_module_func} + +type_table = {**builtin_types, **type_table} \ No newline at end of file diff --git a/engine/utils.py b/engine/utils.py index 065f8c8..dc7f2bc 100644 --- a/engine/utils.py +++ b/engine/utils.py @@ -1,6 +1,6 @@ -from collections import OrderedDict -from collections.abc import MutableMapping, Mapping import uuid +from collections import OrderedDict +from collections.abc import Mapping, MutableMapping lower_alp = 'abcdefghijklmnopqrstuvwxyz' upper_alp = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' @@ -107,6 +107,8 @@ def defval(val, default): # escape must be readonly from typing import Mapping, Set + + def remove_last(pattern : str, string : str, escape : Set[str] = set()) -> str: idx = string.rfind(pattern) if idx == -1: @@ -126,9 +128,11 @@ class _Counter: return cnt import re + ws = re.compile(r'\s+') import os + def add_dll_dir(dll: str): import sys if sys.version_info.major >= 3 and sys.version_info.minor >7 and os.name == 'nt': diff --git a/prompt.py b/prompt.py index cd17360..c6a00dd 100644 --- a/prompt.py +++ b/prompt.py @@ -1,4 +1,5 @@ import aquery_config + help_message = '''\ ====================================================== AQUERY COMMANDLINE HELP @@ -82,31 +83,31 @@ if __name__ == '__main__': -import os -from dataclasses import dataclass +import atexit +import ctypes import enum -import time +import mmap +import os # import dbconn import re +import subprocess +import sys +import threading +import time +from dataclasses import dataclass from typing import Callable, List, Optional + +import numpy as np from mo_parsing import ParseException + import aquery_parser as parser import engine -import engine.projection import engine.ddl +import engine.projection import reconstruct as xengine -import subprocess -import mmap -import sys -from engine.utils import base62uuid -import atexit -import threading -import ctypes -import numpy as np -from engine.utils import ws -from engine.utils import add_dll_dir -from engine.utils import nullstream from build import build_manager +from engine.utils import add_dll_dir, base62uuid, nullstream, ws + ## CLASSES BEGIN class RunType(enum.Enum): @@ -407,7 +408,7 @@ def prompt(running = lambda:True, next = lambda:input('> '), state = None): for t in cxt.tables: lst_cols = [] for c in t.columns: - lst_cols.append(f'{c.name} : {c.type}') + lst_cols.append(f'{c.name} : {c.type.name}') print(f'{t.table_name} ({", ".join(lst_cols)})') continue elif q.startswith('help'): @@ -605,7 +606,8 @@ def prompt(running = lambda:True, next = lambda:input('> '), state = None): print("\nBye.") raise except ValueError as e: - import code, traceback + import code + import traceback __stdin = os.dup(0) raise_exception = True sh = code.InteractiveConsole({**globals(), **locals()}) diff --git a/reconstruct/__init__.py b/reconstruct/__init__.py index fd02f61..97afaba 100644 --- a/reconstruct/__init__.py +++ b/reconstruct/__init__.py @@ -1,4 +1,5 @@ from reconstruct.ast import Context, ast_node + saved_cxt = None def initialize(cxt = None, keep = False): diff --git a/reconstruct/ast.py b/reconstruct/ast.py index 
f81083e..90615ac 100644 --- a/reconstruct/ast.py +++ b/reconstruct/ast.py @@ -1,12 +1,13 @@ from copy import deepcopy from dataclasses import dataclass from enum import Enum, auto -from typing import Set, Tuple, Dict, Union, List, Optional +from typing import Dict, List, Optional, Set, Tuple, Union from engine.types import * -from engine.utils import enlist, base62uuid, base62alp, get_legal_name -from reconstruct.storage import Context, TableInfo, ColRef - +from engine.utils import base62alp, base62uuid, enlist, get_legal_name +from reconstruct.storage import ColRef, Context, TableInfo + + class ast_node: header = [] types = dict() @@ -70,7 +71,11 @@ class projection(ast_node): elif 'select_distinct' in node: p = node['select_distinct'] self.distinct = True - + if 'with' in node: + self.with_clause = projection(self, node['value']) + else: + self.with_clause = None + self.projections = p if type(p) is list else [p] if self.parent is None: self.context.sql_begin() @@ -951,6 +956,9 @@ class load(ast_node): if node['load']['file_type'] == 'module': self.produce = self.produce_module self.module = True + elif 'complex' in node['load']: + self.produce = self.produce_cpp + self.consume = lambda *_: None elif self.context.dialect == 'MonetDB': self.produce = self.produce_monetdb else: @@ -1019,7 +1027,56 @@ class load(ast_node): self.sql = f'{s1} \'{p}\' {s2} ' if 'term' in node: self.sql += f' {s3} \'{node["term"]["literal"]}\'' - + + def produce_cpp(self, node): + self.context.has_dll = True + self.context.headers.add('"csv.h"') + node = node['load'] + self.postproc_fname = 'ld_' + base62uuid(5) + self.context.postproc_begin(self.postproc_fname) + + table:TableInfo = self.context.tables_byname[node['table']] + self.sql = F"SELECT {', '.join([c.name for c in table.columns])} FROM {table.table_name};" + self.emit(self.sql+';\n') + self.context.sql_end() + length_name = 'len_' + base62uuid(6) + self.context.emitc(f'auto {length_name} = server->cnt;') + + out_typenames = [t.type.cname for t in table.columns] + outtable_col_nameslist = ', '.join([f'"{c.name}"' for c in table.columns]) + + self.outtable_col_names = 'names_' + base62uuid(4) + self.context.emitc(f'const char* {self.outtable_col_names}[] = {{{outtable_col_nameslist}}};') + + self.out_table = 'tbl_' + base62uuid(4) + self.context.emitc(f'auto {self.out_table} = new TableInfo<{",".join(out_typenames)}>("{table.table_name}", {self.outtable_col_names});') + for i, c in enumerate(table.columns): + c.cxt_name = 'c_' + base62uuid(6) + self.context.emitc(f'decltype(auto) {c.cxt_name} = {self.out_table}->get_col<{i}>();') + self.context.emitc(f'{c.cxt_name}.initfrom({length_name}, server->getCol({i}), "{table.columns[i].name}");') + csv_reader_name = 'csv_reader_' + base62uuid(6) + col_types = [c.type.cname for c in table.columns] + col_tmp_names = ['tmp_'+base62uuid(8) for _ in range(len(table.columns))] + #col_names = ','.join([f'"{c.name}"' for c in table.columns]) + term_field = ',' if 'term' not in node else node['term']['literal'] + term_ele = ';' if 'ele' not in node else node['ele']['literal'] + self.context.emitc(f'AQCSVReader<{len(col_types)}, \'{term_field.strip()[0]}\', \'{term_ele.strip()[0]}\'> {csv_reader_name}("{node["file"]["literal"]}");') + # self.context.emitc(f'{csv_reader_name}.read_header(io::ignore_extra_column, {col_names});') + self.context.emitc(f'{csv_reader_name}.next_line();') + + for t, n in zip(col_types, col_tmp_names): + self.context.emitc(f'{t} {n};') + 
self.context.emitc(f'while({csv_reader_name}.read_row({",".join(col_tmp_names)})) {{ \n') + for i, c in enumerate(table.columns): + self.context.emitc(f'print({col_tmp_names[i]});') + self.context.emitc(f'{c.cxt_name}.emplace_back({col_tmp_names[i]});') + + self.context.emitc('}') + self.context.emitc(f'print(*{self.out_table});') + self.context.emitc(f'{self.out_table}->monetdb_append_table(cxt->alt_server, "{table.table_name}");') + + self.context.postproc_end(self.postproc_fname) + class outfile(ast_node): name="_outfile" def __init__(self, parent, node, context = None, *, sql = None): @@ -1121,7 +1178,7 @@ class udf(ast_node): def produce(self, node): - from engine.utils import get_legal_name, check_legal_name + from engine.utils import check_legal_name, get_legal_name node = node[self.name] # register udf self.agg = 'Agg' in node @@ -1216,7 +1273,7 @@ class udf(ast_node): def consume(self, node): - from engine.utils import get_legal_name, check_legal_name + from engine.utils import check_legal_name, get_legal_name node = node[self.name] if 'params' in node: @@ -1339,4 +1396,5 @@ def include(objs): import sys + include(sys.modules[__name__]) diff --git a/reconstruct/expr.py b/reconstruct/expr.py index 4fd483b..f1e3d5a 100644 --- a/reconstruct/expr.py +++ b/reconstruct/expr.py @@ -1,7 +1,8 @@ from typing import Optional, Set + +from engine.types import * from reconstruct.ast import ast_node from reconstruct.storage import ColRef, Context -from engine.types import * # TODO: Decouple expr and upgrade architecture # C_CODE : get ccode/sql code? @@ -31,6 +32,7 @@ class expr(ast_node): def __init__(self, parent, node, *, c_code = None, supress_undefined = False): from reconstruct.ast import projection, udf + # gen2 expr have multi-passes # first pass parse json into expr tree # generate target code in later passes upon need @@ -78,7 +80,7 @@ class expr(ast_node): ast_node.__init__(self, parent, node, None) def init(self, _): - from reconstruct.ast import projection, _tmp_join_union + from reconstruct.ast import _tmp_join_union, projection parent = self.parent self.is_compound = parent.is_compound if type(parent) is expr else False if type(parent) in [projection, expr, _tmp_join_union]: diff --git a/reconstruct/storage.py b/reconstruct/storage.py index d54db52..2873747 100644 --- a/reconstruct/storage.py +++ b/reconstruct/storage.py @@ -1,12 +1,14 @@ +from typing import Dict, List, Set + from engine.types import * from engine.utils import CaseInsensitiveDict, base62uuid, enlist -from typing import List, Dict, Set + class ColRef: def __init__(self, _ty, cobj, table:'TableInfo', name, id, compound = False, _ty_args = None): self.type : Types = AnyT if type(_ty) is str: - self.type = builtin_types[_ty.lower()] + self.type = Types.decode(_ty) if _ty_args: self.type = self.type(enlist(_ty_args)) elif type(_ty) is Types: @@ -17,6 +19,7 @@ class ColRef: self.alias = set() self.id = id # position in table self.compound = compound # compound field (list as a field) + self.cxt_name = '' # e.g. 
order by, group by, filter by expressions self.__arr__ = (_ty, cobj, table, name, id) diff --git a/server/vector_type.hpp b/server/vector_type.hpp index 9b03e89..f0d4cc6 100644 --- a/server/vector_type.hpp +++ b/server/vector_type.hpp @@ -159,6 +159,10 @@ public: grow(); container[size++] = _val; } + void emplace_back(_Ty& _val) { + grow(); + container[size++] = std::move(_val); + } void emplace_back(_Ty&& _val) { grow(); container[size++] = std::move(_val); diff --git a/tests/complex_data.a b/tests/complex_data.a new file mode 100644 index 0000000..e08da4b --- /dev/null +++ b/tests/complex_data.a @@ -0,0 +1,3 @@ +create table f (a float, b vecfloat, c int) +load complex data infile 'data/test_complex.csv' into table f fields terminated by ',' element terminated by ';' +select * from f \ No newline at end of file diff --git a/tests/q4.a b/tests/q4.a index 4237b16..d38a246 100644 --- a/tests/q4.a +++ b/tests/q4.a @@ -17,4 +17,10 @@ LOAD DATA INFILE "data/ticks.csv" INTO TABLE TICKS FIELDS TERMINATED BY "," SELECT max(endofdayprice/prev(endofdayprice)) as Max_Ratio FROM ticks ASSUMING ASC date -WHERE ID = "3001" \ No newline at end of file +WHERE ID = "3001" + +CREATE TABLE ticks2(ID VARCHAR(20), max REAL, min REAL) +INSERT INTO ticks2 SELECT ID AS ID, max(ratios(endofdayprice)) AS max, min(ratios(endofdayprice)) AS min from ticks group by ID; + +SELECT ID, max, min +FROM ticks2; \ No newline at end of file From d5382c36e93be08bb0df504b199b088fbf40a01c Mon Sep 17 00:00:00 2001 From: Bill Date: Fri, 21 Oct 2022 14:52:01 +0800 Subject: [PATCH 02/30] bug fixes --- .gitignore | 1 + Dockerfile | 2 +- data/test_complex.csv | 6 ++++++ reconstruct/ast.py | 2 +- server/table_ext_monetdb.hpp | 12 ++++++------ server/vector_type.hpp | 3 +++ tests/dt2.a | 26 ++++++++++++++++++++++++++ tests/q1.sql | 4 +++- 8 files changed, 47 insertions(+), 9 deletions(-) create mode 100644 data/test_complex.csv create mode 100644 tests/dt2.a diff --git a/.gitignore b/.gitignore index 4807b2c..644be8b 100644 --- a/.gitignore +++ b/.gitignore @@ -57,6 +57,7 @@ test*.c* !moving_avg.csv !nyctx100.csv !network.csv +!test_complex.csv *.out *.asm !mmw.so diff --git a/Dockerfile b/Dockerfile index aac0a4f..953d89f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM ubuntu:latest -RUN cp /bin/bash /bin/sh +# RUN cp /bin/bash /bin/sh RUN apt update && apt install -y wget diff --git a/data/test_complex.csv b/data/test_complex.csv new file mode 100644 index 0000000..efd7b3e --- /dev/null +++ b/data/test_complex.csv @@ -0,0 +1,6 @@ +a,b,c +5e-3, 3;4 ;5e-3;6.32,7 +1,2,3 +4,5;6;7;8;9, 0 + 3 ,2 ; 4; 5.7; -.3; 5., 6 +-3.12312,-4E+7;67456746744567;75,4 diff --git a/reconstruct/ast.py b/reconstruct/ast.py index 90615ac..d82ebce 100644 --- a/reconstruct/ast.py +++ b/reconstruct/ast.py @@ -1068,7 +1068,7 @@ class load(ast_node): self.context.emitc(f'{t} {n};') self.context.emitc(f'while({csv_reader_name}.read_row({",".join(col_tmp_names)})) {{ \n') for i, c in enumerate(table.columns): - self.context.emitc(f'print({col_tmp_names[i]});') + # self.context.emitc(f'print({col_tmp_names[i]});') self.context.emitc(f'{c.cxt_name}.emplace_back({col_tmp_names[i]});') self.context.emitc('}') diff --git a/server/table_ext_monetdb.hpp b/server/table_ext_monetdb.hpp index c128559..3c93c3f 100644 --- a/server/table_ext_monetdb.hpp +++ b/server/table_ext_monetdb.hpp @@ -45,16 +45,16 @@ void TableInfo::monetdb_append_table(void* srv, const char* alt_name) { puts("getcols..."); uint32_t cnt = 0; const auto get_col = [&monetdbe_cols, &i, *this, 
&gc_vecs, &cnt](auto v) { - printf("%d %d\n", i, (ColRef*)v - colrefs); + // printf("%d %d\n", i, (ColRef*)v - colrefs); monetdbe_cols[i++] = (monetdbe_column*)v->monetdb_get_col(gc_vecs, cnt); }; (get_col((ColRef*)(colrefs + i)), ...); puts("getcols done"); - for(int i = 0; i < sizeof...(Ts); ++i) - { - printf("no:%d name: %s count:%d data: %p type:%d \n", - i, monetdbe_cols[i]->name, monetdbe_cols[i]->count, monetdbe_cols[i]->data, monetdbe_cols[i]->type); - } + // for(int i = 0; i < sizeof...(Ts); ++i) + // { + // printf("no:%d name: %s count:%d data: %p type:%d \n", + // i, monetdbe_cols[i]->name, monetdbe_cols[i]->count, monetdbe_cols[i]->data, monetdbe_cols[i]->type); + // } std::string create_table_str = "CREATE TABLE IF NOT EXISTS "; create_table_str += alt_name; create_table_str += " ("; diff --git a/server/vector_type.hpp b/server/vector_type.hpp index f0d4cc6..620e14d 100644 --- a/server/vector_type.hpp +++ b/server/vector_type.hpp @@ -71,6 +71,9 @@ public: constexpr explicit vector_type(const vector_type<_Ty>& vt) noexcept : capacity(0) { _copy(vt); } + constexpr vector_type(vector_type<_Ty>& vt) noexcept : capacity(0) { + _move(std::move(vt)); + } constexpr vector_type(vector_type<_Ty>&& vt) noexcept : capacity(0) { _move(std::move(vt)); } diff --git a/tests/dt2.a b/tests/dt2.a new file mode 100644 index 0000000..0f9dc7f --- /dev/null +++ b/tests/dt2.a @@ -0,0 +1,26 @@ +LOAD MODULE FROM "./libirf.so" +FUNCTIONS ( + newtree(height:int, f:int64, sparse:vecint64, forget:double, maxf:int64, noclasses:int64, e:int, r:int64, rb:int64) -> bool, + fit(X:vecvecdouble, y:vecint64) -> bool, + predict(X:vecvecdouble) -> vecint64 +); + +create table source(x1 double, x2 double, x3 double, x4 double, x5 int64); +load data infile "data/benchmark" into table source fields terminated by ","; + +create table sparse(x int64); +insert into sparse values (1); +insert into sparse values (1); +insert into sparse values (1); +insert into sparse values (1); + +select * from source; + +select newtree(6, 4, sparse.x, 0, 4, 2, 0, 400, 2147483647) from sparse; + +select fit(pack(x1, x2, x3, x4), x5) from source limit 100; +select fit(pack(x1, x2, x3, x4), x5) from source limit 100; +select fit(pack(x1, x2, x3, x4), x5) from source limit 100; +select fit(pack(x1, x2, x3, x4), x5) from source limit 100; + +select predict(pack(x1, x2, x3, x4)) from source limit 100; \ No newline at end of file diff --git a/tests/q1.sql b/tests/q1.sql index 747b83b..eab8904 100644 --- a/tests/q1.sql +++ b/tests/q1.sql @@ -7,4 +7,6 @@ FIELDS TERMINATED BY "," SELECT sum(c), b, d FROM testq1 group by a,b,d -order by d DESC, b ASC +order by d DESC, b ASC; + +-- aaaa \ No newline at end of file From 259d9ef5665c5030021bdd7020691c8e027d7d93 Mon Sep 17 00:00:00 2001 From: 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 Date: Fri, 21 Oct 2022 02:59:00 -0400 Subject: [PATCH 03/30] bug --- sdk/Evaluation.cpp | 3 +-- sdk/irf.cpp | 42 ++++++++++++++++-------------------------- tests/dt.a | 34 +++++++++++++++++----------------- 3 files changed, 34 insertions(+), 45 deletions(-) diff --git a/sdk/Evaluation.cpp b/sdk/Evaluation.cpp index 3683597..8e347a7 100644 --- a/sdk/Evaluation.cpp +++ b/sdk/Evaluation.cpp @@ -5,14 +5,13 @@ struct minEval{ double value; - double values; + int* values; double eval; long left; // how many on its left double* record; long max; long** count; - long* sorted; // sorted d }; minEval giniSparse(double** data, long* result, long* d, long size, long col, long classes, long* totalT){ diff --git a/sdk/irf.cpp 
b/sdk/irf.cpp
index 8433c95..73eef77 100644
--- a/sdk/irf.cpp
+++ b/sdk/irf.cpp
@@ -4,9 +4,6 @@
 #include "../server/table.h"
 
 DecisionTree* dt = nullptr;
-long pt = 0;
-double** data = nullptr;
-long* result = nullptr;
 
 __AQEXPORT__(bool) newtree(int height, long f, ColRef<long> sparse, double forget, long maxf, long noclasses, Evaluation e, long r, long rb){
     if(sparse.size!=f)return 0;
@@ -19,36 +16,29 @@ __AQEXPORT__(bool) newtree(int height, long f, ColRef<long> sparse, double forget
     return 1;
 }
-__AQEXPORT__(bool) additem(ColRef<double> X, long y, long size){
-    long j = 0;
-    if(size>0){
-        free(data);
-        free(result);
-        pt = 0;
-        data=(double**)malloc(size*sizeof(double*));
-        result=(long*)malloc(size*sizeof(long));
+__AQEXPORT__(bool) fit(ColRef<vector_type<double>> X, ColRef<long> y){
+    if(X.size != y.size)return 0;
+    double** data = (double**)malloc(X.size*sizeof(double*));
+    long* result = (long*)malloc(y.size*sizeof(long));
+    for(long i=0; i<X.size; ++i){
+        data[i] = X.container[i].container;
+        result[i] = y.container[i];
     }
-    data[pt]=X.container;
-    result[pt]=y;
-    pt ++;
-    return 1;
-}
-__AQEXPORT__(bool) fit(){
-    if(pt<=0)return 0;
-    dt->fit(data, result, pt);
+    dt->fit(data, result, X.size);
     return 1;
 }
 
-__AQEXPORT__(ColRef_storage) predict(){
-    int* result = (int*)malloc(pt*sizeof(int));
-    for(long i=0; i<pt; ++i){
-        result[i] = dt->Test(data[i], dt->DTree);
+__AQEXPORT__(ColRef_storage) predict(ColRef<vector_type<double>> X){
+    double** data = (double**)malloc(X.size*sizeof(double*));
+    int* result = (int*)malloc(X.size*sizeof(int));
+    for(long i=0; i<X.size; ++i){
+        data[i] = X.container[i].container;
+        result[i] = dt->Test(data[i], dt->DTree);
     }
 
-    return ColRef_storage(new ColRef_storage(result, pt, 0, "prediction", 0), 1, 0, "prediction", 0);
+    return ColRef_storage(new ColRef_storage(result, X.size, 0, "prediction", 0), 1, 0, "prediction", 0);
 }
diff --git a/tests/dt.a b/tests/dt.a
index 5a52ac1..abfc6a6 100644
--- a/tests/dt.a
+++ b/tests/dt.a
@@ -1,21 +1,21 @@
 LOAD MODULE FROM "./libirf.so"
 FUNCTIONS (
 	newtree(height:int, f:int64, sparse:vecint, forget:double, maxf:int64, noclasses:int64, e:int, r:int64, rb:int64) -> bool,
-	additem(X:vecdouble, y:int64, size:int64) -> bool,
-	fit() -> bool,
-	predict() -> vecint
+	fit(X:vecvecdouble, y:vecint) -> bool,
+	predict(X:vecvecdouble) -> vecint
 );
-create table tb(x int);
-create table tb2(x double, y double, z double);
-insert into tb values (0);
-insert into tb values (0);
-insert into tb values (0);
-select newtree(5, 3, tb.x, 0, 3, 2, 0, 100, 1) from tb;
-insert into tb2 values (1, 0, 1);
-insert into tb2 values (0, 1, 1);
-insert into tb2 values (1, 1, 1);
-select additem(tb2.x, 1, 3) from tb2;
-select additem(tb2.y, 0, -1) from tb2;
-select additem(tb2.z, 1, -1) from tb2;
-select fit();
-select predict();
+
+create table source(x1 double, x2 double, x3 double, x4 double, x5 int);
+load data infile "data/benchmark" into table source fields terminated by ",";
+
+create table sparse(x int);
+insert into sparse values (1);
+insert into sparse values (1);
+insert into sparse values (1);
+insert into sparse values (1);
+
+select newtree(6, 4, sparse.x, 0, 4, 2, 0, 400, 2147483647) from sparse;
+
+select fit(pack(x1, x2, x3, x4), x5) from source;
+
+select predict(pack(x1, x2, x3, x4)) from source;
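Patch 03 above replaces the stateful additem/fit/predict protocol with single-shot fit(X, y) and predict(X) calls that receive whole columns. A column crosses the module boundary as a plain pointer-plus-size pair; a minimal mock of that convention (Col here is an illustrative stand-in for the server's ColRef/vector_type, not the actual AQuery headers):

#include <cstdio>

template <class T>
struct Col { T* container; long size; };      // stand-in for ColRef<T>

// A fit()-style entry point: reads a whole label column in one call,
// touching every row without any global state between calls.
long count_positive(Col<long> y) {
    long n = 0;
    for (long i = 0; i < y.size; ++i)
        if (y.container[i] > 0) ++n;
    return n;
}

int main() {
    long labels[] = {1, 0, 1, 1};
    std::printf("%ld\n", count_positive(Col<long>{labels, 4}));  // prints 3
}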
From 5549706443f70ab6dd57421006c09b4fd03d16f5 Mon Sep 17 00:00:00 2001
From: Bill
Date: Sun, 23 Oct 2022 05:47:53 +0800
Subject: [PATCH 04/30] fixed issue for user module

---
 .gitignore             |  1 +
 Makefile               |  5 +++--
 reconstruct/ast.py     |  6 +++---
 sdk/Makefile           |  8 +++++++-
 sdk/irf.cpp            | 33 +++++++++++++++++++++----------
 server/table.h         | 17 ++++++++++++++++-
 server/vector_type.hpp |  3 +++
 tests/dt.a             | 40 ++++++++++++++++++++--------------------
 tests/dt2.a            | 36 ++++++++++++++++--------------------
 tests/q1.sql           |  2 --
 10 files changed, 92 insertions(+), 59 deletions(-)

diff --git a/.gitignore b/.gitignore
index 644be8b..508685f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -51,6 +51,7 @@ k
 **/Debug
 **/Release
 test*.c*
+data/benchmark
 *.csv
 !test.csv
 !test2.csv
diff --git a/Makefile b/Makefile
index dd7747e..1707240 100644
--- a/Makefile
+++ b/Makefile
@@ -4,11 +4,12 @@ MonetDB_INC =
 Threading =
 CXXFLAGS = --std=c++1z
 ifeq ($(AQ_DEBUG), 1)
-	OPTFLAGS = -g3
+	OPTFLAGS = -g3 -fsanitize=address -fsanitize=leak
+	LINKFLAGS =
 else
 	OPTFLAGS = -O3 -DNDEBUG -fno-stack-protector
+	LINKFLAGS = -flto
 endif
-LINKFLAGS = -flto # + $(AQ_LINK_FLAG)
 SHAREDFLAGS = -shared
 FPIC = -fPIC
 COMPILER = $(shell $(CXX) --version | grep -q clang && echo clang|| echo gcc)
diff --git a/reconstruct/ast.py b/reconstruct/ast.py
index d82ebce..173399b 100644
--- a/reconstruct/ast.py
+++ b/reconstruct/ast.py
@@ -343,7 +343,7 @@ class projection(ast_node):
                 )
             else:
                 # for funcs evaluate f_i(x, ...)
-                self.context.emitc(f'{self.out_table.contextname_cpp}->get_col<{key}>() = {val[1]};')
+                self.context.emitc(f'{self.out_table.contextname_cpp}->get_col<{key}>().initfrom({val[1]}, "{cols[i].name}");')
         # print out col_is
         if 'into' not in node:
             self.context.emitc(f'print(*{self.out_table.contextname_cpp});')
@@ -990,7 +990,7 @@ class load(ast_node):
             self.context.queries.append(f'F{fname}')
             ret_type = VoidT
             if 'ret_type' in f:
-                ret_type = Types.decode(f['ret_type'])
+                ret_type = Types.decode(f['ret_type'], vector_type='vector_type')
             nargs = 0
             arglist = ''
             if 'vars' in f:
@@ -1000,7 +1000,7 @@ class load(ast_node):
                 nargs = len(arglist)
                 arglist = ', '.join(arglist)
             # create c++ stub
-            cpp_stub = f'{ret_type.cname} (*{fname})({arglist}) = nullptr;'
+            cpp_stub = f'{"vectortype_cstorage" if isinstance(ret_type, VectorT) else ret_type.cname} (*{fname})({arglist}) = nullptr;'
             self.context.module_stubs += cpp_stub + '\n'
             self.context.module_map[fname] = cpp_stub
             #registration for parser
diff --git a/sdk/Makefile b/sdk/Makefile
index 7bd5c8c..b146a81 100644
--- a/sdk/Makefile
+++ b/sdk/Makefile
@@ -1,5 +1,11 @@
+OPT_FLAGS =
+ifneq ($(DEBUG), 1)
+	OPT_FLAGS = -Ofast -march=native -flto -DNDEBUG
+else
+	OPT_FLAGS = -g3 -D_DEBUG -fsanitize=leak -fsanitize=address
+endif
 example:
 	$(CXX) -shared -fPIC example.cpp aquery_mem.cpp -fno-semantic-interposition -Ofast -march=native -flto --std=c++1z -o ../test.so
 irf:
-	$(CXX) -shared -fPIC RF.cpp irf.cpp incrementalDecisionTree.cpp aquery_mem.cpp Evaluation.cpp -fno-semantic-interposition -Ofast -march=native -flto --std=c++1z -o ../libirf.so
+	$(CXX) -shared -fPIC RF.cpp irf.cpp incrementalDecisionTree.cpp aquery_mem.cpp Evaluation.cpp -fno-semantic-interposition $(OPT_FLAGS) --std=c++1z -o ../libirf.so
 all: example
diff --git a/sdk/irf.cpp b/sdk/irf.cpp
index 8433c95..36cf4c2 100644
--- a/sdk/irf.cpp
+++ b/sdk/irf.cpp
@@ -36,19 +36,32 @@ __AQEXPORT__(bool) additem(ColRef<double> X, long y, long size){
     pt ++;
     return 1;
 }
-__AQEXPORT__(bool) fit(){
-    if(pt<=0)return 0;
-    dt->fit(data, result, pt);
-    return 1;
+__AQEXPORT__(bool) fit(vector_type<vector_type<double>> v, vector_type<long> res){
+    double** data = (double**)malloc(v.size*sizeof(double*));
+    for(int i = 0; i < v.size; ++i)
+        data[i] = v.container[i].container;
+    dt->fit(data, res.container, v.size);
+    return true;
 }
 
-__AQEXPORT__(ColRef_storage) predict(){
-    int* result = (int*)malloc(pt*sizeof(int));
+__AQEXPORT__(vectortype_cstorage) predict(vector_type<vector_type<double>> v){
+    int* result = (int*)malloc(v.size*sizeof(int));
 
-    for(long i=0; i<pt; ++i){
-        result[i] = dt->Test(data[i], dt->DTree);
-    }
+    for(long i=0; i<v.size; ++i){
+        result[i] = dt->Test(v.container[i].container, dt->DTree);
+        //printf("%d ", result[i]);
+    }
+    auto container = (vector_type<int>*)malloc(sizeof(vector_type<int>));
+    container->size = v.size;
+    container->capacity = 0;
+    container->container = result;
+    // container->out(10);
+    // ColRef<vector_type<int>>* col = (ColRef<vector_type<int>>*)malloc(sizeof(ColRef<vector_type<int>>));
+    auto ret = vectortype_cstorage{.container = container, .size = 1, .capacity = 0};
+    // col->initfrom(ret, "sibal");
+    // print(*col);
+    return ret;
+    //return true;
 }
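predict() now hands its result back as vectortype_cstorage, a flat C-layout descriptor (buffer pointer, size, capacity) that the engine can adopt as a vector column without copying. A condensed sketch of that hand-off; vec and make_result are illustrative stand-ins, not the server's definitions:

#include <cstdlib>

template <class T>
struct vec { T* container; unsigned size, capacity; };   // stand-in for vector_type<T>

struct vectortype_cstorage { void* container; unsigned size, capacity; };

// Wrap a malloc'd int buffer as a one-column vector result, as predict() does.
vectortype_cstorage make_result(int* data, unsigned n) {
    auto* inner = (vec<int>*)std::malloc(sizeof(vec<int>));
    *inner = vec<int>{data, n, 0};            // capacity 0, matching the patch
    return vectortype_cstorage{inner, 1, 0};  // one vector, handed to the engine
}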
diff --git a/server/table.h b/server/table.h
index 56c7a4b..f3911af 100644
--- a/server/table.h
+++ b/server/table.h
@@ -74,7 +74,7 @@ public:
 		this->container = (_Ty*)container;
 		this->name = name;
 	}
-	template
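Tying patch 01 together: for tests/complex_data.a the compiler's produce_cpp emits a post-process loop that reads each CSV row into per-column temporaries and appends them to the table's columns. A self-contained mock of that loop under the same separators (',' for fields, ';' for elements); MockReader only imitates the AQCSVReader alias and is not the csv.h implementation:

#include <cstdio>
#include <sstream>
#include <string>
#include <vector>

struct MockReader {                       // illustrative stand-in, not csv.h
    std::istringstream in;
    explicit MockReader(std::string csv) : in(std::move(csv)) {}
    void next_line() { std::string h; std::getline(in, h); }   // skip header row
    bool read_row(double& a, std::vector<double>& b, int& c) {
        std::string line;
        if (!std::getline(in, line)) return false;
        std::istringstream row(line);
        std::string fa, fb, fc;                                 // one field per column
        std::getline(row, fa, ','); std::getline(row, fb, ','); std::getline(row, fc, ',');
        a = std::stod(fa); c = std::stoi(fc);
        b.clear();
        std::istringstream elems(fb);                           // split field on ';'
        for (std::string e; std::getline(elems, e, ';');) b.push_back(std::stod(e));
        return true;
    }
};

int main() {
    MockReader reader("a,b,c\n5e-3,3;4;5e-3;6.32,7\n1,2,3\n");
    std::vector<double> col_a; std::vector<std::vector<double>> col_b; std::vector<int> col_c;
    reader.next_line();
    double a; std::vector<double> b; int c;
    while (reader.read_row(a, b, c)) {    // same loop shape as the generated code
        col_a.emplace_back(a); col_b.emplace_back(b); col_c.emplace_back(c);
    }
    std::printf("%zu rows, row 0 has %zu elements\n", col_a.size(), col_b[0].size());
}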