dev
bill sun 3 years ago
parent ae51d8c622
commit d1a6b1d83f

@ -110,7 +110,10 @@ class TableInfo:
self.cxt.ccols_byname[cname] = col_object self.cxt.ccols_byname[cname] = col_object
self.columns_byname[c['name']] = col_object self.columns_byname[c['name']] = col_object
self.columns.append(col_object) self.columns.append(col_object)
def get_size(self):
size_tmp = 'tmp_sz_'+base62uuid(6)
self.cxt.emit(f'const auto& {size_tmp} = {self.columns[0].reference()}.size;')
return size_tmp
@property @property
def n_cols(self): def n_cols(self):
return len(self.columns) return len(self.columns)

@ -1,6 +1,7 @@
# code-gen for data decl languages # code-gen for data decl languages
from engine.ast import ColRef, TableInfo, ast_node, Context, include from engine.ast import ColRef, TableInfo, ast_node, Context, include
from engine.scan import scan
from engine.utils import base62uuid from engine.utils import base62uuid
class create_table(ast_node): class create_table(ast_node):
@ -26,9 +27,15 @@ class create_table(ast_node):
for c in tbl.columns: for c in tbl.columns:
self.emit(f"{c.cxt_name}.init();") self.emit(f"{c.cxt_name}.init();")
else: else:
for i, c in enumerate(tbl.columns): if len(self.context.scans) == 0:
self.emit(f"{c.cxt_name}.init();") for i, c in enumerate(tbl.columns):
self.emit(f"{c.cxt_name} = {self.cexpr[i]()};") self.emit(f"{c.cxt_name}.init();")
self.emit(f"{c.cxt_name} = {self.cexpr[i]()};")
else:
scanner:scan = self.context.scans[-1]
for i, c in enumerate(tbl.columns):
scanner.add(f"{c.cxt_name}.init();", "init")
scanner.add(f"{c.cxt_name} = {self.cexpr[i](scanner.it_ver)};")
class insert(ast_node): class insert(ast_node):
name = 'insert' name = 'insert'

@ -23,8 +23,8 @@ class expr(ast_node):
'mul':'*', 'mul':'*',
'div':'/', 'div':'/',
'mod':'%', 'mod':'%',
'and':'&', 'and':'&&',
'or':'|', 'or':'||',
'xor' : '^', 'xor' : '^',
'gt':'>', 'gt':'>',
'lt':'<', 'lt':'<',
@ -92,7 +92,7 @@ class expr(ast_node):
x.append(expr(self, v)._expr) x.append(expr(self, v)._expr)
self._expr = self.compound_ops[key][1](x) self._expr = self.compound_ops[key][1](x)
elif key in self.unary_ops: elif key in self.unary_ops:
self._expr += f'({expr(self, val)._expr}{self.unary_ops[key]})' self._expr += f'{self.unary_ops[key]}({expr(self, val)._expr})'
else: else:
print(f'Undefined expr: {key}{val}') print(f'Undefined expr: {key}{val}')

@ -40,7 +40,7 @@ class groupby(ast_node):
self.emit(f'unordered_map<{self.group_type}, vector_type<uint32_t>, ' self.emit(f'unordered_map<{self.group_type}, vector_type<uint32_t>, '
f'transTypes<{self.group_type}, hasher>> {self.group};') f'transTypes<{self.group_type}, hasher>> {self.group};')
self.n_grps = len(node) self.n_grps = len(node)
self.scanner = scan(self, None, expr.toCExpr(first_col)()+'.size') self.scanner = scan(self, self.datasource, expr.toCExpr(first_col)()+'.size')
self.scanner.add(f'{self.group}[forward_as_tuple({g_contents(self.scanner.it_ver)})].emplace_back({self.scanner.it_ver});') self.scanner.add(f'{self.group}[forward_as_tuple({g_contents(self.scanner.it_ver)})].emplace_back({self.scanner.it_ver});')

@ -14,6 +14,7 @@ class projection(ast_node):
self.disp = disp self.disp = disp
self.outname = outname self.outname = outname
self.group_node = None self.group_node = None
self.where = None
ast_node.__init__(self, parent, node, context) ast_node.__init__(self, parent, node, context)
def init(self, _): def init(self, _):
if self.outname is None: if self.outname is None:
@ -58,8 +59,9 @@ class projection(ast_node):
self.prev_datasource = self.context.datasource self.prev_datasource = self.context.datasource
self.context.datasource = self.datasource self.context.datasource = self.datasource
if 'where' in node: if 'where' in node:
self.datasource = filter(self, node['where'], True).output self.where = filter(self, node['where'], True)
self.context.datasource = self.datasource # self.datasource = filter(self, node['where'], True).output
#self.context.datasource = self.datasource
if 'groupby' in node: if 'groupby' in node:
self.group_node = groupby(self, node['groupby']) self.group_node = groupby(self, node['groupby'])
@ -108,6 +110,9 @@ class projection(ast_node):
create_table(self, self.out_table, cexpr = cexprs) create_table(self, self.out_table, cexpr = cexprs)
self.datasource.group_node = None self.datasource.group_node = None
if self.where is not None:
self.where.finalize()
has_orderby = 'orderby' in node has_orderby = 'orderby' in node
if has_orderby: if has_orderby:

