simple udf code generation

dev
Bill 2 years ago
parent ece2a6cc9f
commit 5b1bbf0f99

5
.gitignore vendored

@ -18,6 +18,9 @@ out.k
k
*.so
*.pdf
**/*.cmake
**/Debug
**/Release
test*.c*
*.csv
!test.csv
@ -38,3 +41,5 @@ test*.c*
*.shm
server/packages/**
*.ipynb
*.cmake
*.stackdump

@ -4,9 +4,14 @@ MonetDB_LIB =
ifeq ($(OS),Windows_NT)
OS_SUPPORT += server/winhelper.cpp
MonetDB_LIB += -Imonetdb/msvc msc-plugin/monetdbe.dll
else
MonetDB_LIB += -lmonetdbe
endif
$(info $(OS_SUPPORT))
# Diagnostic target: prints the detected OS and the OS-specific sources
# selected by the ifeq block above. NOTE(review): `info` never creates a
# file named `info` — it should be declared .PHONY; confirm before changing.
info:
	$(info $(OS_SUPPORT))
	$(info $(OS))
	$(info "test")
# Standalone server executable (IPC mode). Links the OS support sources
# chosen above; MonetDB_LIB is presumably appended elsewhere — TODO confirm.
server.bin:
	$(CXX) server/server.cpp $(OS_SUPPORT) --std=c++1z -O3 -march=native -o server.bin
server.so:

@ -0,0 +1,222 @@
{
"stmts": {
"udf": {
"fname": "covariances2",
"params": ["x", "y", "w"],
"stmt": [{
"assignment": {
"var": "xmeans",
"op": ":=",
"expr": 0.0
}
}, {
"assignment": {
"var": "ymeans",
"op": ":=",
"expr": 0.0
}
}, {
"assignment": {
"var": "l",
"op": ":=",
"expr": "_builtin_len"
}
}, {
"if": {
"cond": {
"gt": ["w", "l"]
},
"assignment": {
"var": "w",
"op": ":=",
"expr": "l"
},
"elif": [{
"cond": {
"gt": ["w", {
"add": ["l", 2]
}]
},
"stmt": [{
"assignment": {
"var": "l",
"op": ":=",
"expr": 3
}
}, {
"assignment": {
"var": "w",
"op": ":=",
"expr": 4
}
}]
}, {
"cond": {
"lt": ["w", 99]
},
"stmt": {
"assignment": {
"var": "l",
"op": ":=",
"expr": 8
}
}
}, {
"cond": {
"lt": ["w", 999]
},
"assignment": {
"var": "w",
"op": ":=",
"expr": 6
}
}],
"else": {
"assignment": {
"var": "l",
"op": ":=",
"expr": {
"div": ["l", 2]
}
}
}
}
}, {
"for": {
"defs": {
"var": ["i", "j"],
"op": [":=", ":="],
"expr": [0, 0]
},
"cond": {
"lt": ["i", "w"]
},
"tail": {
"var": "i",
"op": ":=",
"expr": {
"add": ["i", 1]
}
},
"stmt": [{
"assignment": {
"var": "xmeans",
"op": "+=",
"expr": {
"get": ["x", "i"]
}
}
}, {
"assignment": {
"var": "ymeans",
"op": "+=",
"expr": {
"get": ["y", "i"]
}
}
}, {
"assignment": {
"var": {
"get": ["_builtin_ret", "i"]
},
"op": ":=",
"expr": {
"avg": {
"mul": [{
"sub": [{
"x": [{
"sub": ["l", "w"]
}, "l"]
}, "xmeans"]
}, {
"sub": [{
"y": [{
"sub": ["l", "w"]
}, "l"]
}, "ymeans"]
}]
}
}
}
}]
}
}, {
"for": {
"defs": {
"var": "i",
"op": ":=",
"expr": 0
},
"cond": {
"lt": ["i", "l"]
},
"tail": {
"var": "i",
"op": "+=",
"expr": 1
},
"stmt": [{
"assignment": {
"var": "xmeans",
"op": "+=",
"expr": {
"div": [{
"sub": [{
"get": ["x", "i"]
}, {
"get": ["x", {
"sub": ["i", "w"]
}]
}]
}, "w"]
}
}
}, {
"assignment": {
"var": "ymeans",
"op": "+=",
"expr": {
"div": [{
"sub": [{
"get": ["y", "i"]
}, {
"get": ["y", {
"sub": ["i", "w"]
}]
}]
}, "w"]
}
}
}, {
"assignment": {
"var": {
"get": ["_builtin_ret", "i"]
},
"op": ":=",
"expr": {
"avg": {
"mul": [{
"sub": [{
"x": [{
"sub": ["l", "w"]
}, "l"]
}, "xmeans"]
}, {
"sub": [{
"y": [{
"sub": ["l", "w"]
}, "l"]
}, "ymeans"]
}]
}
}
}
}]
}
}],
"ret": {
"null": {}
}
}
}
}

@ -4,6 +4,8 @@ import os
# os.environ['CXX'] = 'C:/Program Files/LLVM/bin/clang.exe'
add_path_to_ldpath = True
rebuild_backend = False
run_backend = False
os_platform = 'unkown'

@ -34,6 +34,7 @@ FETCH = keyword("fetch").suppress()
FROM = keyword("from").suppress()
FULL = keyword("full")
FUNCTION = keyword("function").suppress()
AGGREGATION = keyword("aggregation").suppress()
GROUP = keyword("group").suppress()
HAVING = keyword("having").suppress()
INNER = keyword("inner")
@ -73,6 +74,11 @@ RECURSIVE = keyword("recursive").suppress()
VALUES = keyword("values").suppress()
WINDOW = keyword("window")
INTO = keyword("into").suppress()
IF = keyword("if").suppress()
STATIC = keyword("static").suppress()
ELIF = keyword("elif").suppress()
ELSE = keyword("else").suppress()
FOR = keyword("for").suppress()
PRIMARY_KEY = Group(PRIMARY + KEY).set_parser_name("primary_key")
FOREIGN_KEY = Group(FOREIGN + KEY).set_parser_name("foreign_key")
@ -112,8 +118,14 @@ INDF = (
keyword("is not distinct from").set_parser_name("ne!")
)
FASSIGN = Literal(":=").set_parser_name("fassign") # Assignment in UDFs
PASSIGN = Literal("+=").set_parser_name("passign")
MASSIGN = Literal("-=").set_parser_name("massign")
MULASSIGN = Literal("*=").set_parser_name("mulassign")
DASSIGN = Literal("/=").set_parser_name("dassign")
COLON = Literal(":").set_parser_name("colon")
NEQ = (Literal("!=") | Literal("<>")).set_parser_name("neq")
LAMBDA = Literal("->").set_parser_name("lambda")
DOT = Literal(".").set_parser_name("dot")
AND = keyword("and")
BETWEEN = keyword("between")
@ -233,6 +245,8 @@ L_INLINE = Literal("<k>").suppress()
R_INLINE = Literal("</k>").suppress()
LBRACE = Literal("{").suppress()
RBRACE = Literal("}").suppress()
LSB = Literal("[").suppress()
RSB = Literal("]").suppress()
LB = Literal("(").suppress()
RB = Literal(")").suppress()
EQ = Char("=").suppress()

@ -235,6 +235,8 @@ def parser(literal_string, ident, sqlserver=False):
scale_function = ((real_num | int_num) + call_function) / scale
scale_ident = ((real_num | int_num) + ident) / scale
compound = (
NULL
| TRUE
@ -330,12 +332,39 @@ def parser(literal_string, ident, sqlserver=False):
)
) / to_join_call
fassign = Group(var_name("var") + Suppress(FASSIGN) + expr("expr") + Suppress(";"))("assignment")
fassigns = fassign + ZeroOrMore(fassign, Whitespace(white=" \t"))
fbody = (Optional(fassigns) + expr("ret"))
definable_name = Forward()
dindex = definable_name("l") + LSB + expr("idx") + RSB
definable_name << var_name | dindex
# lname = Forward()
# ptr = (lname("l") + LAMBDA + var_name("r"))
# member = (lname("l") + DOT + var_name("r"))
# idx = (expr | COLON)
# index = (lname("l") + LSB + expr("lidx") + "," + idx("ridx") + RSB)
# lname << var_name | ptr | member | index
assignment = expr("var") + (FASSIGN|PASSIGN|MASSIGN|MULASSIGN|DASSIGN)("op") + expr("expr")
declaration = definable_name("var") + Optional(Suppress(FASSIGN) + expr("expr"))
fassign = Group(assignment + Suppress(";"))("assignment")
static_decl = Group(STATIC + delimited_list(declaration))("static_decl")
stmt = Forward()
elifstmt = Group(ELIF + LB + expr("cond") + RB + stmt)("elif")
elsestmt = Group(ELSE + stmt)("else")
ifstmt = Group(IF + LB + expr("cond") + RB + stmt +
ZeroOrMore(elifstmt) + Optional(elsestmt))("if")
forstmt = Group(FOR + LB + ( delimited_list(assignment)("defs")
+ Suppress(";") + expr("cond") +
Suppress(";") + delimited_list(assignment)("tail"))
+ RB + stmt)("for")
block = Forward()
stmt << (fassign|ifstmt|forstmt|block|Suppress(";"))
stmts = (ZeroOrMore(stmt("stmt"), Whitespace()))
block << (LBRACE + Optional(stmts) + RBRACE)("code_block")
fbody = (Optional(static_decl) + Optional(stmts) + expr("ret"))
udf = (
Optional(AGGREGATION("Agg")) +
FUNCTION
+ var_name("fname")
+ LB

@ -79,6 +79,10 @@ LongT = Types(4, name = 'int64', sqlname = 'BIGINT', fp_type = DoubleT)
ByteT = Types(1, name = 'int8', sqlname = 'TINYINT', long_type=LongT, fp_type=FloatT)
ShortT = Types(2, name = 'int16', sqlname='SMALLINT', long_type=LongT, fp_type=FloatT)
IntT = Types(3, name = 'int', cname = 'int', long_type=LongT, fp_type=FloatT)
ULongT = Types(8, name = 'uint64', sqlname = 'UINT64', fp_type=DoubleT)
UIntT = Types(7, name = 'uint32', sqlname = 'UINT32', long_type=ULongT, fp_type=FloatT)
UShortT = Types(6, name = 'uint16', sqlname = 'UINT16', long_type=ULongT, fp_type=FloatT)
UByteT = Types(5, name = 'uint8', sqlname = 'UINT8', long_type=ULongT, fp_type=FloatT)
StrT = Types(200, name = 'str', cname = 'const char*', sqlname='VARCHAR', ctype_name = 'types::STRING')
def _ty_make_dict(fn : str, *ty : Types):
    # Build a {key: Types} dict from the given Types instances.
    # NOTE(review): `fn` is eval'd inside the comprehension with `t` bound to
    # each Types instance, so it is expected to be an expression over `t`
    # (e.g. "t.name") — the key is recomputed per element; confirm callers.
    return {eval(fn):t for t in ty}
@ -214,6 +218,11 @@ spnull = OperatorBase('missing', 1, logical, cname = "", sqlname = "", call = is
# cstdlib
fnsqrt = OperatorBase('sqrt', 1, lambda *_ : DoubleT, cname = 'sqrt', sqlname = 'SQRT', call = fn_behavior)
fnlog = OperatorBase('log', 2, lambda *_ : DoubleT, cname = 'log', sqlname = 'LOG', call = fn_behavior)
fnsin = OperatorBase('sin', 1, lambda *_ : DoubleT, cname = 'sin', sqlname = 'SIN', call = fn_behavior)
fncos = OperatorBase('cos', 1, lambda *_ : DoubleT, cname = 'cos', sqlname = 'COS', call = fn_behavior)
fntan = OperatorBase('tan', 1, lambda *_ : DoubleT, cname = 'tan', sqlname = 'TAN', call = fn_behavior)
fnpow = OperatorBase('pow', 2, lambda *_ : DoubleT, cname = 'pow', sqlname = 'POW', call = fn_behavior)
# type collections
def _op_make_dict(*items : OperatorBase):
@ -223,8 +232,8 @@ builtin_binary_logical = _op_make_dict(opand, opor, opxor, opgt, oplt, opge, opl
builtin_unary_logical = _op_make_dict(opnot)
builtin_unary_arith = _op_make_dict(opneg)
builtin_unary_special = _op_make_dict(spnull)
builtin_cstdlib = _op_make_dict(fnsqrt)
builtin_cstdlib = _op_make_dict(fnsqrt, fnlog, fnsin, fncos, fntan, fnpow)
builtin_func = _op_make_dict(fnmax, fnmin, fnsum, fnavg, fnmaxs, fnmins, fnsums, fnavgs, fncnt)
builtin_operators : dict[str, OperatorBase] = {**builtin_binary_arith, **builtin_binary_logical,
builtin_operators : Dict[str, OperatorBase] = {**builtin_binary_arith, **builtin_binary_logical,
**builtin_unary_arith, **builtin_unary_logical, **builtin_unary_special, **builtin_func, **builtin_cstdlib}

@ -1,19 +1,23 @@
import uuid
base62alp = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
lower_alp = 'abcdefghijklmnopqrstuvwxyz'
upper_alp = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
nums = '0123456789'
base62alp = nums + lower_alp + upper_alp
reserved_monet = ['month']
def base62uuid(crop=8):
id = uuid.uuid4().int
_id = uuid.uuid4().int
ret = ''
while id:
ret = base62alp[id % 62] + ret
id //= 62
while _id:
ret = base62alp[_id % 62] + ret
_id //= 62
return ret[:crop] if len(ret) else '0'
def get_leagl_name(name, lower = True):
def get_legal_name(name, lower = True):
if name is not None:
if lower:
name = name.lower()
@ -26,7 +30,7 @@ def get_leagl_name(name, lower = True):
return name
def check_leagl_name(name):
def check_legal_name(name):
all_underscores = True
for c in name:
if c not in base62alp and c != '_':
@ -54,3 +58,16 @@ def has_other(a, b):
def defval(val, default):
    """Return `val`, substituting `default` when `val` is None."""
    if val is None:
        return default
    return val
# escape must be readonly
from typing import Set
def remove_last(pattern : str, string : str, escape : Set[str] = set()) -> str:
    """Remove the last occurrence of `pattern` from `string`, but only if
    every character after that occurrence is in the `escape` set (e.g. to
    strip a trailing comma followed only by whitespace).

    Returns `string` unchanged when the pattern is absent or when
    non-escape characters follow the last occurrence.
    `escape` must be treated as read-only (shared default argument).
    """
    idx = string.rfind(pattern)
    if idx == -1:
        return string
    # Examine only the text AFTER the matched pattern: the original code
    # inspected string[idx:], which included the pattern itself, so the
    # removal branch could never trigger unless the pattern chars were in
    # `escape`.
    if set(string[idx + len(pattern):]).difference(escape):
        return string
    # Drop the whole pattern (not just one character, which broke
    # multi-character patterns) and keep the trailing escape chars.
    return string[:idx] + string[idx + len(pattern):]

@ -4,10 +4,21 @@ ymean := avg (y) ;
avg (( x - xmean ) * (y - ymean ))
}
FUNCTION sd ( x) {
sqrt ( covariance (x , x) )
}
AGGREGATION FUNCTION covariances(x, y, w){
static xmeans := 0, ymeans := 0, cnt := 0;
if (cnt < w) { xmeans += x; }
else {
xmeans += (x - x.vec[cnt - w]) / w;
ymeans += (y - y.vec[cnt - w]) / w;
}
avg (( x.vec(x.len-w, x.len) - xmean ) * (y.vec(y.len - w, y.len) - ymean ))
}
FUNCTION pairCorr (x , y ) {
covariance (x , y ) / ( sd (x) * sd (y ))
}

@ -1,98 +0,0 @@
# Generated by CMake
if("${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION}" LESS 2.6)
message(FATAL_ERROR "CMake >= 2.6.0 required")
endif()
cmake_policy(PUSH)
cmake_policy(VERSION 2.6...3.21)
#----------------------------------------------------------------
# Generated CMake target import file.
#----------------------------------------------------------------
# Commands may need to know the format version.
set(CMAKE_IMPORT_FILE_VERSION 1)
# Protect against multiple inclusion, which would fail when already imported targets are added once more.
set(_targetsDefined)
set(_targetsNotDefined)
set(_expectedTargets)
foreach(_expectedTarget MonetDB::monetdb_config_header)
list(APPEND _expectedTargets ${_expectedTarget})
if(NOT TARGET ${_expectedTarget})
list(APPEND _targetsNotDefined ${_expectedTarget})
endif()
if(TARGET ${_expectedTarget})
list(APPEND _targetsDefined ${_expectedTarget})
endif()
endforeach()
if("${_targetsDefined}" STREQUAL "${_expectedTargets}")
unset(_targetsDefined)
unset(_targetsNotDefined)
unset(_expectedTargets)
set(CMAKE_IMPORT_FILE_VERSION)
cmake_policy(POP)
return()
endif()
if(NOT "${_targetsDefined}" STREQUAL "")
message(FATAL_ERROR "Some (but not all) targets in this export set were already defined.\nTargets Defined: ${_targetsDefined}\nTargets not yet defined: ${_targetsNotDefined}\n")
endif()
unset(_targetsDefined)
unset(_targetsNotDefined)
unset(_expectedTargets)
# Compute the installation prefix relative to this file.
get_filename_component(_IMPORT_PREFIX "${CMAKE_CURRENT_LIST_FILE}" PATH)
get_filename_component(_IMPORT_PREFIX "${_IMPORT_PREFIX}" PATH)
get_filename_component(_IMPORT_PREFIX "${_IMPORT_PREFIX}" PATH)
get_filename_component(_IMPORT_PREFIX "${_IMPORT_PREFIX}" PATH)
if(_IMPORT_PREFIX STREQUAL "/")
set(_IMPORT_PREFIX "")
endif()
# Create imported target MonetDB::monetdb_config_header
add_library(MonetDB::monetdb_config_header INTERFACE IMPORTED)
set_target_properties(MonetDB::monetdb_config_header PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES "${_IMPORT_PREFIX}/include/monetdb"
)
if(CMAKE_VERSION VERSION_LESS 3.0.0)
message(FATAL_ERROR "This file relies on consumers using CMake 3.0.0 or greater.")
endif()
# Load information for each installed configuration.
get_filename_component(_DIR "${CMAKE_CURRENT_LIST_FILE}" PATH)
file(GLOB CONFIG_FILES "${_DIR}/monetdb_config_headerTargets-*.cmake")
foreach(f ${CONFIG_FILES})
include(${f})
endforeach()
# Cleanup temporary variables.
set(_IMPORT_PREFIX)
# Loop over all imported files and verify that they actually exist
foreach(target ${_IMPORT_CHECK_TARGETS} )
foreach(file ${_IMPORT_CHECK_FILES_FOR_${target}} )
if(NOT EXISTS "${file}" )
message(FATAL_ERROR "The imported target \"${target}\" references the file
\"${file}\"
but this file does not exist. Possible reasons include:
* The file was deleted, renamed, or moved to another location.
* An install or uninstall procedure did not complete successfully.
* The installation package was faulty and contained
\"${CMAKE_CURRENT_LIST_FILE}\"
but not all the files it references.
")
endif()
endforeach()
unset(_IMPORT_CHECK_FILES_FOR_${target})
endforeach()
unset(_IMPORT_CHECK_TARGETS)
# This file does not depend on other imported targets which have
# been exported from the same project but in a separate export set.
# Commands beyond this point should not need to know the version.
set(CMAKE_IMPORT_FILE_VERSION)
cmake_policy(POP)

@ -0,0 +1,4 @@
LOAD MODULE FROM "test.so"
FUNCTIONS (div(a:int, b:int) -> double,
mulvec(a:int, b:vecfloat) -> vecfloat
);

Binary file not shown.

@ -1,21 +1,31 @@
#include "./server/libaquery.h"
#include "./server/aggregations.h"
#include "./udf.hpp"
#include "./server/hasher.h"
#include <unordered_map>
#include "./server/monetdb_conn.h"
#include "./server/aggregations.h"
#include "./server/libaquery.h"
extern "C" int __DLLEXPORT__ dllmain(Context* cxt) {
using namespace std;
using namespace types;
auto server = static_cast<Server*>(cxt->alt_server);
auto len_6WMRXO = server->cnt;
auto suma_6BP = ColRef<int64_t>(len_6WMRXO, server->getCol(0));
auto b_5Yb = ColRef<int>(len_6WMRXO, server->getCol(1));
auto c_2Vh = ColRef<int>(len_6WMRXO, server->getCol(2));
auto d_1Ma = ColRef<int>(len_6WMRXO, server->getCol(3));
auto out_2URo7p = new TableInfo<value_type<decays<decltype((paircorr(c_2Vh, b_5Yb) * d_1Ma))>>,int64_t,int>("out_2URo7p");
out_2URo7p->get_col<0>() = (paircorr(c_2Vh, b_5Yb) * d_1Ma);
out_2URo7p->get_col<1>().initfrom(suma_6BP);
out_2URo7p->get_col<2>().initfrom(b_5Yb);
print(*out_2URo7p);
auto len_4fxytV = server->cnt;
auto b_3pr = ColRef<int>(len_4fxytV, server->getCol(0));
auto a_65L = ColRef<int>(len_4fxytV, server->getCol(1));
auto out_2UnEpP = new TableInfo<value_type<decays<decltype(covariances2_gettype(a_65L, b_3pr, 4))>>>("out_2UnEpP");
auto col_C9QF0Z = out_2UnEpP->get_col<0>();
for (uint32_t i41 = 0; i41 < t1K4f4I0.size; ++i41){
g1zdpLFa[forward_as_tuple(t1K4f4I0[i41])].emplace_back(i41);
}
for (const auto& i40 : g1zdpLFa){
auto &len_5NTOM6m = val_2423Z8E.size;
auto &key_6fZPUDS = i4O.first;
auto &val_2423Z8E = i4O.second;
col_C9QF0Z.emplace_back({len_5NTOM6m});
covariances2(a_65L[val_2423Z8E], b_3pr[val_2423Z8E], 4, len_5NTOM6m, col_C9QF0Z.back());
}
print(*out_2UnEpP);
return 0;
}

@ -34,13 +34,17 @@ server_mode = RunType.Threaded
server_bin = 'server.bin' if server_mode == RunType.IPC else 'server.so'
nullstream = open(os.devnull, 'w')
if aquery_config.rebuild_backend:
try:
os.remove(server_bin)
except Exception as e:
print(type(e), e)
nullstream = open(os.devnull, 'w')
subprocess.call(['make', server_bin], stdout=nullstream)
cleanup = True
def rm():
@ -157,7 +161,7 @@ def init_threaded():
else:
os.environ['PATH'] = os.environ['PATH'] + os.pathsep + os.path.abspath('.')
os.environ['PATH'] = os.environ['PATH'] + os.pathsep + os.path.abspath('./lib')
if aquery_config.run_backend:
server_so = ctypes.CDLL('./'+server_bin)
global cfg, th, send
send = server_so['receive_args']
@ -180,8 +184,11 @@ else:
global cfg
cfg.new_query = 1
set_ready = __set_ready
get_ready = lambda:cfg.new_query
get_ready = lambda: aquery_config.run_backend and cfg.new_query
if aquery_config.run_backend:
server_status = lambda : not th.is_alive()
else:
server_status = lambda : True
init()
test_parser = True
@ -221,7 +228,10 @@ while test_parser:
qs = [ctypes.c_char_p(bytes(q, 'utf-8')) for q in sqls if len(q)]
sz = len(qs)
payload = (ctypes.c_char_p*sz)(*qs)
try:
send(sz, payload)
except TypeError as e:
print(e)
if cxt.udf is not None:
with open('udf.hpp', 'wb') as outfile:
outfile.write(cxt.udf.encode('utf-8'))
@ -237,19 +247,18 @@ while test_parser:
continue
elif q == 'dbg':
import code
var = globals().copy()
var.update(locals())
from copy import deepcopy
var = {**globals(), **locals()}
sh = code.InteractiveConsole(var)
try:
code.interact()
sh.interact(banner = 'debugging session began.', exitmsg = 'debugging session ended.')
except BaseException as e:
# don't care about anything happened in interactive console
pass
print(e.with_traceback())
elif q.startswith('log'):
qs = re.split(' |\t', q)
qs = re.split(r'[ \t]', q)
if len(qs) > 1:
cxt.log_level = qs[1]
else:
@ -276,7 +285,7 @@ while test_parser:
set_ready()
continue
elif q.startswith('save'):
filename = re.split(' |\t', q)
filename = re.split(r'[ \t]', q)
if (len(filename) > 1):
filename = filename[1]
else:

@ -1,32 +0,0 @@
Exception: STATUS_ACCESS_VIOLATION at rip=005A43E1EE8
rax=000000080016C970 rbx=00000000FFFFC2E0 rcx=0000000000000015
rdx=00000000FFFFCE00 rsi=0000000000000000 rdi=0000000000000002
r8 =00000000000000A8 r9 =000000080016C970 r10=0000000000005150
r11=000000080016C970 r12=00000000FFFFFFFF r13=0000000000000000
r14=00006FFFFFEBA2F8 r15=00000000FFFFC380
rbp=00000000FFFFC210 rsp=00000000FFFFC160
program=C:\msys64\usr\bin\python3.exe, pid 821, thread main
cs=0033 ds=002B es=002B fs=0053 gs=002B ss=002B
Stack trace:
Frame Function Args
000FFFFC210 005A43E1EE8 (00000000000, 0057B699D00, 00000000050, 000FFFFC2A0)
000FFFFC210 00487B249A1 (0000000000A, 000FFFFC2F0, 00000000002, 000FFFFC3A0)
000FFFFC2A0 00487B245EE (000FFFC68F0, 00000000002, 6FFFFFEBA2F8, 00000000000)
000FFFFC3A0 00487B247B2 (004484CACEA, 00800187A50, 004484CACDA, 000FFFFC2D0)
000FFFFC3A0 004484CAE74 (0057B4D31A2, 00500001101, 00000000000, 00800187A50)
00000000000 004484C476D (6FFFFFF97DF0, 00000000000, 0057B539C7B, 6FFFFFE9AF40)
004484C4660 0057B509F6F (6FFFFFFC68F0, 6FFFFFEA2330, 0057B547BA7, 00000000000)
00800048F80 0057B6910D9 (0080016ABE0, 6FFF00000000, 0057B68A0EA, 0057B72B2E0)
00000000000 0057B5D0C61 (6FFFFFE9F190, 000FFFFC7F0, 0057B5E3A05, 00000000000)
6FFFFFF30220 0057B5D0FD1 (0080016ABF0, 6FFFFFF93D30, 0057B618338, 6FFFFFF34D00)
6FFFFFF30220 0057B60C6B0 (6FFFFFE9A0B0, 0080004A920, 6FFFFFE9F190, 6FFFFFEE2610)
6FFFFFF30220 0057B60C8B4 (0057B50A49A, 00000000000, 0057B539B2F, 000FFFFCAB8)
6FFFFFF30220 0057B60EE21 (00000000000, 000FFFFCAB8, 00000000001, 0057B731792)
0057B699678 0057B60F0B6 (0057B618804, 000FFFFCD30, 0057B604F54, 00800049350)
000FFFFCD30 0057B60FA20 (00000000000, 000FFFFC968, 000FFFFCD30, 00000000001)
000FFFFCD30 0057B62ABDD (001004016D0, 001004016D0, 00000000000, 000FFFFCD30)
000FFFFCD30 0057B62AE9D (00180049B25, 00180048A40, 00000000002, 00180326FE0)
000FFFFCD30 00180049B91 (00000000000, 00000000000, 00000000000, 00000000000)
000FFFFFFF0 00180047716 (00000000000, 00000000000, 00000000000, 00000000000)
000FFFFFFF0 001800477C4 (00000000000, 00000000000, 00000000000, 00000000000)
End of stack trace

@ -0,0 +1,11 @@
# TODO:
## 1. double scans in projections
- first for special aggregations and singular columns
- Then in group by node decide if we have special group by aggregations
- If sp_gb_agg exists, the entire groupby aggregation is done in C plugin
- If not, group by is done in SQL
## 2. ColRef supports multiple objects
- A.a = B.b then in projection A.a B.b will refer to same projection
- Colref::ProjEq(ColRef v) => this == v or v in this.proj_eqs

@ -1,7 +1,9 @@
from re import T
from typing import Set, Tuple
from copy import deepcopy
from dataclasses import dataclass
from enum import Enum, auto
from typing import Set, Tuple, Dict, Union, List, Optional
from engine.types import *
from engine.utils import enlist, base62uuid, base62alp, get_leagl_name
from engine.utils import enlist, base62uuid, base62alp, get_legal_name
from reconstruct.storage import Context, TableInfo, ColRef
class ast_node:
@ -9,10 +11,13 @@ class ast_node:
types = dict()
first_order = False
def __init__(self, parent:"ast_node", node, context:Context = None):
def __init__(self, parent:Optional["ast_node"], node, context:Optional[Context] = None):
self.context = parent.context if context is None else context
self.parent = parent
self.sql = ''
if hasattr(parent, 'datasource'):
self.datasource = parent.datasource
else:
self.datasource = None
self.init(node)
self.produce(node)
@ -38,7 +43,7 @@ class ast_node:
self.emit(self.sql+';\n')
from reconstruct.expr import expr
from reconstruct.expr import expr, fastscan
class projection(ast_node):
@ -70,10 +75,6 @@ class projection(ast_node):
else:
self.where = None
if 'groupby' in node:
self.group_node = groupby(self, node['groupby'])
else:
self.group_node = None
def consume(self, node):
# deal with projections
@ -82,9 +83,9 @@ class projection(ast_node):
col_ext : Set[ColRef]= set()
col_exprs : List[Tuple[str, Types]] = []
proj_map = dict()
proj_map : Dict[int, List[Union[Types, int, str, expr]]]= dict()
var_table = dict()
self.sp_refs = set()
for i, proj in enumerate(self.projections):
compound = False
self.datasource.rec = set()
@ -102,25 +103,26 @@ class projection(ast_node):
name = eval('f\'' + name + '\'')
if name not in var_table:
var_table[name] = len(col_exprs)
proj_map[i] = [this_type, len(col_exprs)]
proj_map[i] = [this_type, len(col_exprs), proj_expr]
col_exprs.append((name, proj_expr.type))
else:
self.context.headers.add('"./server/aggregations.h"')
if self.datasource.rec is not None:
col_ext = col_ext.union(self.datasource.rec)
proj_map[i] = [this_type, proj_expr.sql]
col_ext = col_ext.union(self.datasource.rec) # TODO: make this one var?
self.sp_refs = self.sp_refs.union(self.datasource.rec)
proj_map[i] = [this_type, proj_expr.sql, proj_expr]
if 'name' in proj: # renaming column by AS keyword
name += ' AS ' + proj['name']
if not proj_expr.is_special:
var_table[proj['name']] = len(col_exprs)
disp_name = get_leagl_name(name)
disp_name = get_legal_name(name)
elif type(proj) is str:
col = self.datasource.get_col(proj)
this_type = col.type
name = col.name
# name = col.name
self.datasource.rec = None
# TODO: Type deduction in Python
cols.append(ColRef(this_type, self.out_table, None, disp_name, i, compound=compound))
@ -133,13 +135,17 @@ class projection(ast_node):
if col not in var_table:
var_table[col] = i + _base_offset
def finialize(astnode:ast_node):
if(astnode is not None):
self.add(astnode.sql)
self.add('FROM')
finialize(self.datasource)
finialize(self.where)
finialize(self.group_node)
if 'groupby' in node:
self.group_node = groupby(self, node['groupby'])
else:
self.group_node = None
if 'orderby' in node:
self.add(orderby(self, node['orderby']).sql)
if 'outfile' in node:
@ -149,6 +155,8 @@ class projection(ast_node):
else:
# TODO: subquery, name create tmp-table from subquery w/ alias as name
pass
# cpp module codegen
self.context.has_dll = True
# extract typed-columns from result-set
@ -159,26 +167,42 @@ class projection(ast_node):
self.context.emitc(f'auto {length_name} = server->cnt;')
for v, idx in var_table.items():
vname = get_leagl_name(v) + '_' + base62uuid(3)
vname = get_legal_name(v) + '_' + base62uuid(3)
pyname2cname[v] = vname
self.context.emitc(f'auto {vname} = ColRef<{typenames[idx].cname}>({length_name}, server->getCol({idx}));')
vid2cname[idx] = vname
# Create table into context
outtable_name = 'out_' + base62uuid(6)
out_typenames = [None] * len(proj_map)
for key, val in proj_map.items():
if type(val[1]) is str:
x = True
y = lambda t: pyname2cname[t]
val[1] = eval('f\'' + val[1] + '\'')
val[1] = val[2].eval(x, y, gettype=True)
if callable(val[1]):
val[1] = val[1](True)
decltypestring = val[1]
if val[0] == LazyT:
out_typenames[key] = f'value_type<decays<decltype({val[1]})>>'
out_typenames[key] = f'value_type<decays<decltype({decltypestring})>>'
else:
out_typenames[key] = val[0].cname
# out_typenames = [v[0].cname for v in proj_map.values()]
self.context.emitc(f'auto {outtable_name} = new TableInfo<{",".join(out_typenames)}>("{outtable_name}");')
# TODO: Inject custom group by code here and flag them in proj_map
# Type of UDFs? Complex UDFs, ones with static vars?
if self.group_node is not None:
gb_vartable : Dict[str, Union[str, int]] = deepcopy(pyname2cname)
gb_cexprs : List[str] = []
for key, val in proj_map.items():
col_name = 'col_' + base62uuid(6)
self.context.emitc(f'auto {col_name} = {outtable_name}->get_col<{key}>();')
gb_cexprs.append((col_name, val[2]))
self.group_node.finalize(gb_cexprs, gb_vartable)
else:
for key, val in proj_map.items():
if type(val[1]) is int:
self.context.emitc(f'{outtable_name}->get_col<{key}>().initfrom({vid2cname[val[1]]});')
@ -194,9 +218,8 @@ class orderby(ast_node):
if node is None:
self.sql = ''
return
elif type(node) is not list:
node = [node]
node = enlist(node)
o_list = []
for o in node:
@ -207,9 +230,170 @@ class orderby(ast_node):
self.add(', '.join(o_list))
class groupby(orderby):
class scan(ast_node):
name = 'scan'
def __init__(self, parent: "ast_node", node, loop_style = 'for', context: Context = None, const = False):
self.const = "const " if const else ""
self.loop_style = loop_style
super().__init__(parent, node, context)
def init(self, _):
self.datasource = self.context.datasource
self.initializers = ''
self.start = ''
self.front = ''
self.body = ''
self.end = '}'
scan_vars = set(s.it_var for s in self.context.scans)
self.it_ver = 'i' + base62uuid(2)
while(self.it_ver in scan_vars):
self.it_ver = 'i' + base62uuid(6)
self.parent.context.scans.append(self)
def produce(self, node):
if self.loop_style == 'for_each':
self.colref = node
self.start += f'for ({self.const}auto& {self.it_ver} : {node.cobj}) {{\n'
else:
self.start += f"for (uint32_t {self.it_ver} = 0; {self.it_ver} < {node}; ++{self.it_ver}){{\n"
def add(self, stmt, position = "body"):
if position == "body":
self.body += stmt + '\n'
elif position == "init":
self.initializers += stmt + '\n'
else:
self.front += stmt + '\n'
def finalize(self):
self.context.remove_scan(self, self.initializers + self.start + self.front + self.body + self.end)
class groupby_c(ast_node):
name = '_groupby'
def produce(self, node : List[Tuple[expr, Set[ColRef]]]):
self.context.headers.add('"./server/hasher.h"')
self.context.headers.add('unordered_map')
self.group = 'g' + base62uuid(7)
self.group_type = 'record_type' + base62uuid(7)
self.datasource = self.parent.datasource
self.scanner = None
self.datasource.rec = set()
g_contents = ''
g_contents_list = []
first_col = ''
for g in node:
e = g[0]
g_str = e.eval(c_code = True)
# if v is compound expr, create tmp cols
if e.is_ColExpr:
tmpcol = 't' + base62uuid(7)
self.emit(f'auto {tmpcol} = {g_str};')
e = tmpcol
g_contents_list.append(e)
first_col = g_contents_list[0]
g_contents_decltype = [f'decays<decltype({c})>' for c in g_contents_list]
g_contents = ','.join(g_contents_list)
self.emit(f'typedef record<{",".join(g_contents_decltype)}> {self.group_type};')
self.emit(f'unordered_map<{self.group_type}, vector_type<uint32_t>, '
f'transTypes<{self.group_type}, hasher>> {self.group};')
self.n_grps = len(node)
self.scanner = scan(self, first_col + '.size')
self.scanner.add(f'{self.group}[forward_as_tuple({g_contents}[{self.scanner.it_ver}])].emplace_back({self.scanner.it_ver});')
def consume(self, _):
self.scanner.finalize()
# def deal_with_assumptions(self, assumption:assumption, out:TableInfo):
# gscanner = scan(self, self.group)
# val_var = 'val_'+base62uuid(7)
# gscanner.add(f'auto &{val_var} = {gscanner.it_ver}.second;')
# gscanner.add(f'{self.datasource.cxt_name}->order_by<{assumption.result()}>(&{val_var});')
# gscanner.finalize()
def finalize(self, cexprs : List[Tuple[str, expr]], var_table : Dict[str, Union[str, int]]):
gscanner = scan(self, self.group)
key_var = 'key_'+base62uuid(7)
val_var = 'val_'+base62uuid(7)
gscanner.add(f'auto &{key_var} = {gscanner.it_ver}.first;')
gscanner.add(f'auto &{val_var} = {gscanner.it_ver}.second;')
len_var = None
def define_len_var():
nonlocal len_var
if len_var is None:
len_var = 'len_'+base62uuid(7)
gscanner.add(f'auto &{len_var} = {val_var}.size;', position = 'front')
def get_var_names (varname : str):
var = var_table[varname]
if type(var) is str:
return f'{var}[{val_var}]'
else:
return f'get<{var}>({key_var})'
for ce in cexprs:
ex = ce[1]
materialize_builtin = {}
if type(ex.udf) is udf:
if '_builtin_len' in ex.udf.builtin_used:
define_len_var()
materialize_builtin['_builtin_len'] = len_var
if '_builtin_ret' in ex.udf.builtin_used:
define_len_var()
gscanner.add(f'{ce[0]}.emplace_back({{{len_var}}});\n')
materialize_builtin['_builtin_ret'] = f'{ce[0]}.back()'
gscanner.add(f'{ex.eval(c_code = True, y=get_var_names, materialize_builtin = materialize_builtin)};\n')
continue
gscanner.add(f'{ce[0]}.emplace_back({ex.eval(c_code = True, y=get_var_names, materialize_builtin = materialize_builtin)});\n')
gscanner.finalize()
self.datasource.groupinfo = None
class groupby(ast_node):
name = 'group by'
@property
def use_sp_gb (self):
return len(self.sp_refs) > 0
def produce(self, node):
if type(self.parent) is not projection:
raise ValueError('groupby can only be used in projection')
sp_refs = self.parent.sp_refs
node = enlist(node)
o_list = []
self.dedicated_glist = []
self.refs : Set[ColRef] = set()
self.sp_refs : Set[ColRef] = set()
for g in node:
self.datasource.rec = set()
g_expr = expr(self, g['value'])
refs : Set[ColRef] = self.datasource.rec
self.datasource.rec = None
this_sp_ref = refs.difference(sp_refs)
this_ref = refs.intersection(this_sp_ref)
# TODO: simplify this
self.refs = self.refs.union(this_ref)
self.sp_refs = self.sp_refs.union(this_sp_ref)
self.dedicated_glist.append((g_expr, refs))
g_str = g_expr.eval(c_code = False)
if 'sort' in g and f'{g["sort"]}'.lower() == 'desc':
g_str = g_str + ' ' + 'DESC'
o_list.append(g_str)
if not self.use_sp_gb:
self.dedicated_gb = None
self.add(', '.join(o_list))
def finalize(self, cexprs : List[Tuple[str, expr]], var_table : Dict[str, Union[str, int]]):
if self.use_sp_gb:
self.dedicated_gb = groupby_c(self.parent, self.dedicated_glist)
self.dedicated_gb.finalize(cexprs, var_table)
class join(ast_node):
name = 'join'
@ -257,7 +441,6 @@ class join(ast_node):
tbl = self.context.tables_byname[table_name]
if 'name' in node:
tbl.add_alias(node['name'])
self.append(tbl, alias)
else:
keys = node.keys()
@ -271,7 +454,10 @@ class join(ast_node):
self.tables_dir = {**self.tables_dir, **j.tables_dir}
elif type(node) is str:
if node in self.context.tables_byname:
self.append(self.context.tables_byname[node])
else:
print(f'Error: table {node} not found.')
def get_cols(self, colExpr: str) -> ColRef:
for t in self.tables:
@ -325,8 +511,6 @@ class create_table(ast_node):
if self.context.use_columnstore:
self.sql += ' engine=ColumnStore'
class insert(ast_node):
name = 'insert'
first_order = name
@ -405,55 +589,260 @@ class outfile(ast_node):
class udf(ast_node):
name = 'udf'
first_order = name
@dataclass
class builtin_var:
enabled : bool = False
_type : Types = AnyT
all = ('_builtin_len', '_builtin_ret')
def decltypecall(self, c_code = False, *args):
from engine.types import fn_behavior
class dummy:
def __init__(self, name):
self.cname = name + '_gettype'
self.sqlname = self.cname
return fn_behavior(dummy(self.cname), c_code, *args)
def __call__(self, c_code = False, *args):
from engine.types import fn_behavior
return fn_behavior(self, c_code, *args)
builtin_args = [f'{{{n}()}}' for n, v in self.builtin.items() if v.enabled]
return fn_behavior(self, c_code, *args, *builtin_args)
def return_type(self, *_ : Types):
return LazyT
def init(self, node):
def init(self, _):
self.builtin : Dict[str, udf.builtin_var] = {
'_builtin_len' : udf.builtin_var(False, UIntT),
'_builtin_ret' : udf.builtin_var(False, Types(
255, name = 'generic_ref', cname = 'auto&'
))
}
self.var_table = {}
self.args = []
if self.context.udf is None:
self.context.udf = Context.udf_head
self.context.headers.add('\"./udf.hpp\"')
self.vecs = set()
self.code_list = []
self.builtin_used = None
def add(self, *code):
ccode = ''
for c in code:
if type(c) is str:
ccode += c
else:
self.code_list.append(ccode)
self.code_list.append(c)
ccode = ''
if ccode:
self.code_list.append(ccode)
def produce(self, node):
from engine.utils import get_leagl_name, check_leagl_name
from engine.utils import get_legal_name, check_legal_name
node = node[self.name]
# register udf
self.cname = get_leagl_name(node['fname'])
self.agg = 'Agg' in node
self.cname = get_legal_name(node['fname'])
self.sqlname = self.cname
self.context.udf_map[self.cname] = self
self.ccode = f'auto {self.cname} = []('
if self.agg:
self.context.udf_agg_map[self.cname] = self
self.add(f'auto {self.cname} = [](')
def get_block(self, ind, node):
if 'stmt' in node:
old_ind = ind
ind += '\t'
next_stmt = enlist(node['stmt'])
if len(next_stmt) > 1:
self.add(f' {{\n')
self.get_stmt(ind ,next_stmt)
self.add(f'{old_ind}}}\n')
else:
self.get_stmt(ind, next_stmt)
def get_cname(self, x:str):
return self.var_table[x]
def get_assignment(self, ind, node, *, types = 'auto', sep = ';\n'):
var_ex = expr(self, node['var'], c_code=True, supress_undefined = True)
ex = expr(self, node['expr'], c_code=True)
var = var_ex.eval(y=self.get_cname)
if var in self.var_table or hasattr(var_ex, 'builtin_var'):
op = '='
if 'op' in node and node['op'] != ':=':
op = node['op']
e = ex.eval(y=self.get_cname)
def assign_behavior(decltypestr = False):
nonlocal ind, var, op, e, sep
v = var(decltypestr) if callable(var) else var
_e = e(decltypestr) if callable(e) else e
if v == '_builtin_ret':
return f'{ind}return {_e}{sep}'
elif '_builtin_ret' not in _e:
return f'{ind}{v} {op} {_e}{sep}'
else:
return ''
self.add(assign_behavior)
else:
cvar = get_legal_name(var)
self.var_table[var] = cvar
self.add(f'{ind}{types} {cvar} = ', ex.eval(y=self.get_cname), sep)
def get_stmt(self, ind, node):
node = enlist(node)
for n in node:
if 'if' in n:
_ifnode = n['if']
self.add(f'{ind}if(', expr(self, _ifnode["cond"]).eval(y=self.get_cname), ')')
if 'stmt' in _ifnode:
self.get_block(ind, _ifnode)
else:
self.add('\n')
self.get_stmt(ind + '\t', _ifnode)
if 'elif' in _ifnode:
for e in n['elif']:
self.add(f'{ind}else if(', expr(self, e["cond"]).eval(y=self.get_cname), ')')
self.get_block(ind, e)
if 'else' in _ifnode:
self.add(f'{ind}else ')
self.get_block(ind, _ifnode['else'])
elif 'for' in n:
_fornode = n['for']
defs = _fornode['defs']
self.add(f'{ind}for({"auto " if len(enlist(defs["op"])) != 0 else ";"}')
def get_inline_assignments(node, end = '; '):
var = enlist(node['var'])
op = enlist(node['op'])
expr = enlist(node['expr'])
len_node = len(enlist(op))
for i, (v, o, e) in enumerate(zip(var, op, expr)):
self.get_assignment('', {'var' : v, 'op' : o, 'expr' : e}, types = '', sep = ', ' if i != len_node - 1 else end)
get_inline_assignments(defs)
self.add(expr(self, _fornode["cond"]).eval(y=self.get_cname), '; ')
get_inline_assignments(_fornode['tail'], ') ')
if 'stmt' in _fornode:
self.get_block(ind, _fornode)
else:
self.add('\n')
self.get_stmt(ind + '\t', _fornode)
elif 'assignment' in n:
assign = n['assignment']
self.get_assignment(ind, assign)
def consume(self, node):
from engine.utils import get_leagl_name, check_leagl_name
from engine.utils import get_legal_name, check_legal_name
node = node[self.name]
if 'params' in node:
for args in node['params']:
cname = get_leagl_name(args)
cname = get_legal_name(args)
self.var_table[args] = cname
self.args.append(cname)
self.ccode += ', '.join([f'auto {a}' for a in self.args]) + ') {\n'
if 'assignment' in node:
for assign in node['assignment']:
var = assign['var']
ex = expr(self, assign['expr'], c_code=True)
if var in self.var_table:
self.ccode += f'\t{var} = {ex.eval()};\n'
else:
cvar = get_leagl_name(var)
self.var_table[var] = cvar
self.ccode += f'\tauto {cvar} = {ex.eval()};\n'
front = [*self.code_list, ', '.join([f'auto {a}' for a in self.args])]
self.code_list = []
self.with_storage = False
self.with_statics = False
self.static_decl : Optional[List[str]] = None
ind = '\t'
if 'static_decl' in node:
self.add(') {\n')
curr = node['static_decl']
self.with_statics = True
if 'var' in curr and 'expr' in curr:
if len(curr['var']) != len(curr['expr']):
print("Error: every static variable must be initialized.")
self.static_decl = []
for v, e in zip(curr['var'], curr['expr']):
cname = get_legal_name(v)
self.var_table[v] = cname
self.static_decl.append(f'{cname} = ', expr(self, e, c_code=True).eval(self.get_cname))
self.add(f'{ind}static auto {"; static auto ".join(self.static_decl)};\n')
self.add(f'{ind}auto reset = [=]() {{ {"; ".join(self.static_decl)}; }};\n')
self.add(f'{ind}auto call = []({", ".join([f"decltype({a}) {a}" for a in self.args])}')
ind = '\t\t'
front = [*front, *self.code_list]
self.code_list = []
if 'stmt' in node:
self.get_stmt(ind, node['stmt'])
# first scan to determine vec types
# if self.agg:
# for assign in node['assignment']:
# var = fastscan(assign['var'])
# ex = fastscan(assign['expr'])
# self.vecs.union(var.vec_vars)
# self.vecs.union(var.requested_lens)
# self.vecs.union(ex.vec_vars)
# self.vecs.union(ex.requested_lens)
# if len(self.vecs) != 0:
# self.idx_var = 'idx_' + base62uuid(5)
# self.ccode += f'{ind}auto {self.idx_var} = 0;\n'
ret = node['ret']
self.ccode += f'\treturn {expr(self, ret, c_code=True).eval()};'
self.ccode += '\n};\n'
print(self.ccode)
self.context.udf += self.ccode + '\n'
def return_call(decltypestr = False):
if (decltypestr):
return ''
ret = ''
for r in self.return_call:
if callable(r):
ret += r(False)
else:
ret += r
return ret
self.return_call = (f'{ind}return ', expr(self, ret, c_code=True).eval(self.get_cname), ';\n')
self.add(return_call)
if self.with_statics:
self.add('\t};\n')
self.add('\treturn std::make_pair(reset, call);\n')
self.add('};\n')
#print(self.ccode)
self.builtin_args = [(name, var._type.cname) for name, var in self.builtin.items() if var.enabled]
# self.context.udf += front + builtin_argstr + self.ccode + '\n'
self.finalize(front)
def finalize(self, front):
builtin_argstr = ', ' if len(self.builtin_args) and len(self.args) else ''
builtin_argstr += ', '.join([f'{t} {n}' for (n, t) in self.builtin_args])
self.builtin_used = [b for b, v in self.builtin.items() if v.enabled]
ccode = ''
def process_recursion(l, decltypestr = False):
nonlocal ccode
for c in l:
if type(c) is str:
ccode += c
elif callable(c):
ccode += c(decltypestr) # a callback function
else:
raise ValueError(f'Illegal operation in udf code generation: {c}')
process_recursion(front)
ccode += builtin_argstr + ') {\n'
process_recursion(self.code_list)
self.context.udf += ccode + '\n'
ccode = ''
if self.return_pattern == udf.ReturnPattern.elemental_return:
ccode += f'auto {self.cname}_gettype = []('
process_recursion(front[1:], True)
ccode += ') {\n\tuint32_t _builtin_len = 0;\n'
process_recursion(self.code_list, True)
self.context.udf += ccode + '\n'
class ReturnPattern(Enum):
bulk_return = auto()
elemental_return = auto()
@property
def return_pattern(self):
if '_builtin_ret' in self.builtin_used:
return udf.ReturnPattern.elemental_return
else:
return udf.ReturnPattern.bulk_return
def include(objs):
import inspect

@ -1,26 +1,59 @@
from typing import Optional
from reconstruct.ast import ast_node
from reconstruct.storage import ColRef
from reconstruct.storage import ColRef, Context
from engine.types import *
# TODO: Decouple expr and upgrade architecture
# C_CODE : get ccode/sql code?
# projections : C/SQL/decltype string
# orderby/joins/where : SQL only
# assumption/groupby : C/sql
# is_udfexpr: C only
class expr(ast_node):
name='expr'
@property
def udf_decltypecall(self):
return self._udf_decltypecall if self._udf_decltypecall else self.sql
def __init__(self, parent, node, *, c_code = None):
from reconstruct.ast import projection
@udf_decltypecall.setter
def udf_decltypecall(self, val):
self._udf_decltypecall = val
@property
def need_decltypestr(self):
return self._udf_decltypecall is not None
def __init__(self, parent, node, *, c_code = None, supress_undefined = False):
from reconstruct.ast import projection, udf
self.type = None
self.raw_col = None
self.udf : Optional[udf] = None
self.inside_agg = False
self.is_special = False
self.is_ColExpr = False
self.is_recursive_call_inudf = False
self.codlets : list = []
self.codebuf : Optional[str] = None
self._udf_decltypecall = None
self.supress_undefined = supress_undefined
if(type(parent) is expr):
self.inside_agg = parent.inside_agg
if(type(parent) is not expr):
self.root = self
self.c_code = type(parent) is projection
else:
self.root = parent.root
self.is_udfexpr = parent.is_udfexpr
self.root : expr = parent.root
self.c_code = parent.c_code
self.builtin_vars = parent.builtin_vars
else:
self.is_udfexpr = type(parent) is udf
self.root : expr = self
self.c_code = self.is_udfexpr or type(parent) is projection
if self.is_udfexpr:
self.udf : udf = parent
self.builtin_vars = self.udf.builtin.keys()
else:
self.builtin_vars = []
if type(c_code) is bool:
self.c_code = c_code
@ -38,8 +71,11 @@ class expr(ast_node):
self.udf_map = parent.context.udf_map
self.func_maps = {**builtin_func, **self.udf_map}
self.operators = {**builtin_operators, **self.udf_map}
def produce(self, node):
from engine.utils import enlist
from reconstruct.ast import udf
if type(node) is dict:
for key, val in node.items():
if key in self.operators:
@ -49,20 +85,98 @@ class expr(ast_node):
exp_vals = [expr(self, v, c_code = self.c_code) for v in val]
str_vals = [e.sql for e in exp_vals]
type_vals = [e.type for e in exp_vals]
try:
self.type = op.return_type(*type_vals)
except AttributeError as e:
if type(self.root) is not udf:
print(f'alert: {e}')
self.type = AnyT
self.sql = op(self.c_code, *str_vals)
special_func = [*self.context.udf_map.keys(), "maxs", "mins", "avgs", "sums"]
if key in special_func and not self.is_special:
self.is_special = True
if key in self.context.udf_map:
self.root.udf = self.context.udf_map[key]
if key == self.root.udf.name:
self.root.is_recursive_call_inudf = True
p = self.parent
while type(p) is expr and not p.is_special:
p.is_special = True
p = p.parent
need_decltypestr = any([e.need_decltypestr for e in exp_vals])
if need_decltypestr or (self.udf and type(op) is udf):
decltypestr_vals = [e.udf_decltypecall for e in exp_vals]
self.udf_decltypecall = op(self.c_code, *decltypestr_vals)
if self.udf and type(op) is udf:
self.udf_decltypecall = op.decltypecall(self.c_code, *decltypestr_vals)
elif self.is_udfexpr:
var_table = self.root.udf.var_table
vec = key.split('.')
_vars = [*var_table, *self.builtin_vars]
def get_vname (node):
if node in self.builtin_vars:
self.root.udf.builtin[node].enabled = True
self.builtin_var = node
return node
else:
return var_table[node]
if vec[0] not in _vars:
print(f'Use of undefined variable {vec[0]}')
else:
vname = get_vname(vec[0])
val = enlist(val)
if(len(val) > 2):
print('Warning: more than 2 indexes found for subvec operator.')
ex = [expr(self, v, c_code = self.c_code) for v in val]
idxs = ', '.join([e.sql for e in ex])
self.sql = f'{vname}.subvec({idxs})'
if any([e.need_decltypestr for e in ex]):
self.udf_decltypecall = f'{vname}.subvec({[", ".join([e.udf_decltypecall for e in ex])]})'
if key == 'get' and len(val) > 1:
ex_vname = expr(self, val[0], c_code=self.c_code)
self.sql = f'{ex_vname.sql}[{expr(self, val[1], c_code=self.c_code).sql}]'
if hasattr(ex_vname, 'builtin_var'):
if not hasattr(self, 'builtin_var'):
self.builtin_var = []
self.builtin_var = [*self.builtin_var, *ex_vname.builtin_var]
self.udf_decltypecall = ex_vname.sql
else:
print(f'Undefined expr: {key}{val}')
elif type(node) is str:
if self.is_udfexpr:
curr_udf : udf = self.root.udf
var_table = curr_udf.var_table
split = node.split('.')
if split[0] in var_table:
varname = var_table[split[0]]
if curr_udf.agg and varname in curr_udf.vecs:
if len(split) > 1:
if split[1] == 'vec':
self.sql += varname
elif split[1] == 'len':
self.sql += f'{varname}.size'
else:
print(f'no member {split[1]} in object {varname}')
else:
self.sql += f'{varname}[{curr_udf.idx_var}]'
else:
self.sql += varname
elif self.supress_undefined or split[0] in self.builtin_vars:
self.sql += node
if split[0] in self.builtin_vars:
curr_udf.builtin[split[0]].enabled = True
self.builtin_var = split[0]
else:
print(f'Undefined varname: {split[0]}')
# get the column from the datasource in SQL context
else:
p = self.parent
while type(p) is expr and not p.isvector:
p.isvector = True
@ -71,6 +185,7 @@ class expr(ast_node):
self.raw_col = self.datasource.parse_col_names(node)
self.raw_col = self.raw_col if type(self.raw_col) is ColRef else None
if self.raw_col is not None:
self.is_ColExpr = True
self.sql = self.raw_col.name
self.type = self.raw_col.type
else:
@ -91,18 +206,109 @@ class expr(ast_node):
elif type(node) is float:
self.type = DoubleT
def finalize(self, y):
if self.c_code:
x = self.is_special
self.sql = eval('f\'' + self.sql + '\'')
def finalize(self, override = False):
from reconstruct.ast import udf
if self.codebuf is None or override:
self.codebuf = ''
for c in self.codlets:
if type(c) is str:
self.codebuf += c
elif type(c) is udf:
self.codebuf += c()
elif type(c) is expr:
self.codebuf += c.finalize(override=override)
return self.codebuf
def __str__(self):
return self.sql
def __repr__(self):
return self.__str__()
def eval(self):
x = self.c_code
y = lambda x: x
# builtins is readonly, so it's okay to set default value as an object
# eval is only called at root expr.
def eval(self, c_code = None, y = lambda t: t, materialize_builtin = False, _decltypestr = False, *, gettype = False):
assert(self.is_root)
def call(decltypestr = False) -> str:
nonlocal c_code, y, materialize_builtin
if self.udf is not None:
loc = locals()
builtin_vars = self.udf.builtin_used
for b in self.udf.builtin_var.all:
exec(f'loc["{b}"] = lambda: "{{{b}()}}"')
if builtin_vars:
if type(materialize_builtin) is dict:
for b in builtin_vars:
exec(f'loc["{b}"] = lambda: "{materialize_builtin[b]}"')
elif self.is_recursive_call_inudf:
for b in builtin_vars:
exec(f'loc["{b}"] = lambda : "{b}"')
x = self.c_code if c_code is None else c_code
if decltypestr:
return eval('f\'' + self.udf_decltypecall + '\'')
return eval('f\'' + self.sql + '\'')
if self.is_recursive_call_inudf or (self.need_decltypestr and self.is_udfexpr) or gettype:
return call
else:
return call(_decltypestr)
@property
def is_root(self):
return self.root == self
# For UDFs: first check if agg variable is used as vector
# if not, then check if its length is used
class fastscan(expr):
name = 'fastscan'
def init(self, _):
self.vec_vars = set()
self.requested_lens = set()
super().init(self, _)
def process(self, key : str):
segs = key.split('.')
var_table = self.root.udf.var_table
if segs[0] in var_table and len(segs) > 1:
if segs[1] == 'vec':
self.vec_vars.add(segs[0])
elif segs[1] == 'len':
self.requested_lens.add(segs[0])
def produce(self, node):
from engine.utils import enlist
if type(node) is dict:
for key, val in node.items():
if key in self.operators:
val = enlist(val)
elif self.is_udfexpr:
self.process(key)
[fastscan(self, v, c_code = self.c_code) for v in val]
elif type(node) is str:
self.process(node)
class getrefs(expr):
name = 'getrefs'
def init(self, _):
self.datasource.rec = set()
self.rec = None
def produce(self, node):
from engine.utils import enlist
if type(node) is dict:
for key, val in node.items():
if key in self.operators:
val = enlist(val)
[getrefs(self, v, c_code = self.c_code) for v in val]
elif type(node) is str:
self.datasource.parse_col_names(node)
import engine.expr as cexpr
def consume(self, _):
if self.root == self:
self.rec = self.datasource.rec
self.datasource.rec = None

@ -86,6 +86,7 @@ class Context:
self.sql = ''
self.finalized = False
self.udf = None
self.scans = []
def __init__(self):
self.tables_byname = dict()
@ -94,6 +95,7 @@ class Context:
self.cols = []
self.datasource = None
self.udf_map = dict()
self.udf_agg_map = dict()
self.use_columnstore = False
self.print = print
self.has_dll = False
@ -110,6 +112,9 @@ class Context:
tbl = TableInfo(table_name, cols, self)
self.tables.append(tbl)
return tbl
def remove_scan(self, scan, str_scan):
self.emitc(str_scan)
self.scans.remove(scan)
function_head = '''
extern "C" int __DLLEXPORT__ dllmain(Context* cxt) {

@ -3,3 +3,4 @@ mo-dots==8.20.21357
mo-parsing
mo-imports
readline; sys_platform != 'win32'
numpy

@ -0,0 +1,12 @@
#include "../server/libaquery.h"
extern void* Aalloc(size_t sz);
extern int Afree(void * mem);
template <typename T>
size_t register_memory(T* ptr){
[](void* m){ auto _m = static_cast<T*>(m); delete _m; };
}
#define EXPORT __DLLEXPORT__

@ -0,0 +1,5 @@
#include "aquery.h"
#include <memory>
#include <stdlib>

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

@ -1,2 +0,0 @@
#TargetFrameworkVersion=:PlatformToolSet=v141:EnableManagedIncrementalBuild=false:VCToolArchitecture=Native32Bit:WindowsTargetPlatformVersion=10.0.19041.0
Release|Win32|d:\gg\AQuery++\server\|

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

@ -14,11 +14,26 @@ const char* monetdbe_type_str[] = {
// should be last:
"monetdbe_type_unknown"
} ;
const unsigned char monetdbe_type_szs[] = {
sizeof(monetdbe_column_bool::null_value), sizeof(monetdbe_column_int8_t::null_value),
sizeof(monetdbe_column_int16_t::null_value), sizeof(monetdbe_column_int32_t::null_value),
sizeof(monetdbe_column_int64_t::null_value),
#ifdef HAVE_HGE
sizeof(monetdbe_column_int128_t::null_value),
#endif
sizeof(monetdbe_column_size_t::null_value), sizeof(monetdbe_column_float::null_value),
sizeof(monetdbe_column_double::null_value),
sizeof(monetdbe_column_str::null_value), sizeof(monetdbe_column_blob::null_value),
sizeof(monetdbe_data_date), sizeof(monetdbe_data_time), sizeof(monetdbe_data_timestamp),
// should be last:
1
};
Server::Server(Context* cxt){
if (cxt){
connect(cxt);
}
}
void Server::connect(Context *cxt){

@ -1,4 +1,8 @@
#ifdef _WIN32
#include "monetdbe.h"
#else
#include "monetdb/monetdbe.h"
#endif
struct Context;

@ -111,49 +111,50 @@ int dll_main(int argc, char** argv, Context* cxt){
return 0;
}
extern "C" int __DLLEXPORT__ main(int argc, char** argv) {
puts("running");
Context* cxt = new Context();
cxt->log("%d %s\n", argc, argv[1]);
const char* shmname;
if (argc < 0)
return dll_main(argc, argv, cxt);
else if (argc <= 1)
return test_main();
else
shmname = argv[1];
SharedMemory shm = SharedMemory(shmname);
if (!shm.pData)
return 1;
bool &running = static_cast<bool*>(shm.pData)[0],
&ready = static_cast<bool*>(shm.pData)[1];
using namespace std::chrono_literals;
cxt->log("running: %s\n", running? "true":"false");
cxt->log("ready: %s\n", ready? "true":"false");
while (running) {
std::this_thread::sleep_for(1ms);
if(ready){
cxt->log("running: %s\n", running? "true":"false");
cxt->log("ready: %s\n", ready? "true":"false");
void* handle = dlopen("./dll.so", RTLD_LAZY);
cxt->log("handle: %lx\n", handle);
if (handle) {
cxt->log("inner\n");
code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, "dllmain"));
cxt->log("routine: %lx\n", c);
if (c) {
cxt->log("inner\n");
cxt->err("return: %d\n", c(cxt));
}
}
ready = false;
}
}
shm.FreeMemoryMap();
return 0;
}
//
//extern "C" int __DLLEXPORT__ main(int argc, char** argv) {
//
// puts("running");
// Context* cxt = new Context();
// cxt->log("%d %s\n", argc, argv[1]);
//
// const char* shmname;
// if (argc < 0)
// return dll_main(argc, argv, cxt);
// else if (argc <= 1)
// return test_main();
// else
// shmname = argv[1];
// SharedMemory shm = SharedMemory(shmname);
// if (!shm.pData)
// return 1;
// bool &running = static_cast<bool*>(shm.pData)[0],
// &ready = static_cast<bool*>(shm.pData)[1];
// using namespace std::chrono_literals;
// cxt->log("running: %s\n", running? "true":"false");
// cxt->log("ready: %s\n", ready? "true":"false");
// while (running) {
// std::this_thread::sleep_for(1ms);
// if(ready){
// cxt->log("running: %s\n", running? "true":"false");
// cxt->log("ready: %s\n", ready? "true":"false");
// void* handle = dlopen("./dll.so", RTLD_LAZY);
// cxt->log("handle: %lx\n", handle);
// if (handle) {
// cxt->log("inner\n");
// code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, "dllmain"));
// cxt->log("routine: %lx\n", c);
// if (c) {
// cxt->log("inner\n");
// cxt->err("return: %d\n", c(cxt));
// }
// }
// ready = false;
// }
// }
// shm.FreeMemoryMap();
// return 0;
//}
#include "utils.h"
int test_main()
{

@ -62,6 +62,9 @@ public:
ColRef(const char* name, types::Type_t ty) : name(name), ty(ty) {}
using vector_type<_Ty>::operator[];
using vector_type<_Ty>::operator=;
using vector_type<_Ty>::subvec;
using vector_type<_Ty>::subvec_view;
using vector_type<_Ty>::subvec_deep;
ColView<_Ty> operator [](const vector_type<uint32_t>&idxs) const {
return ColView<_Ty>(*this, idxs);
}
@ -132,6 +135,14 @@ public:
ret[i] = orig[idxs[i]];
return ret;
}
ColRef<_Ty> subvec_deep(uint32_t start, uint32_t end) {
uint32_t len = end - start;
ColRef<_Ty> subvec(len);
for (uint32_t i = 0; i < len; ++i)
subvec[i] = operator[](i);
return subvec;
}
inline ColRef<_Ty> subvec_deep(uint32_t start = 0) { return subvec_deep(start, size); }
};
template <template <class...> class VT, class T>
std::ostream& operator<<(std::ostream& os, const VT<T>& v)
@ -431,84 +442,84 @@ inline void TableInfo<Types...>::print(const char* __restrict sep, const char* _
}
template <class T1, class T2, template<typename ...> class VT, template<typename ...> class VT2>
decayed_t<VT, typename types::Coercion<T1, T2>::type> operator -(const VT<T1>& lhs, const VT2<T2>& rhs) {
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(lhs.size, "");
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(lhs.size);
for (int i = 0; i < lhs.size; ++i)
ret[i] = lhs[i] - rhs[i];
return ret;
}
template <class T1, class T2, template<typename ...> class VT>
decayed_t<VT, typename types::Coercion<T1, T2>::type> operator -(const VT<T1>& lhs, const T2& rhs) {
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(lhs.size, "");
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(lhs.size);
for (int i = 0; i < lhs.size; ++i)
ret[i] = lhs[i] - rhs;
return ret;
}
template <class T1, class T2, template<typename ...> class VT>
decayed_t<VT, typename types::Coercion<T1, T2>::type> operator -(const T2& lhs, const VT<T1>& rhs) {
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(rhs.size, "");
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(rhs.size);
for (int i = 0; i < rhs.size; ++i)
ret[i] = lhs - rhs[i];
return ret;
}
template <class T1, class T2, template<typename ...> class VT, template<typename ...> class VT2>
decayed_t<VT, typename types::Coercion<T1, T2>::type> operator +(const VT<T1>& lhs, const VT2<T2>& rhs) {
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(lhs.size, "");
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(lhs.size);
for (int i = 0; i < lhs.size; ++i)
ret[i] = lhs[i] + rhs[i];
return ret;
}
template <class T1, class T2, template<typename ...> class VT>
decayed_t<VT, typename types::Coercion<T1, T2>::type> operator +(const VT<T1>& lhs, const T2& rhs) {
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(lhs.size, "");
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(lhs.size);
for (int i = 0; i < lhs.size; ++i)
ret[i] = lhs[i] + rhs;
return ret;
}
template <class T1, class T2, template<typename ...> class VT>
decayed_t<VT, typename types::Coercion<T1, T2>::type> operator +(const T2& lhs, const VT<T1>& rhs) {
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(rhs.size, "");
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(rhs.size);
for (int i = 0; i < rhs.size; ++i)
ret[i] = lhs + rhs[i];
return ret;
}
template <class T1, class T2, template<typename ...> class VT, template<typename ...> class VT2>
decayed_t<VT, typename types::Coercion<T1, T2>::type> operator *(const VT<T1>& lhs, const VT2<T2>& rhs) {
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(lhs.size, "");
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(lhs.size);
for (int i = 0; i < lhs.size; ++i)
ret[i] = lhs[i] * rhs[i];
return ret;
}
template <class T1, class T2, template<typename ...> class VT>
decayed_t<VT, typename types::Coercion<T1, T2>::type> operator *(const VT<T1>& lhs, const T2& rhs) {
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(lhs.size, "");
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(lhs.size);
for (int i = 0; i < lhs.size; ++i)
ret[i] = lhs[i] * rhs;
return ret;
}
template <class T1, class T2, template<typename ...> class VT>
decayed_t<VT, typename types::Coercion<T1, T2>::type> operator *(const T2& lhs, const VT<T1>& rhs) {
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(rhs.size, "");
auto ret = decayed_t<VT, typename types::Coercion<T1, T2>::type>(rhs.size);
for (int i = 0; i < rhs.size; ++i)
ret[i] = lhs * rhs[i];
return ret;
}
template <class T1, class T2, template<typename ...> class VT, template<typename ...> class VT2>
decayed_t<VT, types::GetFPType<typename types::Coercion<T1, T2>::type>> operator /(const VT<T1>& lhs, const VT2<T2>& rhs) {
auto ret = decayed_t<VT, types::GetFPType<typename types::Coercion<T1, T2>::type>>(lhs.size, "");
auto ret = decayed_t<VT, types::GetFPType<typename types::Coercion<T1, T2>::type>>(lhs.size);
for (int i = 0; i < lhs.size; ++i)
ret[i] = lhs[i] / rhs[i];
return ret;
}
template <class T1, class T2, template<typename ...> class VT>
decayed_t<VT, types::GetFPType<typename types::Coercion<T1, T2>::type>> operator /(const VT<T1>& lhs, const T2& rhs) {
auto ret = decayed_t<VT, types::GetFPType<typename types::Coercion<T1, T2>::type>>(lhs.size, "");
auto ret = decayed_t<VT, types::GetFPType<typename types::Coercion<T1, T2>::type>>(lhs.size);
for (int i = 0; i < lhs.size; ++i)
ret[i] = lhs[i] / rhs;
return ret;
}
template <class T1, class T2, template<typename ...> class VT>
decayed_t<VT, types::GetFPType<typename types::Coercion<T1, T2>::type>> operator /(const T2& lhs, const VT<T1>& rhs) {
auto ret = decayed_t<VT, types::GetFPType<typename types::Coercion<T1, T2>::type>>(rhs.size, "");
auto ret = decayed_t<VT, types::GetFPType<typename types::Coercion<T1, T2>::type>>(rhs.size);
for (int i = 0; i < rhs.size; ++i)
ret[i] = lhs / rhs[i];
return ret;

@ -47,6 +47,7 @@ public:
_Ty* container;
uint32_t size, capacity;
typedef _Ty* iterator_t;
typedef _Ty value_t;
vector_type(const uint32_t& size) : size(size), capacity(size) {
container = (_Ty*)malloc(size * sizeof(_Ty));
}
@ -58,6 +59,7 @@ public:
}
}
constexpr vector_type() noexcept : size(0), capacity(0), container(0) {};
constexpr vector_type(_Ty* container, uint32_t len) noexcept : size(len), capacity(0), container(container) {};
constexpr vector_type(const vector_type<_Ty>& vt) noexcept : capacity(0) {
_copy(vt);
}
@ -107,11 +109,13 @@ public:
}
void emplace_back(_Ty _val) {
if (size >= capacity) { // geometric growth
capacity += 1 + (capacity >> 1);
_Ty* n_container = (_Ty*)malloc(capacity * sizeof(_Ty));
uint32_t new_capacity = size + 1 + (size >> 1);
_Ty* n_container = (_Ty*)malloc(new_capacity * sizeof(_Ty));
memcpy(n_container, container, sizeof(_Ty) * size);
if (capacity)
free(container);
container = n_container;
capacity = new_capacity;
}
container[size++] = _val;
}
@ -208,6 +212,25 @@ public:
size = this->size + dist;
}
void out(uint32_t n = 4, const char* sep = " ") const;
vector_type<_Ty> subvec(uint32_t start, uint32_t end) {
vector_type<_Ty> subvec(end - start);
memcpy(subvec.container, container + start, sizeof(_Ty) * (end - start));
return subvec;
}
inline vector_type<_Ty> subvec_view(uint32_t start, uint32_t end) {
return subvec(container + start, end - start);
}
vector_type<_Ty> subvec_deep(uint32_t start, uint32_t end) {
uint32_t len = end - start;
vector_type<_Ty> subvec(len);
for (uint32_t i = 0; i < len; ++i)
subvec[i] = container[i];
return subvec;
}
inline vector_type<_Ty> subvec(uint32_t start = 0) { return subvec(start, size); }
inline vector_type<_Ty> subvec_view(uint32_t start = 0) { return subvec_view(start, size); }
inline vector_type<_Ty> subvec_deep(uint32_t start = 0) { return subvec_deep(start, size); }
~vector_type() {
if (capacity > 0) free(container);
container = 0; size = capacity = 0;
@ -285,6 +308,12 @@ public:
void operator[](const uint32_t& i) {
}
vector_type<void> subvec(uint32_t, uint32_t);
vector_type<void> subvec_view(uint32_t, uint32_t);
vector_type<void> subvec_deep(uint32_t, uint32_t);
vector_type<void> subvec(uint32_t);
vector_type<void> subvec_view(uint32_t);
vector_type<void> subvec_deep(uint32_t);
};
#pragma pack(pop)
#endif

@ -2,17 +2,55 @@
#include "./server/libaquery.h"
#include "./server/aggregations.h"
auto covariance = [](auto x, auto y) {
auto xmean = avg(x);
auto ymean = avg(y);
return avg(((x - xmean) * (y - ymean)));
auto covariances2 = [](auto x, auto y, auto w, uint32_t _builtin_len, auto& _builtin_ret) {
auto xmeans = 0.0;
auto ymeans = 0.0;
auto l = _builtin_len;
if((l > 0)) {
xmeans = x[0];
ymeans = y[0];
_builtin_ret[0] = 0.0;
}
if((w > l))
w = l;
for(auto i = 1, j = 0; (i < w); i = (i + 1)) {
xmeans += x[i];
ymeans += y[i];
_builtin_ret[i] = avg(((x.subvec(0, i) - (xmeans / i)) * (y.subvec(0, i) - (ymeans / i))));
}
xmeans /= w;
ymeans /= w;
for(auto i = w; (i < l); i += 1) {
xmeans += ((x[i] - x[(i - w)]) / w);
ymeans += ((y[i] - y[(i - w)]) / w);
_builtin_ret[i] = avg(((x.subvec((i - w), i) - xmeans) * (y.subvec((i - w), i) - ymeans)));
}
return ;
};
auto sd = [](auto x) {
return sqrt(covariance(x, x));
};
auto paircorr = [](auto x, auto y) {
return (covariance(x, y) / (sd(x) * sd(y)));
auto covariances2_gettype = [](auto x, auto y, auto w) {
uint32_t _builtin_len = 0;
auto xmeans = 0.0;
auto ymeans = 0.0;
auto l = _builtin_len;
if((l > 0)) {
xmeans = x[0];
ymeans = y[0];
return 0.0;
}
if((w > l))
w = l;
for(auto i = 1, j = 0; (i < w); i = (i + 1)) {
xmeans += x[i];
ymeans += y[i];
return avg(((x.subvec(0, i) - (xmeans / i)) * (y.subvec(0, i) - (ymeans / i))));
}
xmeans /= w;
ymeans /= w;
for(auto i = w; (i < l); i += 1) {
xmeans += ((x[i] - x[(i - w)]) / w);
ymeans += ((y[i] - y[(i - w)]) / w);
return avg(((x.subvec((i - w), i) - xmeans) * (y.subvec((i - w), i) - ymeans)));
}
};

Binary file not shown.

@ -0,0 +1,14 @@
AGGREGATION FUNCTION covariances(x, y, w){
static xmeans := 0., ymeans := 0., cnt := 0;
if (cnt < w)
{
xmeans += x;
ymeans += y;
cnt+=1;
}
else {
xmeans += (x - x.vec[cnt - w]) / w;
ymeans += (y - y.vec[cnt - w]) / w;
}
avg (( x.vec(x.len-w, x.len) - xmeans ) * (y.vec(y.len - w, y.len) - ymeans ))
}

@ -0,0 +1,41 @@
-- Minimal UDF: element-wise sum of its two arguments.
-- NOTE(review): `--` comment syntax assumed (SQL-style) — confirm.
AGGREGATION FUNCTION add(a, b){
a+b
}
-- Moving-window covariance over window w; source for the generated C++
-- `covariances2` lambda in udf.hpp. `_builtin_len` is the input row count
-- and `_builtin_ret` the per-row output column, both supplied by codegen.
-- NOTE(review): `--` comment syntax assumed (SQL-style) — confirm.
AGGREGATION FUNCTION covariances2(x, y, w){
xmeans := 0.;
ymeans := 0.;
l := _builtin_len;
-- Seed with the first element; a 1-element window has zero covariance.
if (l > 0)
{
xmeans := x[0];
ymeans := y[0];
_builtin_ret[0] := 0.;
}
-- Clamp the window to the available rows.
if (w > l)
w := l;
-- Warm-up: xmeans/ymeans are running SUMS, hence the division by i.
for (i := 1, j:= 0; i < w; i := i+1) {
xmeans += x[i];
ymeans += y[i];
_builtin_ret[i] := avg (( x(0, i) - xmeans/i ) * (y(0, i) - ymeans/i ));
}
-- Switch the accumulators from sums to means.
xmeans /= w;
ymeans /= w;
-- Steady state: slide the window, updating the means incrementally.
for (i := w; i < l; i += 1)
{
xmeans += (x[i] - x[i - w]) / w;
ymeans += (y[i] - y[i - w]) / w;
_builtin_ret[i] := avg (( x(i-w, i) - xmeans ) * (y(i - w, i) - ymeans ));
}
Null
}
-- Smoke-test script: load test.csv into a 4-column table and run the
-- covariances2 UDF with window 4, grouped by column c.
CREATE TABLE test(a INT, b INT, c INT, d INT)
LOAD DATA INFILE "test.csv"
INTO TABLE test
FIELDS TERMINATED BY ","
select covariances2(a, b, 4) from test group by c;

@ -0,0 +1,32 @@
-- Variant of covariances2 that exists to exercise if/elif/else code
-- generation (the branch bodies here are arbitrary test values, not a
-- meaningful statistic).
-- NOTE(review): `--` comment syntax assumed (SQL-style) — confirm.
AGGREGATION FUNCTION covariances2(x, y, w){
xmeans := 0.;
ymeans := 0.;
l := _builtin_len;
-- if / elif-with-block / elif / elif-no-block / else chain for codegen.
if (w > l)
w := l;
elif (w > l + 2)
{
l := 3;
w := 4;
}
elif(w < 99){
l := 8;
}
elif(w<999)
w := 6;
else
l := l / 2;
for (i := 0, j:= 0; i < w; i := i+1) {
xmeans += x[i];
ymeans += y[i];
_builtin_ret[i] := avg (( x(l-w, l) - xmeans ) * (y(l - w, l) - ymeans ));
}
for (i := 0; i < l; i += 1)
{
xmeans += (x[i] - x[i - w]) / w;
ymeans += (y[i] - y[i - w]) / w;
_builtin_ret[i] := avg (( x(l-w, l) - xmeans ) * (y(l - w, l) - ymeans ));
}
Null
}

@ -0,0 +1,19 @@
#pragma once
#include "./server/libaquery.h"
#include "./server/aggregations.h"
// Stateful sliding-window covariance UDF generated from the `covariances`
// AGGREGATION FUNCTION. Returns a (reset, call) pair: `reset` clears the
// persistent accumulators; `call` consumes one row at a time.
// NOTE(review): this looks like broken/unfinished codegen —
//   * `call` treats x/y as scalars when accumulating, but then uses the
//     values themselves as indices in x.subvec((x - w), x);
//   * the steady-state (`else`) branch from the .a source is missing;
//   * `y = (x - xmeans);` overwrites the input argument.
// Confirm against the generator before using.
auto covariances = [](auto x, auto y, auto w) {
// Local statics persist across calls and are shared by all invocations of
// the same template instantiation; lambdas below reference them without
// capture (variables with static storage duration need no capture).
static auto xmeans=0.0;static auto ymeans=0.0; static auto cnt=0;
// [=] captures nothing here (only automatic variables are captured);
// the assignments act directly on the statics above.
auto reset = [=]() { xmeans=0.0, ymeans=0.0, cnt=0; };
// decltype(x) etc. are unevaluated uses of the outer parameters, which is
// why the captureless lambda is well-formed.
auto call = [](decltype(x) x, decltype(y) y, decltype(w) w){
// Warm-up: accumulate until w rows have been consumed.
if((cnt < w)) {
xmeans += x;
ymeans += y;
cnt += 1;
}
y = (x - xmeans);
return avg(((x.subvec((x - w), x) - xmeans) * (y.subvec((y - w), y) - ymeans)));
};
return std::make_pair(reset, call);
};

@ -0,0 +1,14 @@
#include "udf.hpp"
// Driver for the generated UDF: builds two 4-row integer columns, runs the
// width-2 moving covariance over them, and prints the output column.
// vector_type/ColRef/print come from the project headers via udf.hpp.
int main(){
// Raw backing storage for the two input columns.
vector_type _a{1,2,3,4};
vector_type _b{2,3,3,5};
ColRef<int> a("a");
ColRef<int> b("b");
a.initfrom(_a, "a");
b.initfrom(_b, "b");
// covariances2_gettype is used UNEVALUATED here, purely to deduce the
// element type of the result column; the 4 is the output row count.
ColRef<decltype(covariances2_gettype(a,b,0))> ret{4};
// (x, y, window = 2, row count = 4, output column)
covariances2(a,b,2,4,ret);
print(ret);
}

@ -0,0 +1,5 @@
#include "udf.hpp"
// Intentionally empty driver: serves as a minimal compile check for udf.hpp.
int main() {
    return 0;
}
Loading…
Cancel
Save