updated code for groupby and windowed aggregations

dev
Bill Sun 3 years ago
parent 277dad6b3e
commit 7eac7837a3

6
.gitignore vendored

@ -20,4 +20,8 @@ k
*.pdf *.pdf
test*.c* test*.c*
*.csv *.csv
*.out *.out
*.asm
!mmw.so
*.k
!header.k

@ -0,0 +1,8 @@
all:
g++ mmw.cpp --std=c++1z -shared -fPIC -Ofast -march=native -g0 -s -o mmw.so
avx512:
g++ mmw.cpp --std=c++1z -shared -fPIC -Ofast -mavx512f -g0 -s -o mmw.so
debug:
g++ mmw.cpp --std=c++1z -shared -fPIC -O0 -march=native -g3 -o mmw.so
clean:
rm mmw.so -rf

@ -4,7 +4,7 @@ from engine.utils import base62uuid
# replace column info with this later. # replace column info with this later.
class ColRef: class ColRef:
def __init__(self, k9name, _ty, cobj, cnt, table, name, id): def __init__(self, k9name, _ty, cobj, cnt, table, name, id, order = None, compound = False):
self.k9name = k9name self.k9name = k9name
self.type = _ty self.type = _ty
self.cobj = cobj self.cobj = cobj
@ -12,6 +12,9 @@ class ColRef:
self.table = table self.table = table
self.name = name self.name = name
self.id = id self.id = id
self.order = order # True -> asc, False -> dsc; None -> unordered
self.compound = compound # compound field (list as a field)
self.views = []
self.__arr__ = (k9name, _ty, cobj, cnt, table, name, id) self.__arr__ = (k9name, _ty, cobj, cnt, table, name, id)
def __getitem__(self, key): def __getitem__(self, key):
@ -31,6 +34,7 @@ class TableInfo:
self.cxt = cxt self.cxt = cxt
self.views = set() self.views = set()
self.rec = None self.rec = None
self.groupinfo = None
for c in cols: for c in cols:
self.add_col(c) self.add_col(c)
@ -44,13 +48,6 @@ class TableInfo:
if type(c) is ColRef: if type(c) is ColRef:
c = c.cobj c = c.cobj
k9name = 'c' + base62uuid(7) k9name = 'c' + base62uuid(7)
# k9name = self.table_name + c['name']
# if k9name in self.cxt.k9cols_byname: # duplicate names?
# root = self.cxt.k9cols_byname[k9name]
# k9name = k9name + root.cnt
# root.cnt += 1
# column: (k9name, type, original col_object, dup_count)
col_object = ColRef(k9name, (list(c['type'].keys()))[0], c, 1, self,c['name'], len(self.columns)) col_object = ColRef(k9name, (list(c['type'].keys()))[0], c, 1, self,c['name'], len(self.columns))
self.cxt.k9cols_byname[k9name] = col_object self.cxt.k9cols_byname[k9name] = col_object

@ -37,15 +37,18 @@ class load(ast_node):
name="load" name="load"
def produce(self, node): def produce(self, node):
node = node[self.name] node = node[self.name]
tablename = 'l'+base62uuid(7)
keys = 'k'+base62uuid(7)
self.emit(f"{tablename}:`csv ? 1:\"{node['file']['literal']}\"")
self.emit(f"{keys}:!{tablename}")
table:TableInfo = self.context.tables_byname[node['table']] table:TableInfo = self.context.tables_byname[node['table']]
n_keys = len(table.columns)
keys = ''
for _ in n_keys:
keys+='`tk'+base62uuid(6)
tablename = 'l'+base62uuid(7)
self.emit(f"{tablename}:[{keys}!+(`csv ? 1:\"{node['file']['literal']}\")][{keys}]")
for i, c in enumerate(table.columns): for i, c in enumerate(table.columns):
c:ColRef c:ColRef
self.emit(f'{c.k9name}:{tablename}[({keys})[{i}]]') self.emit(f'{c.k9name}:{tablename}[{i}]')
class outfile(ast_node): class outfile(ast_node):
name="_outfile" name="_outfile"