@ -11,10 +11,12 @@ class scan(ast_node):
super().__init__(parent, node, context) super().__init__(parent, node, context)
def init(self, _): def init(self, _):
self.datasource = self.context.datasource self.datasource = self.context.datasource
self.initializers = ''
self.start = '' self.start = ''
self.body = '' self.body = ''
self.end = '}' self.end = '}'
self.filter = None self.mode = None
self.filters = []
scan_vars = set(s.it_var for s in self.context.scans) scan_vars = set(s.it_var for s in self.context.scans)
self.it_ver = 'i' + base62uuid(2) self.it_ver = 'i' + base62uuid(2)
while(self.it_ver in scan_vars): while(self.it_ver in scan_vars):
@ -22,19 +24,30 @@ class scan(ast_node):
self.parent.context.scans.append(self) self.parent.context.scans.append(self)
def produce(self, node): def produce(self, node):
if type(node) is ColRef: if type(node) is ColRef:
self.colref = node
if self.size is None: if self.size is None:
self.mode = ["col", node.table]
self.start += f'for (auto& {self.it_ver} : {node.reference()}) {{\n' self.start += f'for (auto& {self.it_ver} : {node.reference()}) {{\n'
else: else:
self.mode = ["idx", node.table]
self.start += f"for (uint32_t {self.it_ver} = 0; {self.it_ver} < {node.reference()}.size; ++{self.it_ver}){{\\n" self.start += f"for (uint32_t {self.it_ver} = 0; {self.it_ver} < {node.reference()}.size; ++{self.it_ver}){{\\n"
elif type(node) is str: elif type(node) is str:
self.mode = ["idx", None]
self.start+= f'for(auto& {self.it_ver} : {node}) {{\n' self.start+= f'for(auto& {self.it_ver} : {node}) {{\n'
else: else:
self.mode = ["idx", node] # Node is the TableInfo
self.start += f"for (uint32_t {self.it_ver} = 0; {self.it_ver} < {self.size}; ++{self.it_ver}){{\n" self.start += f"for (uint32_t {self.it_ver} = 0; {self.it_ver} < {self.size}; ++{self.it_ver}){{\n"
def add(self, stmt): def add(self, stmt, position = "body"):
self.body+=stmt + '\n' if position == "body":
self.body += stmt + '\n'
else:
self.initializers += stmt + '\n'
def finalize(self): def finalize(self):
self.context.remove_scan(self, self.start + self.body + self.end) for f in self.filters:
self.start += f
self.end += '}'
self.context.remove_scan(self, self.initializers + self.start + self.body + self.end)
class filter(ast_node): class filter(ast_node):
name = 'filter' name = 'filter'
@ -65,8 +78,17 @@ class filter(ast_node):
for o, c in zip(self.output.columns, self.datasource.columns): for o, c in zip(self.output.columns, self.datasource.columns):
self.emit(f'{o.cname}:$[{tmpVar};{c.cname};()]') self.emit(f'{o.cname}:$[{tmpVar};{c.cname};()]')
def consume(self, node): def finalize(self):
self.scanner.finalize()
def consume(self, _):
# TODO: optimizations after converting expr to cnf # TODO: optimizations after converting expr to cnf
self.scanner = None
for s in self.context.scans:
if self.datasource == s.mode[1]:
self.scanner = s
break
if self.scanner is None:
self.scanner = scan(self, self.datasource, self.datasource.get_size())
self.expr = expr(self, self.modified_node) self.expr = expr(self, self.modified_node)
print(node) self.scanner.filters.append(f'if ({self.expr.cexpr(self.scanner.it_ver)}) {{\n')

@ -1,60 +1,56 @@
#include "./server/aggregations.h" #include "csv.h"
#include <unordered_map>
#include "./server/libaquery.h" #include "./server/libaquery.h"
#include "./server/hasher.h"
#include "./server/aggregations.h"
extern "C" int __DLLEXPORT__ dllmain(Context* cxt) { extern "C" int __DLLEXPORT__ dllmain(Context* cxt) {
using namespace std; using namespace std;
using namespace types; using namespace types;
auto stocks = new TableInfo<int,int>("stocks", 2); auto test = new TableInfo<int,int,int,int>("test", 4);
cxt->tables.insert({"stocks", stocks}); cxt->tables.insert({"test", test});
auto& stocks_timestamp = *(ColRef<int> *)(&stocks->colrefs[0]); auto& test_a = *(ColRef<int> *)(&test->colrefs[0]);
auto& stocks_price = *(ColRef<int> *)(&stocks->colrefs[1]); auto& test_b = *(ColRef<int> *)(&test->colrefs[1]);
stocks_timestamp.init(); auto& test_c = *(ColRef<int> *)(&test->colrefs[2]);
stocks_price.init(); auto& test_d = *(ColRef<int> *)(&test->colrefs[3]);
stocks_timestamp.emplace_back(1); test_a.init();
stocks_price.emplace_back(15); test_b.init();
stocks_timestamp.emplace_back(2); test_c.init();
stocks_price.emplace_back(19); test_d.init();
stocks_timestamp.emplace_back(3); io::CSVReader<4> csv_reader_6qlGpe("test.csv");
stocks_price.emplace_back(16); csv_reader_6qlGpe.read_header(io::ignore_extra_column, "a","b","c","d");
stocks_timestamp.emplace_back(4); int tmp_39gHMkie;
stocks_price.emplace_back(17); int tmp_190h2sZs;
stocks_timestamp.emplace_back(5); int tmp_4a8dDzSN;
stocks_price.emplace_back(15); int tmp_3LAKxSmM;
stocks_timestamp.emplace_back(6); while(csv_reader_6qlGpe.read_row(tmp_39gHMkie,tmp_190h2sZs,tmp_4a8dDzSN,tmp_3LAKxSmM)) {
stocks_price.emplace_back(13);
stocks_timestamp.emplace_back(7);
stocks_price.emplace_back(5);
stocks_timestamp.emplace_back(8);
stocks_price.emplace_back(8);
stocks_timestamp.emplace_back(9);
stocks_price.emplace_back(7);
stocks_timestamp.emplace_back(10);
stocks_price.emplace_back(13);
stocks_timestamp.emplace_back(11);
stocks_price.emplace_back(11);
stocks_timestamp.emplace_back(12);
stocks_price.emplace_back(14);
stocks_timestamp.emplace_back(13);
stocks_price.emplace_back(10);
stocks_timestamp.emplace_back(14);
stocks_price.emplace_back(5);
stocks_timestamp.emplace_back(15);
stocks_price.emplace_back(2);
stocks_timestamp.emplace_back(16);
stocks_price.emplace_back(5);
auto out_ZPYh = new TableInfo<decays<decltype(max((stocks_price[0]-min(stocks_timestamp[0]))))>>("out_ZPYh", 1);
cxt->tables.insert({"out_ZPYh", out_ZPYh});
auto& out_ZPYh_maxstockspriceminstockstimestamp = *(ColRef<decays<decltype(max((stocks_price[0]-min(stocks_timestamp[0]))))>> *)(&out_ZPYh->colrefs[0]);
out_ZPYh_maxstockspriceminstockstimestamp.init();
out_ZPYh_maxstockspriceminstockstimestamp = max((stocks_price-min(stocks_timestamp)));
print(*out_ZPYh);
auto out_1ac3 = new TableInfo<decays<decltype(max((stocks_price[0]-mins(stocks_price[0]))))>>("out_1ac3", 1); test_a.emplace_back(tmp_39gHMkie);
cxt->tables.insert({"out_1ac3", out_1ac3}); test_b.emplace_back(tmp_190h2sZs);
auto& out_1ac3_maxstockspriceminsstocksprice = *(ColRef<decays<decltype(max((stocks_price[0]-mins(stocks_price[0]))))>> *)(&out_1ac3->colrefs[0]); test_c.emplace_back(tmp_4a8dDzSN);
out_1ac3_maxstockspriceminsstocksprice.init(); test_d.emplace_back(tmp_3LAKxSmM);
out_1ac3_maxstockspriceminsstocksprice = max((stocks_price-mins(stocks_price))); }
print(*out_1ac3); typedef record<decltype(test_a[0]),decltype(test_b[0]),decltype(test_d[0])> record_type2Te4GFo;
unordered_map<record_type2Te4GFo, vector_type<uint32_t>, transTypes<record_type2Te4GFo, hasher>> g79JNXM8;
for (uint32_t i5x = 0; i5x < test_a.size; ++i5x){
g79JNXM8[forward_as_tuple(test_a[i5x],test_b[i5x],test_d[i5x])].emplace_back(i5x);
}
auto out_5NL7 = new TableInfo<decays<decltype(sum(test_c[0]))>,decays<decltype(test_b[0])>,decays<decltype(test_d[0])>>("out_5NL7", 3);
cxt->tables.insert({"out_5NL7", out_5NL7});
auto& out_5NL7_sumtestc = *(ColRef<decays<decltype(sum(test_c[0]))>> *)(&out_5NL7->colrefs[0]);
auto& out_5NL7_get1None = *(ColRef<decays<decltype(test_b[0])>> *)(&out_5NL7->colrefs[1]);
auto& out_5NL7_get2None = *(ColRef<decays<decltype(test_d[0])>> *)(&out_5NL7->colrefs[2]);
out_5NL7_sumtestc.init();
out_5NL7_get1None.init();
out_5NL7_get2None.init();
for(auto& i4l : g79JNXM8) {
auto &key_ADPihOU = i4l.first;
auto &val_7LsrkDP = i4l.second;
out_5NL7_sumtestc.emplace_back(sum(test_c[val_7LsrkDP]));
out_5NL7_get1None.emplace_back(get<1>(key_ADPihOU));
out_5NL7_get2None.emplace_back(get<2>(key_ADPihOU));
}
print(*out_5NL7);
return 0; return 0;
} }