@ -8,17 +8,17 @@ class expr(ast_node):
'min': 'min', 'min': 'min',
'avg': 'avg', 'avg': 'avg',
'sum': 'sum', 'sum': 'sum',
'mod':'mod',
'mins': ['mins', 'minsw'], 'mins': ['mins', 'minsw'],
'maxs': ['maxs', 'maxsw'], 'maxs': ['maxs', 'maxsw'],
'avgs': ['avgs', 'avgsw'], 'avgs': ['avgs', 'avgsw'],
'sums': ['sums', 'sumsw'], 'sums': ['sums', 'sumsw'],
} }
binary_ops = { binary_ops = {
'sub':'-', 'sub':'-',
'add':'+', 'add':'+',
'mul':'*', 'mul':'*',
'div':'%', 'div':'%',
'mod':'mod',
'and':'&', 'and':'&',
'or':'|', 'or':'|',
'gt':'>', 'gt':'>',

@ -12,7 +12,7 @@ class groupby(ast_node):
if type(node) is not list: if type(node) is not list:
node = [node] node = [node]
g_contents = '(' g_contents = '('
first_col = ''
for i, g in enumerate(node): for i, g in enumerate(node):
v = g['value'] v = g['value']
e = expr(self, v).k9expr e = expr(self, v).k9expr
@ -21,7 +21,8 @@ class groupby(ast_node):
tmpcol = 't' + base62uuid(7) tmpcol = 't' + base62uuid(7)
self.emit(f'{tmpcol}:{e}') self.emit(f'{tmpcol}:{e}')
e = tmpcol e = tmpcol
if i == 0:
first_col = e
g_contents += e + (';'if i < len(node)-1 else '') g_contents += e + (';'if i < len(node)-1 else '')
self.emit(f'{self.group}:'+g_contents+')') self.emit(f'{self.group}:'+g_contents+')')
@ -29,8 +30,8 @@ class groupby(ast_node):
if len(node) <= 1: if len(node) <= 1:
self.emit(f'{self.group}:={self.group}') self.emit(f'{self.group}:={self.group}')
else: else:
self.emit(f'{self.group}:groupby[{self.group}[0];+{self.group}]') self.emit(f'{self.group}:groupby[+({self.group},(,!(#({first_col}))))]')
def consume(self, _): def consume(self, _):
self.referenced = self.datasource.rec self.referenced = self.datasource.rec
self.datasource.rec = None self.datasource.rec = None

@ -5,6 +5,8 @@ from engine.expr import expr
from engine.scan import filter from engine.scan import filter
from engine.utils import base62uuid, enlist, base62alp from engine.utils import base62uuid, enlist, base62alp
from engine.ddl import outfile from engine.ddl import outfile
import copy
class projection(ast_node): class projection(ast_node):
name='select' name='select'
def __init__(self, parent:ast_node, node, context:Context = None, outname = None, disp = True): def __init__(self, parent:ast_node, node, context:Context = None, outname = None, disp = True):
@ -62,6 +64,8 @@ class projection(ast_node):
if 'groupby' in node: if 'groupby' in node:
self.group_node = groupby(self, node['groupby']) self.group_node = groupby(self, node['groupby'])
self.datasource = copy(self.datasource) # shallow copy
self.datasource.groupinfo = self.group_node
else: else:
self.group_node = None self.group_node = None

@ -1,5 +1,7 @@
import`csv import`csv
md:{y-x*_y%x}
maxs:{[L]{max(x, y)}\L} maxs:{[L]{max(x, y)}\L}
mins:{[L]{min(x, y)}\L} mins:{[L]{min(x, y)}\L}
sums:{[L]{(x + y)}\L} sums:{[L]{(x + y)}\L}
@ -7,22 +9,56 @@ sums:{[L]{(x + y)}\L}
avgsimpl:{[L;i] curr:L[i]%(i+1); $[i<(#L)-1;curr, avgsimpl[L;i+1];curr]} avgsimpl:{[L;i] curr:L[i]%(i+1); $[i<(#L)-1;curr, avgsimpl[L;i+1];curr]}
avgs:{[L] avgsimpl[sums[L];0]} avgs:{[L] avgsimpl[sums[L];0]}
maxswimp:{[L;w;i] curr:max(L@(((i-w)+!w)|0)); $[i<#L;curr, maxswimp[L; w; i + 1];curr]} / maxswimp:{[L;w;i] curr:max(L@(((i-w)+!w)|0)); $[i<#L;curr, maxswimp[L; w; i + 1];curr]}
maxsw:{[w;L]maxswimp[L; w; 1]} / maxsw:{[w;L]maxswimp[L; w; 1]}
/ minswimp:{[L;w;i] curr:min(L@(((i-w)+!w)|0)); $[i<#L;curr, maxswimp[L; w; i + 1];curr]}
/ minsw:{[w;L]minswimp[L;w;1]}
/ avgswimp:{[L;w;s;i] s:(s+L[i])-L[i-w];curr:s%((i+1)&w);$[i<(#L)-1; curr, avgswimp[L; w; s; i+1]; curr]}
/ avgsw:{[w;L] avgswimp[L;w;0;0]}
/ sumswimp:{[L;w;s;i] s:(s+L[i])-L[i-w];$[i<(#L)-1; s, sumswimp[L; w; s; i+1]; s]}
/ sumsw:{[w;L] sumswimp[L;w;0;0]}
groupby0:{[L]
{[x;y]
x:$[(@x)=`i;(,(L[0]))!,(,0);x];
k:(,(L[y]));gvk:x[k][0];
found:$[(gvk[0]+gvk[1])>0;1;L[y] in !x];
cg:(,L[y])!$[found;,gvk[0],y;,(,y)];
(x,cg)}/!(#L)}
groupBy:{[x]groupBySingle:{[a;x]
findAll:{[c;xx]
f:{[i;c]$[(c[0])[i]~c[1];i+1;0]};
@[!#xx;!#xx;f;(#xx)#,(xx;c)]};
z:findAll[a;x];
b:(findAll[0;z]_(!(1+#z)))-1;(a;b)};
x:+x;y:?x;
@[y;!#y;groupBySingle;(#y)#,x]}
minswimp:{[L;w;i] curr:min(L@(((i-w)+!w)|0)); $[i<#L;curr, maxswimp[L; w; i + 1];curr]} groupby:{[L]
minsw:{[w;L]minswimp[L;w;1]} L:^+L;
dimy:(#(L[0]))-1;
((({[L;dim;x;y]
x:$[x~0;(,(dim#(L[0])),0);x];
curr:dim#(L[y]);
$[(dim#*x)~curr;x;((,curr,y),x)]}[L;dimy])/!(#L));(+L)[dimy]) }
avgswimp:{[L;w;s;i] s:(s+L[i])-L[i-w];curr:s%((i+1)&w);$[i<(#L)-1; curr, avgswimp[L; w; s; i+1]; curr]} lststr:{[L](+({[x;y] ($x,$y)}/L))[0]}
avgsw:{[w;L] avgswimp[L;w;0;0]} delist:{[L] $[(@L)in(`LL`LC`LG`L);delist[(,/L)];L]}
cntlist:{[L;i] $[(@L)in(`LL`LC`LG`L);cntlist[(,/L);i+1];i+1]}
sumswimp:{[L;w;s;i] s:(s+L[i])-L[i-w];$[i<(#L)-1; s, sumswimp[L; w; s; i+1]; s]} sumswkrl:{[L;w;x;y] ((x-L[y-w])+L[y])}
sumsw:{[w;L] sumswimp[L;w;0;0]} sumsw:{[L;w] $[(#L)=0;L;(sumswkrl[L;w])\@[!#L;0;L[0]]]}
avgswkrl:{[L;w;x;y] (x-(L[y-w]-L[y])%w)}
avgsw:{[L;w] $[(#L)=0;L;(avgswkrl[L;w])\@[!#L;0;L[0]]]}
groupbyi:{[L;GV;i] / minsw:{[w;L] ({[L;w;x] min(L[$[x>w;(!w) + ((x-w)+1);!(x+1)]])}[L;w])'!#L}
k:(,(L[i]));gvk:GV[k][0]; import`mmw
found:$[(gvk[0]+gvk[1])>0;1;L[i] in !GV]; minsw:{[w;L] ret:L; mmw[ret;((`g ($@ret)[0]), (#ret), w, 65536)];ret}
cg:(,L[i])!$[found;,gvk[0],i;,(,i)]; maxsw:{[w;L] ret:L; mmw[ret;((`g ($@ret)[0]), (#ret), w, 65537)];ret}
$[i<(#L)-1; groupbyi[L;(GV,cg);i+1]; (GV,cg)]} minswip:{[w;L] mmw[L;((`g ($@L)[0]), (#L), w, 65536)];}
groupbys:{[L;ll] GV1:(,(L[0]))!,(,0);$[ll>1;groupbyi[L;GV1;1];GV1]} maxswip:{[w;L] mmw[L;((`g ($@L)[0]), (#L), w, 65537)];}
groupby:{[l;L] $[(#l)=0;,();groupbys[L;#l]]}

@ -0,0 +1,48 @@
#include <cstring>
#include <cstdlib>
#include <cstdint>
#include <deque>
using std::size_t;
using std::uint32_t;
template<class T, bool minmax>
void running(void *array, uint32_t len, uint32_t w){
using std::deque;
T* arr = static_cast<T*> (array);
deque<std::pair<T, uint32_t>> cache;
for(int i = 0; i < len; ++i){
if(!cache.empty() && cache.front().second == i-w) cache.pop_front();
if constexpr(minmax)
while(!cache.empty() && cache.back().first>arr[i]) cache.pop_back();
else
while(!cache.empty() && cache.back().first<arr[i]) cache.pop_back();
cache.push_back({arr[i], i});
arr[i] = cache.front().first;
}
}
template<class T>
inline void mm(void *array, uint32_t len, uint32_t w, bool mm){
mm? running<T, true>(array, len, w) : running<T, false>(array, len, w);
}
extern "C" {
#include <stdio.h>
int mmw(void *array, unsigned long long misc[]){
char _ty = misc[0];
uint32_t len = misc[1];
uint32_t w = misc[2];
bool minmax = misc[3]-0x10000;
switch(_ty){
case 'F': mm<double>(array, len, w, minmax); break;
case 'C': case 'G': mm<unsigned char>(array, len, w, minmax); break;
case 'H': mm<unsigned short>(array, len, w, minmax); break;
case 'D': case 'I': mm<unsigned int>(array, len, w, minmax); break;
case 'T': case 'J': mm<long long>(array, len, w, minmax); break;
case 'L': if(len == 0) break;
default: printf("nyi %c\n", _ty);
}
return 0;
}
}

BIN
mmw.so

Binary file not shown.

@ -7,9 +7,6 @@ import sys
if sys.platform != 'win32': if sys.platform != 'win32':
import readline import readline
# else:
# import pyreadline3
test_parser = True test_parser = True
# code to test parser # code to test parser

Loading…
Cancel
Save