@ -1,6 +1,8 @@
from concurrent.futures import thread from concurrent.futures import thread
import re import re
import time import time
from mo_parsing import ParseException
import aquery_parser as parser import aquery_parser as parser
import engine import engine
import subprocess import subprocess
@ -137,7 +139,7 @@ while test_parser:
continue continue
stmts = parser.parse(q) stmts = parser.parse(q)
print(stmts) print(stmts)
except (ValueError, FileNotFoundError) as e: except (ValueError, FileNotFoundError, ParseException) as e:
rm() rm()
print(type(e), e) print(type(e), e)

@ -8,7 +8,7 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "server", "server.vcxproj",
{8081FDAA-4D13-4B7A-ADB2-8224AF7F1C81} = {8081FDAA-4D13-4B7A-ADB2-8224AF7F1C81} {8081FDAA-4D13-4B7A-ADB2-8224AF7F1C81} = {8081FDAA-4D13-4B7A-ADB2-8224AF7F1C81}
EndProjectSection EndProjectSection
EndProject EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "Project1", "..\Project1\Project1.vcxproj", "{8081FDAA-4D13-4B7A-ADB2-8224AF7F1C81}" Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "msc-plugin", "..\msc-plugin\msc-plugin.vcxproj", "{8081FDAA-4D13-4B7A-ADB2-8224AF7F1C81}"
EndProject EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution

@ -103,6 +103,39 @@ struct TableInfo {
template <size_t ...Idxs> template <size_t ...Idxs>
using getRecordType = typename GetTypes<Idxs...>::type; using getRecordType = typename GetTypes<Idxs...>::type;
TableInfo(const char* name, uint32_t n_cols); TableInfo(const char* name, uint32_t n_cols);
//template<size_t ...Idxs = Types...>
//struct Iterator_t {
// uint32_t val;
// const TableInfo* info;
// constexpr Iterator_t(const uint32_t* val, const TableInfo* info) noexcept : val(val), info(info) {}
// getRecordType<Idxs...> operator*() {
// return getRecordType<Idxs...>(info->colrefs[Idxs].operator[](*val)...);
// }
// bool operator != (const Iterator_t& rhs) { return rhs.val != val; }
// Iterator_t& operator++ () {
// ++val;
// return *this;
// }
// Iterator_t operator++ (int) {
// Iterator_t tmp = *this;
// ++val;
// return tmp;
// }
//};
//template<size_t ...Idxs = Types...>
//Iterator_t<Idxs...> begin() const {
//
//}
//template<size_t ...Idxs = Types...>
//Iterator_t<Idxs...> end() const {
//
//}
//
//template<int ...Cols, bool ...ord>
//order_by() {
// vector_type<uint32_t> order(colrefs[0].size);
// std::sort(this->begin<Cols>)
//}
}; };
template <size_t _Index, class... _Types> template <size_t _Index, class... _Types>

@ -22,9 +22,9 @@ SELECT max(price-min(timestamp)) FROM stocks
/*<k> "q1" </k> /*<k> "q1" </k>
*/ */
SELECT max(price-mins(price)) FROM stocks SELECT max(price-mins(price)) FROM stocks
SELECT price, timestamp FROM stocks where price - timestamp > 1 and not (price*timestamp<100)
/* /*
<k> "q2" </k>
SELECT price, timestamp FROM stocks where price -timestamp > 1 and not (price*timestamp<100);
<k> "q3"</k> <k> "q3"</k>
SELECT max(price-mins(price)) SELECT max(price-mins(price))
FROM stocks FROM stocks

Loading…
Cancel
Save