Bill, 2 years ago
commit 541c702d78

.gitignore vendored

@@ -59,6 +59,10 @@ data/benchmark
 !nyctx100.csv
 !network.csv
 !test_complex.csv
+data/electricity*
+data/covtype*
+data/phishing*
+data/power*
 *.out
 *.asm
 !mmw.so
@@ -83,3 +87,5 @@ udf*.hpp
 *.ipynb
 saved_procedures/**
 procedures/**
+.mypy_cache
+__pycache__

.gitmodules vendored

@@ -0,0 +1,3 @@
[submodule "paper"]
path = paper
url = https://github.com/sunyinqi0508/AQueryPaper

@@ -10,7 +10,7 @@ RUN export OS_VER=`cat /etc/os-release | grep VERSION_CODENAME` &&\
 RUN wget --output-document=/etc/apt/trusted.gpg.d/monetdb.gpg https://dev.monetdb.org/downloads/MonetDB-GPG-KEY.gpg
-RUN apt update && apt install -y python3 python3-pip clang-14 libmonetdbe-dev git
+RUN apt update && apt install -y python3 python3-pip clang-14 libmonetdbe-dev libmonetdb-client-dev monetdb5-sql-dev git
 RUN git clone https://github.com/sunyinqi0508/AQuery2

@@ -2,6 +2,7 @@ OS_SUPPORT =
 MonetDB_LIB =
 MonetDB_INC =
 Defines =
+CC = $(CXX) -xc
 CXXFLAGS = --std=c++2a
 ifeq ($(AQ_DEBUG), 1)
 OPTFLAGS = -g3 #-fsanitize=address
@@ -17,7 +18,7 @@ COMPILER = $(strip $(_COMPILER))
 LIBTOOL = ar rcs
 USELIB_FLAG = -Wl,--whole-archive,libaquery.a -Wl,-no-whole-archive
 LIBAQ_SRC = server/monetdb_conn.cpp server/libaquery.cpp
-LIBAQ_OBJ = monetdb_conn.o libaquery.o
+LIBAQ_OBJ = monetdb_conn.o libaquery.o monetdb_ext.o
 SEMANTIC_INTERPOSITION = -fno-semantic-interposition
 RANLIB = ranlib
 _LINKER_BINARY = $(shell `$(CXX) -print-prog-name=ld` -v 2>&1 | grep -q LLVM && echo lld || echo ld)
@@ -43,7 +44,7 @@ else
 LIBTOOL = gcc-ar rcs
 endif
 endif
-OPTFLAGS += $(SEMANTIC_INTERPOSITION)
+LINKFLAGS += $(SEMANTIC_INTERPOSITION)
 ifeq ($(PCH), 1)
 PCHFLAGS = -include server/pch.hpp
@@ -82,7 +83,7 @@ else
 MonetDB_INC += $(AQ_MONETDB_INC)
 MonetDB_INC += -I/usr/local/include/monetdb -I/usr/include/monetdb
 endif
-MonetDB_LIB += -lmonetdbe
+MonetDB_LIB += -lmonetdbe -lmonetdbsql -lbat
 endif
 ifeq ($(THREADING),1)
@@ -128,6 +129,7 @@ pch:
 $(CXX) -x c++-header server/pch.hpp $(FPIC) $(CXXFLAGS)
 libaquery:
 $(CXX) -c $(FPIC) $(PCHFLAGS) $(LIBAQ_SRC) $(OS_SUPPORT) $(CXXFLAGS) &&\
+$(CC) -c server/monetdb_ext.c $(OPTFLAGS) $(MonetDB_INC) &&\
 $(LIBTOOL) libaquery.a $(LIBAQ_OBJ) &&\
 $(RANLIB) libaquery.a

@@ -2,7 +2,7 @@
 ## GLOBAL CONFIGURATION FLAGS
-version_string = '0.6.0a'
+version_string = '0.7.0a'
 add_path_to_ldpath = True
 rebuild_backend = False
 run_backend = True

@@ -5,20 +5,18 @@
 # You can obtain one at http://mozilla.org/MPL/2.0/.
 #
 # Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
-#
+# Bill Sun 2022 - 2023
 from __future__ import absolute_import, division, unicode_literals
 import json
 from threading import Lock
-from aquery_parser.sql_parser import scrub
+from aquery_parser.parser import scrub
 from aquery_parser.utils import ansi_string, simple_op, normal_op
+import aquery_parser.parser
 parse_locker = Lock() # ENSURE ONLY ONE PARSING AT A TIME
 common_parser = None
-mysql_parser = None
-sqlserver_parser = None
 SQL_NULL = {"null": {}}
@@ -33,44 +31,10 @@ def parse(sql, null=SQL_NULL, calls=simple_op):
 with parse_locker:
 if not common_parser:
-common_parser = sql_parser.common_parser()
+common_parser = aquery_parser.parser.common_parser()
 result = _parse(common_parser, sql, null, calls)
 return result
-def parse_mysql(sql, null=SQL_NULL, calls=simple_op):
-"""
-PARSE MySQL ASSUME DOUBLE QUOTED STRINGS ARE LITERALS
-:param sql: String of SQL
-:param null: What value to use as NULL (default is the null function `{"null":{}}`)
-:return: parse tree
-"""
-global mysql_parser
-with parse_locker:
-if not mysql_parser:
-mysql_parser = sql_parser.mysql_parser()
-return _parse(mysql_parser, sql, null, calls)
-def parse_sqlserver(sql, null=SQL_NULL, calls=simple_op):
-"""
-PARSE MySQL ASSUME DOUBLE QUOTED STRINGS ARE LITERALS
-:param sql: String of SQL
-:param null: What value to use as NULL (default is the null function `{"null":{}}`)
-:return: parse tree
-"""
-global sqlserver_parser
-with parse_locker:
-if not sqlserver_parser:
-sqlserver_parser = sql_parser.sqlserver_parser()
-return _parse(sqlserver_parser, sql, null, calls)
-parse_bigquery = parse_mysql
 def _parse(parser, sql, null, calls):
 utils.null_locations = []
 utils.scrub_op = calls
@@ -85,4 +49,4 @@ def _parse(parser, sql, null, calls):
 _ = json.dumps
-__all__ = ["parse", "format", "parse_mysql", "parse_bigquery", "normal_op", "simple_op"]
+__all__ = ["parse", "format", "normal_op", "simple_op"]

@@ -5,7 +5,7 @@
 # You can obtain one at http://mozilla.org/MPL/2.0/.
 #
 # Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
-#
+# Bill Sun 2022 - 2023
 # SQL CONSTANTS
 from mo_parsing import *

@@ -5,7 +5,7 @@
 # You can obtain one at http://mozilla.org/MPL/2.0/.
 #
 # Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
-#
+# Bill Sun 2022 - 2023
 from sre_parse import WHITESPACE
@@ -28,37 +28,12 @@ simple_ident = Regex(simple_ident.__regex__()[1])
 def common_parser():
 combined_ident = Combine(delimited_list(
-ansi_ident | mysql_backtick_ident | simple_ident, separator=".", combine=True,
+ansi_ident | aquery_backtick_ident | simple_ident, separator=".", combine=True,
 )).set_parser_name("identifier")
-return parser(ansi_string | mysql_doublequote_string, combined_ident)
-def mysql_parser():
-mysql_string = ansi_string | mysql_doublequote_string
-mysql_ident = Combine(delimited_list(
-mysql_backtick_ident | sqlserver_ident | simple_ident,
-separator=".",
-combine=True,
-)).set_parser_name("mysql identifier")
-return parser(mysql_string, mysql_ident)
-def sqlserver_parser():
-combined_ident = Combine(delimited_list(
-ansi_ident
-| mysql_backtick_ident
-| sqlserver_ident
-| Word(FIRST_IDENT_CHAR, IDENT_CHAR),
-separator=".",
-combine=True,
-)).set_parser_name("identifier")
-return parser(ansi_string, combined_ident, sqlserver=True)
+return parser(ansi_string | aquery_doublequote_string, combined_ident)
-def parser(literal_string, ident, sqlserver=False):
+def parser(literal_string, ident):
 with Whitespace() as engine:
 engine.add_ignore(Literal("--") + restOfLine)
 engine.add_ignore(Literal("#") + restOfLine)
@@ -184,12 +159,10 @@ def parser(literal_string, ident, sqlserver=False):
 )
 )
-if not sqlserver:
-# SQL SERVER DOES NOT SUPPORT [] FOR ARRAY CONSTRUCTION (USED FOR IDENTIFIERS)
-create_array = (
-Literal("[") + delimited_list(Group(expr))("args") + Literal("]")
-| create_array
-)
+create_array = (
+Literal("[") + delimited_list(Group(expr))("args") + Literal("]")
+| create_array
+)
 create_array = create_array / to_array

@@ -5,7 +5,7 @@
 # You can obtain one at http://mozilla.org/MPL/2.0/.
 #
 # Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
-#
+# Bill Sun 2022 - 2023
 # KNOWN TYPES

@@ -5,7 +5,7 @@
 # You can obtain one at http://mozilla.org/MPL/2.0/.
 #
 # Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
-#
+# Bill Sun 2022 - 2023
 import ast
@@ -610,9 +610,8 @@ hex_num = (
 # STRINGS
 ansi_string = Regex(r"\'(\'\'|[^'])*\'") / to_string
-mysql_doublequote_string = Regex(r'\"(\"\"|[^"])*\"') / to_string
+aquery_doublequote_string = Regex(r'\"(\"\"|[^"])*\"') / to_string
 # BASIC IDENTIFIERS
 ansi_ident = Regex(r'\"(\"\"|[^"])*\"') / unquote
-mysql_backtick_ident = Regex(r"\`(\`\`|[^`])*\`") / unquote
+aquery_backtick_ident = Regex(r"\`(\`\`|[^`])*\`") / unquote
-sqlserver_ident = Regex(r"\[(\]\]|[^\]])*\]") / unquote

@@ -0,0 +1,2 @@
make snippet_uselib
cp ./dll.so procedures/q70.so

@@ -0,0 +1 @@
Subproject commit fa4e3f5a0606b2dda75faaacfb66cdaf42153260

@@ -5,14 +5,14 @@ from typing import List
 name : str = input('Filename (in path ./procedures/<filename>.aqp):')
 def write():
-s : str = input()
+s : str = input('Enter queries: empty line to stop. \n')
 qs : List[str] = []
 while(len(s) and not s.startswith('S')):
 qs.append(s)
 s = input()
-ms : int = int(input())
+ms : int = int(input('number of modules:'))
 with open(f'./procedures/{name}.aqp', 'wb') as fp:
 fp.write(struct.pack("I", len(qs) + (ms > 0)))
@@ -27,21 +27,29 @@ def write():
 fp.write(b'\x00')
-def read():
+def read(cmd : str):
+rc = len(cmd) > 1 and cmd[1] == 'c'
+clip = ''
 with open(f'./procedures/{name}.aqp', 'rb') as fp:
 nq = struct.unpack("I", fp.read(4))[0]
 ms = struct.unpack("I", fp.read(4))[0]
 qs = fp.read().split(b'\x00')
 print(f'Procedure {name}, {nq} queries, {ms} modules:')
 for q in qs:
-print(' ' + q.decode('utf-8'))
+q = q.decode('utf-8').strip()
+if q:
+q = f'"{q}",' if rc else f'\t{q}'
+print(q)
+clip += q + '\n'
+if rc and not input('copy to clipboard?').lower().startswith('n'):
+import pyperclip
+pyperclip.copy(clip)
 if __name__ == '__main__':
 while True:
-cmd = input("r for read, w for write: ")
+cmd = input("r for read, rc to read c_str, w for write: ")
 if cmd.lower().startswith('r'):
-read()
+read(cmd.lower())
 break
 elif cmd.lower().startswith('w'):
 write()
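
For reference, the .aqp layout implied by write() and read() above is two 4-byte unsigned counts packed with struct format "I" (query count, with the module block counted as one entry, then module count) followed by NUL-separated query strings. A small standalone dump sketch along the same lines (the file name is hypothetical):

    import struct

    def dump_aqp(path):
        with open(path, 'rb') as fp:
            nq = struct.unpack("I", fp.read(4))[0]  # number of queries
            ms = struct.unpack("I", fp.read(4))[0]  # number of modules
            print(f'{nq} queries, {ms} modules')
            for q in fp.read().split(b'\x00'):
                if q.strip():
                    print(q.decode('utf-8'))

    dump_aqp('./procedures/example.aqp')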

@@ -0,0 +1,44 @@
import os
sep = os.sep
# toggles
dataset = 'power' # [covtype, electricity, mixed, phishing, power]
use_threadpool = False # True
# environments
input_prefix = f'data{sep}{dataset}_orig'
output_prefix = f'data{sep}{dataset}'
sep_field = b','
sep_subfield = b';'
lst_files = os.listdir(input_prefix)
# lst_files.sort()
try:
os.mkdir(output_prefix)
except FileExistsError:
pass
def process(f : str):
filename = input_prefix + sep + f
ofilename = output_prefix + sep + f[:-3] + 'csv'
with open(filename, 'rb') as ifile:
icontents = ifile.read()
with open(ofilename, 'wb') as ofile:
ofile.write(b'\n')
for l in icontents.splitlines():
fields = l.strip().split(b' ')
subfields = fields[:-1]
ol = ( # fields[0] + sep_field +
sep_subfield.join(subfields) +
sep_field + fields[-1] + b'\n')
ofile.write(ol)
if not use_threadpool:
for f in lst_files:
process(f)
elif __name__ == '__main__':
from multiprocessing import Pool
with Pool(8) as tp:
tp.map(process, lst_files)
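
process() rewrites each space-separated input line as a two-column CSV row: all tokens but the last are joined with ';' into a single sub-fielded column, and the last token (the class label) becomes the second column. A worked example of that mapping (the sample values are made up):

    line = b'5.1 3.5 1.4 0.2 1'
    fields = line.strip().split(b' ')
    row = b';'.join(fields[:-1]) + b',' + fields[-1] + b'\n'
    assert row == b'5.1;3.5;1.4;0.2,1\n'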

@@ -9,49 +9,52 @@ struct DR;
 struct DT;
-//enum Evaluation {gini, entropy, logLoss};
-class DecisionTree{
+class DecisionTree
+{
 public:
-DT* DTree = nullptr;
-int maxHeight;
-long feature;
-long maxFeature;
-long seed;
-long classes;
-int* Sparse;
-double forgetRate;
-Evaluation evalue;
-long Rebuild;
-long roundNo;
-long called;
-long retain;
-DecisionTree(int hight, long f, int* sparse, double forget, long maxFeature, long noClasses, Evaluation e, long r, long rb);
+DT *DTree = nullptr;
+double minIG;
+long maxHeight;
+long feature;
+long maxFeature;
+bool isRF;
+long classes;
+int *Sparse;
+double forgetRate;
+double increaseRate;
+double initialIR;
+Evaluation evalue;
+long Rebuild;
+long roundNo;
+long called;
+long retain;
+long lastT;
+long lastAll;
+DecisionTree(long f, int *sparse, double forget, long maxFeature, long noClasses, Evaluation e);
 void Stablelize();
 void Free();
-minEval findMinGiniDense(double** data, long* result, long* totalT, long size, long col);
-minEval findMinGiniSparse(double** data, long* result, long* totalT, long size, long col, DT* current);
-minEval incrementalMinGiniDense(double** data, long* result, long size, long col, long*** count, double** record, long* max, long newCount, long forgetSize, bool isRoot);
-minEval incrementalMinGiniSparse(double** dataNew, long* resultNew, long sizeNew, long sizeOld, DT* current, long col, long forgetSize, bool isRoot);
-long* fitThenPredict(double** trainData, long* trainResult, long trainSize, double** testData, long testSize);
-void fit(double** data, long* result, long size);
-void Update(double** data, long* result, long size, DT* current);
-void IncrementalUpdate(double** data, long* result, long size, DT* current);
-long Test(double* data, DT* root);
-void print(DT* root);
+minEval findMinGiniDense(double **data, long *result, long *totalT, long size, long col);
+minEval findMinGiniSparse(double **data, long *result, long *totalT, long size, long col, DT *current);
+minEval incrementalMinGiniDense(double **data, long *result, long size, long col, long ***count, double **record, long *max, long newCount, long forgetSize, double **forgottenData, long *forgottenClass);
+minEval incrementalMinGiniSparse(double **dataNew, long *resultNew, long sizeNew, long sizeOld, DT *current, long col, long forgetSize, double **forgottenData, long *forgottenClass);
+long *fitThenPredict(double **trainData, long *trainResult, long trainSize, double **testData, long testSize);
+void fit(double **data, long *result, long size);
+void Update(double **data, long *result, long size, DT *current);
+void IncrementalUpdate(double **data, long *result, long size, DT *current);
+long Test(double *data, DT *root);
+void print(DT *root);
 };
 #endif

@@ -26,7 +26,7 @@ minEval giniSparse(double** data, long* result, long* d, long size, long col, lo
 double gini1, gini2;
 double c;
 long l, r;
-for(i=0; i<size; i++){
+for(i=0; i<size-1; i++){
 c = data[d[i]][col];
 if(c==max)break;
 count[result[d[i]]]++;
@@ -62,7 +62,7 @@ minEval entropySparse(double** data, long* result, long* d, long size, long col,
 double entropy1, entropy2;
 double c;
 long l, r;
-for(i=0; i<size; i++){
+for(i=0; i<size-1; i++){
 c = data[d[i]][col];
 if(c==max)break;
 count[result[d[i]]]++;
@@ -73,8 +73,8 @@ minEval entropySparse(double** data, long* result, long* d, long size, long col,
 for(j=0;j<classes;j++){
 l = count[j];
 r = totalT[j]-l;
-entropy1 -= ((double)l/total)*log((double)l/total);
-entropy2 -= ((double)r/(size-total))*log((double)r/(size-total));
+if(l!=0)entropy1 -= ((double)l/total)*log((double)l/total);
+if(r!=0)entropy2 -= ((double)r/(size-total))*log((double)r/(size-total));
 }
 entropy1 = entropy1*total/size + entropy2*(size-total)/size;
 if(ret.eval>entropy1){
@@ -140,8 +140,8 @@ minEval entropySparseIncremental(long sizeTotal, long classes, double* newSorted
 for(j=0;j<classes;j++){
 l = count[j];
 r = T[j]-l;
-e1 -= ((double)l/total)*log((double)l/total);
-e2 -= ((double)r/(sizeTotal-total))*log((double)r/(sizeTotal-total));
+if(l!=0)e1 -= ((double)l/total)*log((double)l/total);
+if(r!=0)e2 -= ((double)r/(sizeTotal-total))*log((double)r/(sizeTotal-total));
 }
 e1 = e1*total/sizeTotal + e2*(sizeTotal-total)/sizeTotal;
 if(ret.eval>e1){
@@ -159,9 +159,9 @@ minEval giniDense(long max, long size, long classes, long** rem, long* d, double
 double gini1, gini2;
 long *t, *t2, *r, *r2, i, j;
 for(i=0;i<max;i++){
-t = rem[d[i]];
+t = rem[i];
 if(i>0){
-t2 = rem[d[i-1]];
+t2 = rem[i-1];
 for(j=0;j<=classes;j++){
 t[j]+=t2[j];
 }
@@ -179,7 +179,7 @@ minEval giniDense(long max, long size, long classes, long** rem, long* d, double
 gini1 = (gini1*t[classes])/size + (gini2*(size-t[classes]))/size;
 if(gini1<ret.eval){
 ret.eval = gini1;
-ret.value = record[d[i]];
+ret.value = record[i];
 ret.left = t[classes];
 }
 }
@@ -193,9 +193,9 @@ minEval entropyDense(long max, long size, long classes, long** rem, long* d, dou
 double entropy1, entropy2;
 long *t, *t2, *r, *r2, i, j;
 for(i=0;i<max;i++){
-t = rem[d[i]];
+t = rem[i];
 if(i>0){
-t2 = rem[d[i-1]];
+t2 = rem[i-1];
 for(j=0;j<=classes;j++){
 t[j]+=t2[j];
 }
@@ -207,71 +207,15 @@ minEval entropyDense(long max, long size, long classes, long** rem, long* d, dou
 long l, r;
 l = t[j];
 r = totalT[j]-l;
-entropy1 -= ((double)l/t[classes])*log((double)l/t[classes]);
-entropy2 -= ((double)r/(size-t[classes]))*log((double)r/(size-t[classes]));
+if(l!=0)entropy1 -= ((double)l/t[classes])*log((double)l/t[classes]);
+if(r!=0)entropy2 -= ((double)r/(size-t[classes]))*log((double)r/(size-t[classes]));
 }
 entropy1 = entropy1*t[classes]/size + entropy2*(size-t[classes])/size;
 if(entropy1<ret.eval){
 ret.eval = entropy1;
-ret.value = record[d[i]];
+ret.value = record[i];
 ret.left = t[classes];
 }
 }
 return ret;
 }
-minEval giniDenseIncremental(long max, double* record, long** count, long classes, long newSize, long* T){
-double gini1, gini2;
-minEval ret;
-long i, j;
-ret.eval = DBL_MAX;
-for(i=0; i<max; i++){
-if(count[i][classes]==newSize){
-continue;
-}
-gini1 = 1.0;
-gini2 = 1.0;
-for(j=0;j<classes;j++){
-long l, r;
-l = count[i][j];
-r = T[j]-l;
-gini1 -= pow((double)l/count[i][classes], 2);
-gini2 -= pow((double)r/(newSize-count[i][classes]), 2);
-}
-gini1 = gini1*count[i][classes]/newSize + gini2*((newSize-count[i][classes]))/newSize;
-if(gini1<ret.eval){
-ret.eval = gini1;
-ret.value = record[i];
-}
-}
-return ret;
-}
-minEval entropyDenseIncremental(long max, double* record, long** count, long classes, long newSize, long* T){
-double entropy1, entropy2;
-minEval ret;
-long i, j;
-ret.eval = DBL_MAX;
-for(i=0; i<max; i++){
-if(count[i][classes]==newSize or count[i][classes]==0){
-continue;
-}
-entropy1 = 0;
-entropy2 = 0;
-for(j=0;j<classes;j++){
-long l, r;
-l = count[i][j];
-r = T[j]-l;
-entropy1 -= ((double)l/count[i][classes])*log((double)l/count[i][classes]);
-entropy2 -= (double)r/(newSize-count[i][classes])*log((double)r/(newSize-count[i][classes]));
-}
-entropy1 = entropy1*count[i][classes]/newSize + entropy2*((newSize-count[i][classes]))/newSize;
-if(entropy1<ret.eval){
-ret.eval = entropy1;
-ret.value = record[i];
-}
-}
-return ret;
-}
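
The if(l!=0)/if(r!=0) guards added in the entropy routines above apply the usual convention for empty classes and avoid the NaN that 0*log(0) would otherwise inject into the split score:

    H = -\sum_i p_i \log p_i,  with the convention  0 \log 0 = 0,

so a class with zero count on either side of a candidate split contributes nothing to that side's entropy.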

@@ -17,8 +17,4 @@ minEval giniDense(long max, long size, long classes, long** rem, long* d, double
 minEval entropyDense(long max, long size, long classes, long** rem, long* d, double* record, long* totalT);
-minEval giniDenseIncremental(long max, double* record, long** count, long classes, long newSize, long* T);
-minEval entropyDenseIncremental(long max, double* record, long** count, long classes, long newSize, long* T);
 #endif

@@ -2,7 +2,27 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <ctime> #include <ctime>
#include <math.h>
#include <algorithm>
#include <boost/math/distributions/students_t.hpp>
#include <random>
long poisson(int Lambda)
{
int k = 0;
long double p = 1.0;
long double l = exp(-Lambda);
srand((long)clock());
while(p>=l)
{
double u = (double)(rand()%10000)/10000;
p *= u;
k++;
}
if (k>11)k=11;
return k-1;
}
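
The poisson() helper added above is Knuth's product-of-uniforms sampler; it is used later in RandomForest::fit (times = poisson(6) when bagging is enabled) to draw per-sample replication counts for online bagging. The cap at 11 and lambda = 6 are choices made in this commit. Sketching what the loop computes:

    P(K = k) = e^{-\lambda} \lambda^k / k!,  and the loop multiplies uniforms U_1, U_2, ... until \prod_i U_i < e^{-\lambda}, returning the number of factors minus one.
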
struct DT{ struct DT{
int height; int height;
long* featureId; long* featureId;
@@ -30,62 +50,126 @@ struct DT{
long size = 0;// Size of the dataset long size = 0;// Size of the dataset
}; };
RandomForest::RandomForest(long mTree, long actTree, long rTime, int h, long feature, int* s, double forg, long maxF, long noC, Evaluation eval, long r, long rb){ RandomForest::RandomForest(long mTree, long feature, int* s, double forg, long noC, Evaluation eval, bool b, double t){
srand((long)clock()); srand((long)clock());
Rebuild = rb; bagging = b;
if(actTree<1)actTree=1; activeTree = mTree;
noTree = actTree;
activeTree = actTree;
treePointer = 0;
if(mTree<actTree)mTree=activeTree;
maxTree = mTree; maxTree = mTree;
if(rTime<=0)rTime=1; allT = new long[mTree];
rotateTime = rTime;
timer = 0;
retain = r;
tThresh=t;
lastT = -2;
lastAll = 0;
long i; long i;
height = h;
f = feature; f = feature;
sparse = new int[f]; sparse = new int[f];
for(i=0; i<f; i++)sparse[i]=s[i]; for(i=0; i<f; i++)sparse[i]=s[i];
forget = forg; forget = forg;
maxFeature = maxF;
noClasses = noC; noClasses = noC;
e = eval; e = eval;
minF = floor(sqrt((double)f))+2;
if(minF>f)minF=f;
DTrees = (DecisionTree**)malloc(mTree*sizeof(DecisionTree*)); DTrees = (DecisionTree**)malloc(mTree*sizeof(DecisionTree*));
for(i=0; i<mTree; i++){ for(i=0; i<maxTree; i++){
if(i<actTree){ DTrees[i] = new DecisionTree(f, sparse, forget, minF+rand()%(f+1-minF), noClasses, e);
DTrees[i] = new DecisionTree(height, f, sparse, forget, maxFeature, noClasses, e, r, rb); DTrees[i]->isRF=true;
}
else{
DTrees[i]=nullptr;
}
} }
} }
void RandomForest::fit(double** data, long* result, long size){ void RandomForest::fit(double** data, long* result, long size){
if(timer==rotateTime and maxTree!=activeTree){ long i, j, k, l;
Rotate();
timer=0;
}
long i, j, k;
double** newData; double** newData;
long* newResult; long* newResult;
for(i=0; i<activeTree; i++){ long localT = 0;
newData = new double*[size]; int stale = 0;
newResult = new long[size]; if(lastT==-2){
for(j = 0; j<size; j++){ lastT=-1;
newData[j] = new double[f]; }else{
for(k=0; k<f; k++){ for(i=0; i<maxTree; i++)allT[i] = 0;
newData[j][k] = data[j][k]; for(i=0; i<size; i++){
if(Test(data[i], result[i])==result[i])localT++;
}
long localAll = size;
if(lastT>=0){
double lastSm = (double)lastT/lastAll;
double localSm = (double)localT/localAll;
double lastSd = sqrt(pow((1.0-lastSm),2)*lastT+pow(lastSm,2)*(lastAll-lastT)/(lastAll-1));
double localSd = sqrt(pow((1.0-localSm),2)*localT+pow(localSm,2)*(localAll-localT)/(localAll-1));
double v = lastAll+localAll-2;
double sp = sqrt(((lastAll-1) * lastSd * lastSd + (localAll-1) * localSd * localSd) / v);
double q;
double t = lastSm-localSm;
if(sp==0){q = 1;}
else{
t = t/(sp*sqrt(1.0/lastAll+1.0/localAll));
boost::math::students_t dist(v);
double c = cdf(dist, t);
q = cdf(complement(dist, fabs(t)));
}
if(q<=tThresh){
lastT += localT;
lastAll += localAll;
}else if(t<0){
lastT = localT;
lastAll = localAll;
}else{
double newAcc = (double)localT/localAll;
double lastAcc= (double)lastT/lastAll;
stale = floor((newAcc-lastAcc)/(lastAcc)*maxTree);
lastT = localT;
lastAll = localAll;
}
}else{
lastT = localT;
lastAll = localAll;
}
}
Rotate(stale);
for(i=0; i<maxTree; i++){
long times;
if(bagging)times = poisson(6);
else times=1;
if(times==0)continue;
newData = (double**)malloc(sizeof(double*)*size*times);
newResult = (long*)malloc(sizeof(long)*size*times);
long c = 0;
for(j = 0; j<size*times; j++){
long jj;
if(bagging) jj = rand()%size;
else jj=j;
newData[j] = (double*)malloc((f+1)*sizeof(double));
for(l=0; l<f; l++){
newData[j][l] = data[jj][l];
} }
newResult[j] = result[j]; newData[j][f] = 0;
newResult[j] = result[jj];
} }
DTrees[(i+treePointer)%maxTree]->fit(newData, newResult, size); DTrees[i]->fit(newData, newResult, size*times);
} }
timer++; /*for(i=0; i<maxTree; i++){
//backupTrees[i]->retain = 10*size;
//if(backupTrees[i]==nullptr) continue;
long times;
//times = poisson(posMean);
//if(times==0)continue;
times=1;
newData = (double**)malloc(sizeof(double*)*size*times);
newResult = (long*)malloc(sizeof(long)*size*times);
long c = 0;
for(j = 0; j<size; j++){
long jj = rand()%size;
jj=j;
for(k=0; k<times; k++){
newData[j*times+k] = (double*)malloc((f+1)*sizeof(double));
for(l=0; l<f; l++){
newData[j*times+k][l] = data[jj][l];
}
newData[j*times+k][f] = 0;
newResult[j*times+k] = result[jj];
}
}
backupTrees[i]->fit(newData, newResult, size*times);
}*/
} }
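
The accuracy check at the top of RandomForest::fit above is, roughly, a pooled two-sample Student's t comparison of the forest's mean correctness on the previous window (lastT/lastAll) and on the new batch (localT/localAll):

    s_p = \sqrt{((n_1-1)s_1^2 + (n_2-1)s_2^2)/(n_1+n_2-2)},   t = (\bar p_1 - \bar p_2)/(s_p \sqrt{1/n_1 + 1/n_2}),   \nu = n_1 + n_2 - 2,

with q = 1 - F_\nu(|t|) taken from boost::math::students_t. When q <= tThresh the two windows are pooled; when the new batch scores higher (t < 0) the window is simply reset; otherwise the relative drop is scaled by maxTree into a negative stale count and Rotate() refits that many of the lowest-scoring trees on their recorded data.
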
long* RandomForest::fitThenPredict(double** trainData, long* trainResult, long trainSize, double** testData, long testSize){ long* RandomForest::fitThenPredict(double** trainData, long* trainResult, long trainSize, double** testData, long testSize){
@@ -97,36 +181,80 @@ long* RandomForest::fitThenPredict(double** trainData, long* trainResult, long t
return testResult; return testResult;
} }
void RandomForest::Rotate(){ void RandomForest::Rotate(long stale){
if(noTree==maxTree){ long i, j, k;
DTrees[(treePointer+activeTree)%maxTree]->Free(); long minIndex = -1;
delete DTrees[(treePointer+activeTree)%maxTree]; if(stale>=0)return;
}else{ else{
noTree++; stale = std::min(stale, maxTree);
stale*=-1;
while(stale>0){
long currentMin = 2147483647;
for(i = 0; i<maxTree; i++){
if(allT[i]<currentMin){
currentMin=allT[i];
minIndex=i;
}
}
stale--;
if(minIndex<0)break;
allT[minIndex] = 2147483647;
double** newData;
long* newResult;
long size = 0;
long lastT2 = 0;
size = DTrees[minIndex]->DTree->size;
newData = (double**)malloc(sizeof(double*)*size);
newResult = (long*)malloc(sizeof(long)*size);
for(j = 0; j<size; j++){
newData[j] = (double*)malloc(sizeof(double)*(f+1));
for(k=0; k<f; k++){
newData[j][k] = DTrees[minIndex]->DTree->dataRecord[j][k];
}
newData[j][f] = 0;
newResult[j] = DTrees[minIndex]->DTree->resultRecord[j];
}
DTrees[minIndex]->Stablelize();
DTrees[minIndex]->Free();
delete DTrees[minIndex];
DTrees[minIndex] = new DecisionTree(f, sparse, forget, minF+rand()%(f+1-minF), noClasses, e);
DTrees[minIndex]->isRF=true;
DTrees[minIndex]->fit(newData, newResult, size);
for(j=0; j<size; j++){
if(DTrees[minIndex]->Test(newData[j], DTrees[minIndex]->DTree)==newResult[j])lastT2++;
}
DTrees[minIndex]->lastAll=size;
DTrees[minIndex]->lastT=lastT2;
}
} }
DTrees[(treePointer+activeTree)%maxTree] = new DecisionTree(height, f, sparse, forget, maxFeature, noClasses, e, retain, Rebuild);
long size = DTrees[(treePointer+activeTree-1)%maxTree]->DTree->size;
double** newData = new double*[size];
long* newResult = new long[size];
for(long j = 0; j<size; j++){
newData[j] = new double[f];
for(long k=0; k<f; k++){
newData[j][k] = DTrees[(treePointer+activeTree-1)%maxTree]->DTree->dataRecord[j][k];
}
newResult[j] = DTrees[(treePointer+activeTree-1)%maxTree]->DTree->resultRecord[j];
}
DTrees[(treePointer+activeTree)%maxTree]->fit(newData, newResult, size);
DTrees[treePointer]->Stablelize();
if(++treePointer==maxTree)treePointer=0;
} }
long RandomForest::Test(double* data, long result){
long i;
long predict[noClasses];
for(i=0; i<noClasses; i++){
predict[i]=0;
}
for(i=0; i<maxTree; i++){
long tmp = DTrees[i]->Test(data, DTrees[i]->DTree);
predict[tmp]++;
if(tmp==result)allT[i]++;
}
long ret = 0;
for(i=1; i<noClasses; i++){
if(predict[i]>predict[ret])ret = i;
}
return ret;
}
long RandomForest::Test(double* data){ long RandomForest::Test(double* data){
long i; long i;
long predict[noClasses]; long predict[noClasses];
for(i=0; i<noClasses; i++)predict[i]=0; for(i=0; i<noClasses; i++)predict[i]=0;
for(i=0; i<noTree; i++){ for(i=0; i<maxTree; i++){
predict[DTrees[i]->Test(data, DTrees[i]->DTree)]++; predict[DTrees[i]->Test(data, DTrees[i]->DTree)]++;
} }

@@ -14,33 +14,34 @@ struct DT;
 class RandomForest{
 public:
-long noTree;
 long maxTree;
 long activeTree;
-long treePointer;
-long rotateTime;
-long timer;
-long retain;
-DecisionTree** DTrees = nullptr;
+long* allT;
+double tThresh;
+DecisionTree** DTrees;
+DecisionTree** backupTrees;
 long height;
-long Rebuild;
+bool bagging;
 long f;
 int* sparse;
 double forget;
-long maxFeature;
 long noClasses;
 Evaluation e;
+long lastT;
+long lastAll;
+int minF;
-RandomForest(long maxTree, long activeTree, long rotateTime, int height, long f, int* sparse, double forget, long maxFeature=0, long noClasses=2, Evaluation e=Evaluation::gini, long r=-1, long rb=2147483647);
+RandomForest(long maxTree, long f, int* sparse, double forget, long noClasses=2, Evaluation e=Evaluation::entropy, bool b=false, double tThresh=0.05);
 void fit(double** data, long* result, long size);
 long* fitThenPredict(double** trainData, long* trainResult, long trainSize, double** testData, long testSize);
-void Rotate();
+void Rotate(long stale);
 long Test(double* data);
+long Test(double* data, long result);
 };
 #endif

@@ -11,6 +11,7 @@
std::random_device rd; std::random_device rd;
std::mt19937 g(rd()); std::mt19937 g(rd());
struct minEval{ struct minEval{
double value; double value;
int* values; int* values;
@@ -23,10 +24,11 @@ struct minEval{
}; };
struct DT{ struct DT{
int height; long height;
long* featureId; long* featureId;
DT* left = nullptr; DT* left = nullptr;
DT* right = nullptr; DT* right = nullptr;
bool created;
// split info // split info
bool terminate; bool terminate;
@@ -47,19 +49,22 @@ struct DT{
double** dataRecord = nullptr;// Record the data double** dataRecord = nullptr;// Record the data
long* resultRecord = nullptr;// Record the result long* resultRecord = nullptr;// Record the result
long size = 0;// Size of the dataset long size = 0;// Size of the dataset
}; };
long seed = (long)clock(); long seed = (long)clock();
long* Rands(long feature, long maxFeature){ long* Rands(long feature, long maxFeature){
//srand(seed++); srand(seed);
long i; long i;
long* ret = (long*) malloc(feature*sizeof(long)); long* ret = (long*) malloc(feature*sizeof(long));
for(i =0; i<feature; i++)ret[i] = i; for(i =0; i<feature; i++)ret[i] = i;
if(maxFeature==feature){ if(maxFeature==feature){
return ret; return ret;
} }
std::shuffle(ret, &ret[feature], g); std::shuffle(ret, ret+feature, g);
long* ret2 = (long*) malloc(maxFeature*sizeof(long)); long* ret2 = (long*) malloc(maxFeature*sizeof(long));
for(i=0; i<maxFeature; i++)ret2[i] = ret[i]; for(i=0; i<maxFeature; i++){
ret2[i] = ret[i];
}
free(ret); free(ret);
return ret2; return ret2;
} }
@@ -67,9 +72,8 @@ double getRand(){
return (double) rand() / RAND_MAX; return (double) rand() / RAND_MAX;
} }
void createNode(DT* t, long currentHeight, long f, long classes){
void createTree(DT* t, long currentHeight, long height, long f, long maxF, long classes){ t->created = true;
srand(seed);
long i; long i;
t->count = (long***)malloc(f*sizeof(long**)); t->count = (long***)malloc(f*sizeof(long**));
for(i=0; i<f; i++)t->count[i]=nullptr; for(i=0; i<f; i++)t->count[i]=nullptr;
@@ -77,8 +81,6 @@ void createTree(DT* t, long currentHeight, long height, long f, long maxF, long
for(i=0; i<f; i++)t->record[i]=nullptr; for(i=0; i<f; i++)t->record[i]=nullptr;
t->max = (long*)malloc(f*sizeof(long)); t->max = (long*)malloc(f*sizeof(long));
t->max[0] = -1; t->max[0] = -1;
t->featureId = Rands(f, maxF);
//t->T = (long*)malloc(classes*sizeof(long));
t->sortedData = (double**) malloc(f*sizeof(double*)); t->sortedData = (double**) malloc(f*sizeof(double*));
for(i=0; i<f; i++)t->sortedData[i]=nullptr; for(i=0; i<f; i++)t->sortedData[i]=nullptr;
t->sortedResult = (long**) malloc(f*sizeof(long*)); t->sortedResult = (long**) malloc(f*sizeof(long*));
@ -88,20 +90,18 @@ void createTree(DT* t, long currentHeight, long height, long f, long maxF, long
t->height = currentHeight; t->height = currentHeight;
t->feature = -1; t->feature = -1;
t->size = 0; t->size = 0;
if(currentHeight>height){
t->right = nullptr;
t->left = nullptr;
return;
}
t->left = (DT*)malloc(sizeof(DT)); t->left = (DT*)malloc(sizeof(DT));
t->right = (DT*)malloc(sizeof(DT)); t->right = (DT*)malloc(sizeof(DT));
createTree(t->left, currentHeight+1, height, f, maxF, classes); t->left->created = false;
createTree(t->right, currentHeight+1, height, f, maxF, classes); t->right->created = false;
t->left->height = currentHeight+1;
t->right->height = currentHeight+1;
} }
void stableTree(DT* t, long f){ void stableTree(DT* t, long f){
long i, j; long i, j;
if(not t->created)return;
for(i=0; i<f; i++){ for(i=0; i<f; i++){
if(t->count[i]==nullptr)continue; if(t->count[i]==nullptr)continue;
for(j=0; j<t->max[i]; j++){ for(j=0; j<t->max[i]; j++){
@@ -116,7 +116,6 @@ void stableTree(DT* t, long f){
} }
free(t->record); free(t->record);
free(t->max); free(t->max);
free(t->featureId);
for(i=0; i<f; i++){ for(i=0; i<f; i++){
if(t->sortedData[i]==nullptr)continue; if(t->sortedData[i]==nullptr)continue;
free(t->sortedData[i]); free(t->sortedData[i]);
@@ -126,25 +125,28 @@ void stableTree(DT* t, long f){
if(t->sortedResult[i]==nullptr)continue; if(t->sortedResult[i]==nullptr)continue;
free(t->sortedResult[i]); free(t->sortedResult[i]);
} }
free(t->sortedResult);
free(t->dataRecord); free(t->dataRecord);
free(t->resultRecord); free(t->resultRecord);
free(t->sortedResult);
if(t->right!=nullptr)stableTree(t->right, f); if(t->right!=nullptr)stableTree(t->right, f);
if(t->left!=nullptr)stableTree(t->left, f); if(t->left!=nullptr)stableTree(t->left, f);
} }
void freeTree(DT* t){ void freeTree(DT* t){
if(t->left != nullptr)freeTree(t->left); if(t->created){
if(t->right != nullptr)freeTree(t->right); freeTree(t->left);
freeTree(t->right);
}
free(t); free(t);
} }
DecisionTree::DecisionTree(int height, long f, int* sparse, double forget=0.1, long maxF=0, long noClasses=2, Evaluation e=Evaluation::gini, long r=-1, long rb=1){ DecisionTree::DecisionTree(long f, int* sparse, double rate, long maxF, long noClasses, Evaluation e){
evalue = e; evalue = e;
called = 0;
long i; long i;
// Max tree height // Max tree height
maxHeight = height; initialIR = rate;
increaseRate = rate;
isRF = false;
// Number of features // Number of features
feature = f; feature = f;
// If each feature is sparse or dense, 0 for dense, 1 for sparse, >2 for number of category // If each feature is sparse or dense, 0 for dense, 1 for sparse, >2 for number of category
@@ -157,40 +159,69 @@ DecisionTree::DecisionTree(int height, long f, int* sparse, double forget=0.1, l
DTree->feature = -1; DTree->feature = -1;
// The number of feature that is considered in each node // The number of feature that is considered in each node
if(maxF>=f){ if(maxF>=f){
maxFeature = f; maxF = f;
}else if(maxF<=0){ }else if(maxF<=0){
maxFeature = (long)round(sqrt(f)); maxF = (long)round(sqrt(f));
}else{ }
maxFeature = maxF; maxFeature = maxF;
} forgetRate = -10.0;
forgetRate = std::min(1.0, forget); retain = 0;
retain = r; DTree->featureId = Rands(f, maxF);
createTree(DTree, 0, maxHeight, f, maxFeature, noClasses); DTree->terminate = true;
// Randomly generate the features DTree->result = 0;
//DTree->featureId = Rands(); DTree->size = 0;
//DTree->sorted = (long**) malloc(f*sizeof(long*)); createNode(DTree, 0, f, noClasses);
// Number of classes of this dataset // Number of classes of this dataset
Rebuild = rb; Rebuild = 2147483647;
roundNo = 0; roundNo = 64;
classes = std::max(noClasses, (long)2); classes = std::max(noClasses, (long)2);
//DTree->T = (long*) malloc(noClasses*sizeof(long)); // last Acc
/*for(long i = 0; i<noClasses; i++){ lastAll = classes;
DTree->T[i]=0; lastT = 1;
}*/
} }
void DecisionTree::Stablelize(){ void DecisionTree::Stablelize(){
free(Sparse); free(Sparse);
stableTree(DTree, feature); long i, j;
DT* t = DTree;
long f = feature;
for(i=0; i<f; i++){
if(t->count[i]==nullptr)continue;
for(j=0; j<t->max[i]; j++){
free(t->count[i][j]);
}
free(t->count[i]);
}
free(t->count);
for(i=0; i<f; i++){
if(t->record[i]==nullptr)continue;
free(t->record[i]);
}
free(t->record);
free(t->max);
free(t->featureId);
for(i=0; i<f; i++){
if(t->sortedData[i]==nullptr)continue;
free(t->sortedData[i]);
}
free(t->sortedData);
for(i=0; i<f; i++){
if(t->sortedResult[i]==nullptr)continue;
free(t->sortedResult[i]);
}
free(t->sortedResult);
if(DTree->right!=nullptr)stableTree(t->right, feature);
if(DTree->left!=nullptr)stableTree(t->left, feature);
} }
void DecisionTree::Free(){ void DecisionTree::Free(){
free(DTree->dataRecord);
free(DTree->resultRecord);
freeTree(DTree); freeTree(DTree);
} }
minEval DecisionTree::incrementalMinGiniSparse(double** dataTotal, long* resultTotal, long sizeTotal, long sizeNew, DT* current, long col, long forgetSize, bool isRoot){ minEval DecisionTree::incrementalMinGiniSparse(double** dataTotal, long* resultTotal, long sizeTotal, long sizeNew, DT* current, long col, long forgetSize, double** forgottenData, long* forgottenClass){
long i, j; long i, j;
if(isRoot){sizeNew=sizeTotal-forgetSize;}
long newD[sizeNew]; long newD[sizeNew];
for(i=0; i<sizeNew; i++)newD[i]=i; for(i=0; i<sizeNew; i++)newD[i]=i;
long T[classes]; long T[classes];
@@ -201,14 +232,28 @@ minEval DecisionTree::incrementalMinGiniSparse(double** dataTotal, long* resultT
long p1=0, p2=0; long p1=0, p2=0;
double* oldData = current->sortedData[col]; double* oldData = current->sortedData[col];
long* oldResult = current->sortedResult[col]; long* oldResult = current->sortedResult[col];
long tmp2 = forgetSize;
long* allForget = (long*)malloc(sizeof(long)*classes);
for(i=0; i<classes; i++)allForget[i]=0;
for(i=0; i<sizeTotal; i++){ for(i=0; i<sizeTotal; i++){
bool meet = false;
if(p1==sizeNew){ if(p1==sizeNew){
newSortedData[i] = oldData[p2]; j = oldResult[p2];
newSortedResult[i] = oldResult[p2]; if(allForget[j]!=forgottenClass[j]){
T[newSortedResult[i]]++; if(oldData[p2]==forgottenData[j][allForget[j]]){
allForget[j]++;
i--;
meet = true;
}
}
if(not meet){
newSortedData[i] = oldData[p2];
newSortedResult[i] = oldResult[p2];
T[newSortedResult[i]]++;
}
p2++; p2++;
} }
else if(p2==sizeTotal-sizeNew){ else if(p2==sizeTotal-sizeNew+forgetSize){
newSortedData[i] = dataTotal[newD[p1]][col]; newSortedData[i] = dataTotal[newD[p1]][col];
newSortedResult[i] = resultTotal[newD[p1]]; newSortedResult[i] = resultTotal[newD[p1]];
T[newSortedResult[i]]++; T[newSortedResult[i]]++;
@@ -220,17 +265,27 @@ minEval DecisionTree::incrementalMinGiniSparse(double** dataTotal, long* resultT
T[newSortedResult[i]]++; T[newSortedResult[i]]++;
p1++; p1++;
}else{ }else{
newSortedData[i] = oldData[p2]; j = oldResult[p2];
newSortedResult[i] = oldResult[p2]; if(allForget[j]!=forgottenClass[j]){
T[newSortedResult[i]]++; if(oldData[p2]==forgottenData[j][allForget[j]]){
allForget[j]++;
i--;
meet = true;
}
}
if(not meet){
newSortedData[i] = oldData[p2];
newSortedResult[i] = oldResult[p2];
T[newSortedResult[i]]++;
}
p2++; p2++;
} }
} }
free(allForget);
current->sortedData[col] = newSortedData; current->sortedData[col] = newSortedData;
current->sortedResult[col] = newSortedResult; current->sortedResult[col] = newSortedResult;
free(oldData); free(oldData);
free(oldResult); free(oldResult);
minEval ret; minEval ret;
if(evalue == Evaluation::gini){ if(evalue == Evaluation::gini){
ret = giniSparseIncremental(sizeTotal, classes, newSortedData, newSortedResult, T); ret = giniSparseIncremental(sizeTotal, classes, newSortedData, newSortedResult, T);
@@ -240,28 +295,43 @@ minEval DecisionTree::incrementalMinGiniSparse(double** dataTotal, long* resultT
ret.values = nullptr; ret.values = nullptr;
return ret; return ret;
} }
minEval DecisionTree::incrementalMinGiniDense(double** data, long* result, long size, long col, long*** count, double** record, long* max, long newSize, long forgetSize, bool isRoot){ minEval DecisionTree::incrementalMinGiniDense(double** data, long* result, long size, long col, long*** count, double** record, long* max, long newSize, long forgetSize, double** forgottenData, long* forgottenClass){
// newSize is before forget // newSize is before forget
long low = 0; long low = 0;
if(isRoot)size=newSize-forgetSize; //if(isRoot)
long i, j, k; long i, j, k, tmp;
long newMax = 0; long newMax = 0;
long maxLocal = max[col]; long maxLocal = max[col];
long **newCount=(long**)malloc(size*sizeof(long*)); long **newCount=(long**)malloc(size*sizeof(long*));
for(i=0;i<size;i++){
newCount[i] = (long*)malloc((classes+1)*sizeof(long));
for(j=0;j<= classes;j++)newCount[i][j]=0;
}
double newRecord[size]; double newRecord[size];
bool find; bool find;
long tmp3 = newSize-size;
long tmp4 = forgetSize;
// find total count for each class // find total count for each class
long T[classes]; long T[classes];
for(i=0;i<classes;i++)T[i]=0; long tmp2=0;
long* allForget = new long[classes];
for(i=0;i<classes;i++){
T[i]=0;
allForget[i]=0;
}
// forget
for(i=0;i<max[col];i++){ for(i=0;i<max[col];i++){
for(j=0;j<classes;j++){ for(j=0;j<classes;j++){
if(isRoot)count[col][i][j]=0; tmp = count[col][i][j];
else if(T[j]<count[col][i][j])T[j]=count[col][i][j]; tmp2+=tmp;
for(k=0; k<tmp; k++){
if(allForget[j]==forgottenClass[j])break;
if(record[col][i]==forgottenData[j][allForget[j]]){
forgetSize--;
count[col][i][j]--;
count[col][i][classes]--;
allForget[j]++;
}else{
break;
}
}
T[j]+=count[col][i][j];
} }
} }
@@ -274,9 +344,6 @@ minEval DecisionTree::incrementalMinGiniDense(double** data, long* result, long
count[col][j][result[i]]++; count[col][j][result[i]]++;
count[col][j][classes] ++; count[col][j][classes] ++;
find = true; find = true;
}else if(data[i][col]<record[col][j]){
count[col][j][result[i]]++;
count[col][j][classes] ++;
} }
} }
for(j=0;j<newMax;j++){ for(j=0;j<newMax;j++){
@ -284,65 +351,64 @@ minEval DecisionTree::incrementalMinGiniDense(double** data, long* result, long
newCount[j][result[i]]++; newCount[j][result[i]]++;
newCount[j][classes] ++; newCount[j][classes] ++;
find = true; find = true;
} else if(data[i][col]<newRecord[j]){
newCount[j][result[i]]++;
newCount[j][classes] ++;
} }
} }
if(not find){ if(not find){
newRecord[newMax] = data[i][col]; newCount[newMax] = (long*)malloc((classes+1)*sizeof(long));
double currentMinMax = -1*DBL_MAX; for(j=0;j<= classes;j++)newCount[newMax][j]=0;
for(j=0;j<max[col];j++){
if(record[col][j]<newRecord[newMax] and record[col][j]>currentMinMax){
currentMinMax = record[col][j];
for(k=0;k<=classes;k++)newCount[newMax][k]=count[col][j][k];
}
}
for(j=0;j<newMax;j++){
if(newRecord[j]<newRecord[newMax] and currentMinMax<newRecord[j]){
currentMinMax = newRecord[j];
for(k=0;k<=classes;k++)newCount[newMax][k]=newCount[j][k];
}
}
if(currentMinMax== -1*DBL_MAX){
for(k=0;k<=classes;k++)newCount[newMax][k]=0;
}
newCount[newMax][result[i]]++; newCount[newMax][result[i]]++;
newCount[newMax][classes]++; newCount[newMax][classes]++;
newRecord[newMax] = data[i][col];
newMax++; newMax++;
} }
} }
// Updata new count and record // Updata new count and record
long* d;
if(newMax>0){ if(newMax>0){
d = (long*)malloc(sizeof(long)*newMax);
for(i=0;i<newMax;i++)d[i]=i;
std::sort(d, d+newMax, [&newRecord](long l, long r){return newRecord[l]<newRecord[r];});
max[col]+=newMax; max[col]+=newMax;
long** updateCount = (long**)malloc(max[col]*sizeof(long*)); long** updateCount = (long**)malloc(max[col]*sizeof(long*));
double* updateRecord = (double*)malloc(max[col]*sizeof(double)); double* updateRecord = (double*)malloc(max[col]*sizeof(double));
j = 0;
k = 0;
for(i=0; i<max[col]; i++){ for(i=0; i<max[col]; i++){
if(i>=newMax){ if(k==max[col]-newMax){
updateCount[i] = count[col][i-newMax]; updateCount[i] = newCount[j];
updateRecord[i] = record[col][i-newMax]; updateRecord[i] = newRecord[j];
j++;
}
else if(j==newMax){
updateCount[i] = count[col][k];
updateRecord[i] = record[col][k];
k++;
}
else if(newRecord[j]>record[col][k]){
updateCount[i] = count[col][k];
updateRecord[i] = record[col][k];
k++;
} }
else{ else{
updateCount[i] = newCount[i]; updateCount[i] = newCount[j];
updateRecord[i] = newRecord[i]; updateRecord[i] = newRecord[j];
j++;
} }
} }
free(count[col]); free(count[col]);
free(record[col]); free(record[col]);
count[col]=updateCount; count[col]=updateCount;
record[col]=updateRecord; record[col]=updateRecord;
} free(d);
for(i=newMax; i<size; i++){
free(newCount[i]);
} }
free(newCount); free(newCount);
//calculate gini //calculate gini
minEval ret; minEval ret;
if(evalue==Evaluation::gini){ if(evalue==Evaluation::gini){
ret = giniDenseIncremental(max[col], record[col], count[col], classes, newSize, T); ret = giniDense(max[col], newSize, classes, count[col], d, record[col], T);
}else if(evalue==Evaluation::entropy or evalue==Evaluation::logLoss){ }else if(evalue==Evaluation::entropy or evalue==Evaluation::logLoss){
ret = entropyDenseIncremental(max[col], record[col], count[col], classes, newSize, T); ret = entropyDense(max[col], newSize, classes, count[col], d, record[col], T);
} }
ret.values = nullptr; ret.values = nullptr;
return ret; return ret;
@@ -353,7 +419,6 @@ minEval DecisionTree::findMinGiniSparse(double** data, long* result, long* total
long* d = (long*)malloc(size*sizeof(long)); long* d = (long*)malloc(size*sizeof(long));
for(i=0; i<size; i++)d[i]=i; for(i=0; i<size; i++)d[i]=i;
std::sort(d, d+size, [&data, col](long l, long r){return data[l][col]<data[r][col];}); std::sort(d, d+size, [&data, col](long l, long r){return data[l][col]<data[r][col];});
minEval ret; minEval ret;
if(evalue == Evaluation::gini){ if(evalue == Evaluation::gini){
ret = giniSparse(data, result, d, size, col, classes, totalT); ret = giniSparse(data, result, d, size, col, classes, totalT);
@@ -378,7 +443,6 @@ minEval DecisionTree::findMinGiniDense(double** data, long* result, long* totalT
long low = 0; long low = 0;
long i, j, k, max=0; long i, j, k, max=0;
long** count = (long**)malloc(size*sizeof(long*)); long** count = (long**)malloc(size*sizeof(long*));
// size2 and count2 are after forget
double* record = (double*)malloc(size*sizeof(double)); double* record = (double*)malloc(size*sizeof(double));
bool find; bool find;
for(i=0;i<size;i++){ for(i=0;i<size;i++){
@@ -402,20 +466,24 @@ minEval DecisionTree::findMinGiniDense(double** data, long* result, long* totalT
max++; max++;
} }
} }
long d[max];
for(i=0;i<max;i++){
d[i] = i;
}
std::sort(d, d+max, [&record](long l, long r){return record[l]<record[r];});
long** rem = (long**)malloc(max*sizeof(long*)); long** rem = (long**)malloc(max*sizeof(long*));
double* record2 = (double*)malloc(max*sizeof(double)); double* record2 = (double*)malloc(max*sizeof(double));
for(i=0;i<max;i++){ for(i=0;i<max;i++){
rem[i] = count[i]; rem[i] = count[d[i]];
record2[i] = record[i]; record2[i] = record[d[i]];
} }
free(count); free(count);
free(record); free(record);
long d[max]; for(i=0;i<max;i++){
for(i=0;i<max;i++){
d[i] = i; d[i] = i;
} }
std::sort(d, d+max, [&record2](long l, long r){return record2[l]<record2[r];});
minEval ret; minEval ret;
if(evalue == Evaluation::gini){ if(evalue == Evaluation::gini){
ret = giniDense(max, size, classes, rem, d, record2, totalT); ret = giniDense(max, size, classes, rem, d, record2, totalT);
@@ -429,23 +497,126 @@ minEval DecisionTree::findMinGiniDense(double** data, long* result, long* totalT
return ret; return ret;
} }
double xxx;
void DecisionTree::fit(double** data, long* result, long size){ void DecisionTree::fit(double** data, long* result, long size){
roundNo++; double isUp = -1.0;
long localT = 0;
long localAll = 0;
if(DTree->size==0){ if(DTree->size==0){
retain = size;
maxHeight = (long)log2((double)retain);
maxHeight = std::max(maxHeight, (long)1);
Update(data, result, size, DTree); Update(data, result, size, DTree);
}else{ }else{
if(forgetRate<=0){
for(long j=0; j<size; j++){
if(Test(data[j], DTree)==result[j])localT++;
localAll++;
}
double guessAcc;
guessAcc = 1.0/classes;
if(forgetRate==0.0){
double lastSm = (double)lastT/lastAll;
double localSm = (double)localT/localAll;
/*long guesses[classes], i;
for(i=0; i<classes; i++)guesses[i]=0;
for(i=0; i<DTree->size; i++){
guesses[DTree->resultRecord[i]]++;
}
for(i=0; i<size; i++){
guessAcc += (double)guesses[result[i]]/DTree->size/size;
}*/
if(localSm <= guessAcc){
//if(localSm <= 1.0/classes){
lastT = localT;
lastAll = localAll;
retain = size;
//increaseRate = 1.0-localSm;
}
else if(lastSm <= guessAcc){
//else if(lastSm <= 1.0/classes){
lastT = localT;
lastAll = localAll;
//forgetRate=-5.0;
retain += size;
//increaseRate -= localSm;
//increaseRate = initialIR;
//increaseRate -= localSm;
//increaseRate /= (double)localSm-1.0/classes;
}
else if(lastSm == localSm){
lastT += localT;
lastAll += localAll;
retain+=(long)round(increaseRate*size);
//increaseRate*=increaseRate;
//retain = (long)((double)retain*isUp+0.25*size);
}
else{
/*double lastSd = sqrt(pow((1.0-lastSm),2)*lastT+pow(lastSm,2)*(lastAll-lastT)/(lastAll-1));
double localSd = sqrt(pow((1.0-localSm),2)*localT+pow(localSm,2)*(localAll-localT)/(localAll-1));
double v = lastAll+localAll-2;
double sp = sqrt(((lastAll-1) * lastSd * lastSd + (localAll-1) * localSd * localSd) / v);
double q;
//double t=lastSm-localSm;
if(sp==0)q=1.0;
else if(lastAll+lastAll<2000){
q = abs(lastSm-localSm);
}
else{
double t = t/(sp*sqrt(1.0/lastAll+1.0/localAll));
boost::math::students_t dist(v);
double c = cdf(dist, t);
q = cdf(complement(dist, fabs(t)));
}*/
isUp = ((double)localSm-guessAcc)/((double)lastSm-guessAcc);
//isUp = ((double)localSm-1.0/classes)/((double)lastSm-1.0/classes);
increaseRate = increaseRate/isUp;
//increaseRate += increaseRate*factor;
if(isUp>=1.0)isUp=pow(isUp, 2);
else{
isUp=pow(isUp, 3-isUp);
}
retain = std::min((long)round(retain*isUp+increaseRate*size), retain+size);
//double factor = ((lastSm-localSm)/localSm)*abs((lastSm-localSm)/localSm)*increaseRate;
//retain += std::min((long)round(factor*retain+increaseRate*size), size);
lastT = localT;
lastAll = localAll;
}
//printf(" %f, %f, %f\n", increaseRate, localSm, lastSm);
}else{
long i;
retain = DTree->size+size;
/*double guessAcc=0.0;
long guesses[classes];
for(i=0; i<classes; i++)guesses[i]=0;
for(i=0; i<DTree->size; i++){
guesses[DTree->resultRecord[i]]++;
}
for(i=0; i<size; i++){
guessAcc += (double)guesses[result[i]]/DTree->size/size;
}*/
while(retain>=roundNo){
if((double)localT/localAll>guessAcc){
forgetRate+=5.0;
}
roundNo*=2;
}
if((double)localT/localAll<=guessAcc){
forgetRate=-10.0;
}
if(forgetRate>=0){
forgetRate=0.0;
}
lastT = localT;
lastAll = localAll;
}
}
//if(increaseRate>initialIR)increaseRate=initialIR;
//printf("%f\n", increaseRate);
if(retain<size)retain=size;
maxHeight = (long)log2((double)retain);
maxHeight = std::max(maxHeight, (long)1);
IncrementalUpdate(data, result, size, DTree); IncrementalUpdate(data, result, size, DTree);
} }
/*
if(Rebuild and called==10){
called = 0;
Rebuild = false;
}else if(Rebuild){
called = 11;
}else{
called++;
}*/
} }
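
In the same spirit, the retain logic in DecisionTree::fit above resizes the root's sample window from the ratio of excess-over-chance accuracies before updating the tree; the non-degenerate branch amounts to roughly:

    r = (p_new - 1/C) / (p_last - 1/C),   retain <- min(round(retain * r' + increaseRate * size), retain + size),   maxHeight = max(1, floor(log2 retain)),

where C is the number of classes, r' = r^2 for r >= 1 and r^{3-r} otherwise, and increaseRate is divided by the raw r each round.
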
long* DecisionTree::fitThenPredict(double** trainData, long* trainResult, long trainSize, double** testData, long testSize){ long* DecisionTree::fitThenPredict(double** trainData, long* trainResult, long trainSize, double** testData, long testSize){
@@ -461,12 +632,28 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT*
long i, j; long i, j;
long low = 0; long low = 0;
long forgetSize=0; long forgetSize=0;
if(retain>0 and current->size+size>retain) forgetSize = std::min(current->size+size - retain, current->size); long* index;
else if(retain<0) forgetSize = (long)current->size*forgetRate; bool forgetOld = false;
long* index = new long[current->size]; index = (long*)malloc(sizeof(long)*current->size);
if(current->size+size>retain and current->height==0) {
forgetSize = std::min(current->size+size - retain, current->size);
}
if(forgetSize==current->size){
Update(data, result, size, current);
return;
}
double** dataNew; double** dataNew;
long* resultNew; long* resultNew;
double*** forgottenData = (double***)malloc(feature*sizeof(double**));
long* forgottenClass = (long*)malloc(classes*sizeof(long));
for(i=0;i<classes;i++)forgottenClass[i]=0;
if(current->height == 0){ if(current->height == 0){
for(i=0; i<feature; i++){
forgottenData[i] = (double**)malloc(classes*sizeof(double*));
for(j=0; j<classes; j++){
forgottenData[i][j] = (double*)malloc(forgetSize*sizeof(double));
}
}
dataNew = (double**)malloc((size+current->size-forgetSize)*sizeof(double*)); dataNew = (double**)malloc((size+current->size-forgetSize)*sizeof(double*));
resultNew = (long*)malloc((size+current->size-forgetSize)*sizeof(long)); resultNew = (long*)malloc((size+current->size-forgetSize)*sizeof(long));
for(i=0;i<size;i++){ for(i=0;i<size;i++){
@@ -476,47 +663,110 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT*
for(i=0; i<current->size; i++){ for(i=0; i<current->size; i++){
index[i] = i; index[i] = i;
} }
std::shuffle(index, index+current->size, g); if(isRF)std::shuffle(index, index+current->size, g);
long x = 0; long x = 0;
for(i=0;i<current->size;i++){ for(i=0;i<current->size;i++){
if(i>=current->size-forgetSize){ if(i>=current->size-forgetSize){
current->dataRecord[index[i]][feature-1] = DBL_MAX; for(j=0; j<feature; j++){
forgottenData[j][current->resultRecord[index[i]]][forgottenClass[current->resultRecord[index[i]]]]=current->dataRecord[index[i]][j];
}
forgottenClass[current->resultRecord[index[i]]]++;
current->dataRecord[index[i]][feature] = DBL_MAX;
}else{ }else{
dataNew[i+size] = current->dataRecord[index[i]]; dataNew[i+size] = current->dataRecord[index[i]];
resultNew[i+size] = current->resultRecord[index[i]]; resultNew[i+size] = current->resultRecord[index[i]];
} }
} }
for(i=0; i<feature; i++){
for(j=0; j<classes; j++){
std::sort(forgottenData[i][j], forgottenData[i][j]+forgottenClass[j]);
}
}
}else{ }else{
forgetSize = 0; forgetSize = 0;
dataNew = (double**)malloc((size+current->size)*sizeof(double*)); dataNew = (double**)malloc((size+current->size)*sizeof(double*));
resultNew = (long*)malloc((size+current->size)*sizeof(long)); resultNew = (long*)malloc((size+current->size)*sizeof(long));
long xxx[current->size];
for(i=0;i<size;i++){ for(i=0;i<size;i++){
dataNew[i] = data[i]; dataNew[i] = data[i];
resultNew[i] = result[i]; resultNew[i] = result[i];
} }
for(i=0;i<current->size;i++){ for(i=0;i<current->size;i++){
if(current->dataRecord[i][feature-1] == DBL_MAX){ if(current->dataRecord[i][feature] == DBL_MAX){
forgetSize++; xxx[forgetSize]=i;
continue; forgetSize++;
forgottenClass[current->resultRecord[i]]++;
}else{ }else{
dataNew[i+size-forgetSize] = current->dataRecord[i]; dataNew[i+size-forgetSize] = current->dataRecord[i];
resultNew[i+size-forgetSize] = current->resultRecord[i]; resultNew[i+size-forgetSize] = current->resultRecord[i];
} }
} }
if(forgetSize==current->size){
free(forgottenData);
free(forgottenClass);
if(size!=0){
free(dataNew);
free(resultNew);
Update(data, result, size, current);
}else{
// if a node have no new data and forget all old data, just keep old data
return;
}
return;
}
for(i=0; i<feature; i++){
forgottenData[i] = (double**)malloc(classes*sizeof(double*));
for(j=0; j<classes; j++){
forgottenData[i][j] = (double*)malloc(std::max(forgottenClass[j], (long)1)*sizeof(double));
}
}
long* k = (long*)malloc(sizeof(long)*classes);
for(i=0; i<classes; i++)k[i]=0;
for(i=0;i<forgetSize;i++){
long tmp = xxx[i];
for(j=0; j<feature; j++){
forgottenData[j][current->resultRecord[tmp]][k[current->resultRecord[tmp]]]=current->dataRecord[tmp][j];
}
k[current->resultRecord[tmp]]++;
}
free(k);
for(i=0; i<feature; i++){
for(j=0; j<classes; j++){
std::sort(forgottenData[i][j], forgottenData[i][j]+forgottenClass[j]);
}
}
} }
free(data); free(data);
free(result); free(result);
current->size -= forgetSize; current->size -= forgetSize;
current->size += size; current->size += size;
// end condition // end condition
if(current->terminate or roundNo%Rebuild==0){ if(current->terminate or current->height==maxHeight or current->size==1){
for(i=0;i<feature;i++){
for(j=0; j<classes; j++){
free(forgottenData[i][j]);
}
free(forgottenData[i]);
}
free(forgottenData);
free(forgottenClass);
if(current->height == 0){ if(current->height == 0){
for(i=0; i<forgetSize; i++){ for(i=0; i<forgetSize; i++){
free(current->dataRecord[index[current->size-size+i]]); free(current->dataRecord[index[current->size-size+i]]);
} }
} }
delete(index); free(index);
Update(dataNew, resultNew, current->size, current);
return;
}else if(size==0){
for(i=0;i<feature;i++){
for(j=0; j<classes; j++){
free(forgottenData[i][j]);
}
free(forgottenData[i]);
}
free(forgottenData);
free(forgottenClass);
Update(dataNew, resultNew, current->size, current); Update(dataNew, resultNew, current->size, current);
return; return;
} }
@ -525,24 +775,47 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT*
long cFeature; long cFeature;
cMin.eval = DBL_MAX; cMin.eval = DBL_MAX;
cMin.values = nullptr; cMin.values = nullptr;
// TODO long T[classes];
double HY=0;
for(i=0;i<classes;i++){
T[i] = 0;
}
for(i=0;i<size;i++){
j = resultNew[i];
T[j]++;
}
for(i=0;i<classes;i++){
if(evalue == Evaluation::entropy){
if(T[i]!=0)HY -= ((double)T[i]/size)*log2((double)T[i]/size);
}else{
HY += pow(((double)T[i]/size), 2);
}
}
for(i=0;i<maxFeature; i++){ for(i=0;i<maxFeature; i++){
if(Sparse[current->featureId[i]]==1){ long col = DTree->featureId[i];
c = incrementalMinGiniSparse(dataNew, resultNew, current->size+forgetSize, size, current, current->featureId[i], forgetSize, false); if(Sparse[col]==1){
c = incrementalMinGiniSparse(dataNew, resultNew, current->size, size, current, col, forgetSize, forgottenData[col], forgottenClass);
} }
else if(Sparse[current->featureId[i]]==0){ else if(Sparse[col]==0){
c = incrementalMinGiniDense(dataNew, resultNew, size, current->featureId[i], current->count, current->record, current->max, current->size+forgetSize, forgetSize, false); c = incrementalMinGiniDense(dataNew, resultNew, size, col, current->count, current->record, current->max, current->size, forgetSize, forgottenData[col], forgottenClass);
}else{ }else{
//c = incrementalMinGiniCategorical(); //c = incrementalMinGiniCategorical();
} }
if(c.eval<cMin.eval){ if(c.eval<cMin.eval){
cMin.eval = c.eval; cMin.eval = c.eval;
cMin.value = c.value; cMin.value = c.value;
if(cMin.values != nullptr)free(cMin.values);
cMin.values = c.values; cMin.values = c.values;
cFeature = current->featureId[i]; cFeature = col;
}else if(c.values!=nullptr)free(c.values); }
}
for(i=0;i<feature;i++){
for(j=0; j<classes; j++){
free(forgottenData[i][j]);
}
free(forgottenData[i]);
} }
free(forgottenData);
free(forgottenClass);
if(cMin.eval==DBL_MAX){ if(cMin.eval==DBL_MAX){
current->terminate = true; current->terminate = true;
long t[classes]; long t[classes];
@ -550,27 +823,23 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT*
t[i]=0; t[i]=0;
} }
for(i=low;i<low+size;i++){ for(i=low;i<low+size;i++){
t[result[i]]++; t[resultNew[i]]++;
} }
if(cMin.values!=nullptr)free(cMin.values); if(cMin.values!=nullptr)free(cMin.values);
current->result = std::distance(t, std::max_element(t, t+classes)); current->result = std::distance(t, std::max_element(t, t+classes));
free(index);
free(current->dataRecord);
free(current->resultRecord);
current->dataRecord = dataNew;
current->resultRecord = resultNew;
return; return;
} }
//diverse data //diverse data
long ptL=0, ptR=0; long ptL=0, ptR=0;
double* t; double* t;
long currentSize = current->size; long currentSize = current->size;
//TODO:Discrete
// Same diverse point as last time // Same diverse point as last time
if(current->dpoint==cMin.value and current->feature==cFeature){ if(current->dpoint==cMin.value and current->feature==cFeature){
long xxx = current->left->size;
/*for(i=0; i<size; i++){
if(dataNew[i][current->feature]<=current->dpoint){
ptL++;
}else{
ptR++;
}
}*/
ptL = size; ptL = size;
ptR = size; ptR = size;
long* resultL = (long*)malloc((ptL)*sizeof(long)); long* resultL = (long*)malloc((ptL)*sizeof(long));
@ -598,7 +867,7 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT*
free(current->dataRecord[index[current->size-size+i]]); free(current->dataRecord[index[current->size-size+i]]);
} }
} }
delete(index); free(index);
free(current->dataRecord); free(current->dataRecord);
free(current->resultRecord); free(current->resultRecord);
current->dataRecord = dataNew; current->dataRecord = dataNew;
@ -636,21 +905,23 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT*
Update(dataL, resultL, ptL, current->left); Update(dataL, resultL, ptL, current->left);
Update(dataR, resultR, ptR, current->right); Update(dataR, resultR, ptR, current->right);
    // TODO: free memory
if(current->height == 0){ if(current->height == 0){
for(i=0; i<forgetSize; i++){ for(i=0; i<forgetSize; i++){
free(current->dataRecord[index[current->size-size+i]]); free(current->dataRecord[index[current->size-size+i]]);
} }
} }
free(index);
delete(index);
free(current->dataRecord); free(current->dataRecord);
free(current->resultRecord); free(current->resultRecord);
current->dataRecord = dataNew; current->dataRecord = dataNew;
current->resultRecord = resultNew; current->resultRecord = resultNew;
} }
void DecisionTree::Update(double** data, long* result, long size, DT* current){ void DecisionTree::Update(double** data, long* result, long size, DT* current){
if(not current->created)createNode(current, current->height, feature, classes);
long low = 0; long low = 0;
long i, j; long i, j;
double HY = 0;
// end condition // end condition
if(current->dataRecord!=nullptr)free(current->dataRecord); if(current->dataRecord!=nullptr)free(current->dataRecord);
current->dataRecord = data; current->dataRecord = data;
@ -663,7 +934,7 @@ void DecisionTree::Update(double** data, long* result, long size, DT* current){
for(i=0;i<classes;i++){ for(i=0;i<classes;i++){
t[i]=0; t[i]=0;
} }
for(i=low;i<low+size;i++){ for(i=0;i<size;i++){
t[result[i]]++; t[result[i]]++;
} }
current->result = std::distance(t, std::max_element(t, t+classes)); current->result = std::distance(t, std::max_element(t, t+classes));
@ -683,19 +954,25 @@ void DecisionTree::Update(double** data, long* result, long size, DT* current){
current->result = i; current->result = i;
return; return;
} }
if(evalue == Evaluation::entropy){
if(T[i]!=0)HY -= ((double)T[i]/size)*log2((double)T[i]/size);
}else{
HY += pow(((double)T[i]/size), 2);
}
} }
// find min Evaluation // find min Evaluation
minEval c, cMin; minEval c, cMin;
long cFeature, oldMax, col, left=0; long cFeature, oldMax, col, left=0;
cMin.eval = DBL_MAX; cMin.eval = DBL_MAX;
cMin.values = nullptr; cMin.values = nullptr;
//TODO cFeature = -1;
//TODO: categorical
for(i=0;i<maxFeature; i++){ for(i=0;i<maxFeature; i++){
col = current->featureId[i]; col = DTree->featureId[i];
if(Sparse[current->featureId[i]]==1){ if(Sparse[col]==1){
c = findMinGiniSparse(data, result, T, size, col, current); c = findMinGiniSparse(data, result, T, size, col, current);
} }
else if(Sparse[current->featureId[i]]==0){ else if(Sparse[col]==0){
c = findMinGiniDense(data, result, T, size, col); c = findMinGiniDense(data, result, T, size, col);
if(current->count[col]!=nullptr){ if(current->count[col]!=nullptr){
for(j=0; j<current->max[col]; j++){ for(j=0; j<current->max[col]; j++){
@ -715,32 +992,37 @@ void DecisionTree::Update(double** data, long* result, long size, DT* current){
if(cMin.values!=nullptr)free(cMin.values); if(cMin.values!=nullptr)free(cMin.values);
cMin.values = c.values; cMin.values = c.values;
cMin.value = c.value; cMin.value = c.value;
cFeature = current->featureId[i]; cFeature = col;
left = c.left; left = c.left;
}else if(c.values!=nullptr){ }else if(c.values!=nullptr){
free(c.values); free(c.values);
} }
} }
if(cMin.eval == DBL_MAX){ if(cMin.eval == DBL_MAX){
current->terminate = true; current->terminate = true;
long max = 0; long max = 0;
long maxs[classes];
long count = 0;
for(i=1;i<classes;i++){ for(i=1;i<classes;i++){
if(T[max]<T[i])max=i; if(T[max]<T[i]){
max=i;
}
} }
if(cMin.values!=nullptr)free(cMin.values);
current->result = max; current->result = max;
return; return;
} }
//printf(" %f\n", HY-cMin.eval);
//diverse data //diverse data
current->terminate = false; current->terminate = false;
current->feature = cFeature; current->feature = cFeature;
current->dpoint = cMin.value; current->dpoint = cMin.value;
long ptL=0, ptR=0; long ptL=0, ptR=0;
//TODO:Discrete //TODO: categorical
long* resultL = new long[left]; long* resultL = new long[size];
long* resultR = new long[size-left]; long* resultR = new long[size];
double** dataL = new double*[left]; double** dataL = new double*[size];
double** dataR = new double*[size-left]; double** dataR = new double*[size];
for(i=low; i<low+size; i++){ for(i=low; i<low+size; i++){
if(data[i][current->feature]<=current->dpoint){ if(data[i][current->feature]<=current->dpoint){
dataL[ptL] = data[i]; dataL[ptL] = data[i];
@ -757,14 +1039,12 @@ void DecisionTree::Update(double** data, long* result, long size, DT* current){
} }
long DecisionTree::Test(double* data, DT* root){ long DecisionTree::Test(double* data, DT* root){
if(root->terminate)return root->result; if(root->terminate or root->height == maxHeight)return root->result;
if(data[root->feature]<=root->dpoint)return Test(data, root->left); if(data[root->feature]<=root->dpoint)return Test(data, root->left);
return Test(data, root->right); return Test(data, root->right);
} }
void DecisionTree::print(DT* root){ void DecisionTree::print(DT* root){
int x;
//std::cin>>x;
if(root->terminate){ if(root->terminate){
printf("%ld", root->result); printf("%ld", root->result);
return; return;

@ -1,63 +1,70 @@
#include "DecisionTree.h" #include "DecisionTree.h"
#include "aquery.h" #include "RF.h"
// __AQ_NO_SESSION__ // __AQ_NO_SESSION__
#include "../server/table.h" #include "../server/table.h"
#include "aquery.h"
DecisionTree* dt = nullptr; DecisionTree *dt = nullptr;
RandomForest *rf = nullptr;
__AQEXPORT__(bool) newtree(int height, long f, ColRef<int> sparse, double forget, long maxf, long noclasses, Evaluation e, long r, long rb){
if(sparse.size!=f)return 0; __AQEXPORT__(bool)
int* issparse = (int*)malloc(f*sizeof(int)); newtree(int height, long f, ColRef<int> X, double forget, long maxf, long noclasses, Evaluation e, long r, long rb)
for(long i=0; i<f; i++){ {
issparse[i] = sparse.container[i]; if (X.size != f)
} return false;
if(maxf<0)maxf=f; int *X_cpy = (int *)malloc(f * sizeof(int));
dt = new DecisionTree(height, f, issparse, forget, maxf, noclasses, e, r, rb);
    return 1; memcpy(X_cpy, X.container, f * sizeof(int));
}
__AQEXPORT__(bool) fit(ColRef<ColRef<double>> X, ColRef<int> y){ if (maxf < 0)
if(X.size != y.size)return 0; maxf = f;
double** data = (double**)malloc(X.size*sizeof(double*)); dt = new DecisionTree(f, X_cpy, forget, maxf, noclasses, e);
    long* result = (long*)malloc(y.size*sizeof(long)); rf = new RandomForest(height, f, X_cpy, forget, noclasses, e);
for(long i=0; i<X.size; i++){ return true;
data[i] = X.container[i].container;
result[i] = y.container[i];
}
data[pt] = (double*)malloc(X.size*sizeof(double));
for(j=0; j<X.size; j++){
data[pt][j]=X.container[j];
}
result[pt] = y;
pt ++;
return 1;
} }
__AQEXPORT__(bool) fit(vector_type<vector_type<double>> v, vector_type<long> res){
double** data = (double**)malloc(v.size*sizeof(double*)); // size_t pt = 0;
for(int i = 0; i < v.size; ++i) // __AQEXPORT__(bool) fit(ColRef<ColRef<double>> X, ColRef<int> y){
// if(X.size != y.size)return 0;
// double** data = (double**)malloc(X.size*sizeof(double*));
// long* result = (long*)malloc(y.size*sizeof(long));
// for(long i=0; i<X.size; i++){
// data[i] = X.container[i].container;
// result[i] = y.container[i];
// }
// data[pt] = (double*)malloc(X.size*sizeof(double));
// for(uint32_t j=0; j<X.size; j++){
// data[pt][j]=X.container[j];
// }
// result[pt] = y;
// pt ++;
// return 1;
// }
__AQEXPORT__(bool)
fit(vector_type<vector_type<double>> v, vector_type<long> res)
{
double **data = (double **)malloc(v.size * sizeof(double *));
for (int i = 0; i < v.size; ++i)
data[i] = v.container[i].container; data[i] = v.container[i].container;
dt->fit(data, res.container, v.size); // dt->fit(data, res.container, v.size);
rf->fit(data, res.container, v.size);
return true; return true;
} }
__AQEXPORT__(vectortype_cstorage) predict(vector_type<vector_type<double>> v){ __AQEXPORT__(vectortype_cstorage)
int* result = (int*)malloc(v.size*sizeof(int)); predict(vector_type<vector_type<double>> v)
{
int *result = (int *)malloc(v.size * sizeof(int));
for(long i=0; i<v.size; i++){ for (long i = 0; i < v.size; i++)
result[i]=dt->Test(v.container[i].container, dt->DTree); //result[i] = dt->Test(v.container[i].container, dt->DTree);
//printf("%d ", result[i]); result[i] = rf->Test(v.container, rf->DTrees);
} auto container = (vector_type<int> *)malloc(sizeof(vector_type<int>));
auto container = (vector_type<int>*)malloc(sizeof(vector_type<int>));
container->size = v.size; container->size = v.size;
container->capacity = 0; container->capacity = 0;
container->container = result; container->container = result;
// container->out(10);
// ColRef<vector_type<int>>* col = (ColRef<vector_type<int>>*)malloc(sizeof(ColRef<vector_type<int>>));
auto ret = vectortype_cstorage{.container = container, .size = 1, .capacity = 0}; auto ret = vectortype_cstorage{.container = container, .size = 1, .capacity = 0};
// col->initfrom(ret, "sibal");
// print(*col);
return ret; return ret;
//return true;
} }

@ -31,12 +31,14 @@ void print<__int128_t>(const __int128_t& v, const char* delimiter){
s[40] = 0; s[40] = 0;
std::cout<< get_int128str(v, s+40)<< delimiter; std::cout<< get_int128str(v, s+40)<< delimiter;
} }
template <> template <>
void print<__uint128_t>(const __uint128_t&v, const char* delimiter){ void print<__uint128_t>(const __uint128_t&v, const char* delimiter){
char s[41]; char s[41];
s[40] = 0; s[40] = 0;
std::cout<< get_uint128str(v, s+40) << delimiter; std::cout<< get_uint128str(v, s+40) << delimiter;
} }
std::ostream& operator<<(std::ostream& os, __int128 & v) std::ostream& operator<<(std::ostream& os, __int128 & v)
{ {
print(v); print(v);
@ -76,6 +78,7 @@ char* intToString(T val, char* buf){
return buf; return buf;
} }
void skip(const char*& buf){ void skip(const char*& buf){
while(*buf && (*buf >'9' || *buf < '0')) buf++; while(*buf && (*buf >'9' || *buf < '0')) buf++;
} }
@ -264,8 +267,8 @@ std::string base62uuid(int l) {
static uniform_int_distribution<uint64_t> u(0x10000, 0xfffff); static uniform_int_distribution<uint64_t> u(0x10000, 0xfffff);
uint64_t uuid = (u(engine) << 32ull) + uint64_t uuid = (u(engine) << 32ull) +
(std::chrono::system_clock::now().time_since_epoch().count() & 0xffffffff); (std::chrono::system_clock::now().time_since_epoch().count() & 0xffffffff);
//printf("%llu\n", uuid);
string ret; string ret;
while (uuid && l-- >= 0) { while (uuid && l-- >= 0) {
ret = string("") + base62alp[uuid % 62] + ret; ret = string("") + base62alp[uuid % 62] + ret;
uuid /= 62; uuid /= 62;
@ -278,15 +281,15 @@ inline const char* str(const bool& v) {
return v ? "true" : "false"; return v ? "true" : "false";
} }
class A{ class A {
public: public:
std::chrono::high_resolution_clock::time_point tp; std::chrono::high_resolution_clock::time_point tp;
A(){ A(){
tp = std::chrono::high_resolution_clock::now(); tp = std::chrono::high_resolution_clock::now();
printf("A %llx created.\n", tp.time_since_epoch().count()); printf("A %llu created.\n", tp.time_since_epoch().count());
} }
~A() { ~A() {
printf("A %llx died after %lldns.\n", tp.time_since_epoch().count(), printf("A %llu died after %lldns.\n", tp.time_since_epoch().count(),
(std::chrono::high_resolution_clock::now() - tp).count()); (std::chrono::high_resolution_clock::now() - tp).count());
} }
}; };
@ -525,8 +528,8 @@ void ScratchSpace::cleanup(){
static_cast<vector_type<void*>*>(temp_memory_fractions); static_cast<vector_type<void*>*>(temp_memory_fractions);
if (vec_tmpmem_fractions->size) { if (vec_tmpmem_fractions->size) {
for(auto& mem : *vec_tmpmem_fractions){ for(auto& mem : *vec_tmpmem_fractions){
free(mem); //free(mem);
//GC::gc_handle->reg(mem); GC::gc_handle->reg(mem);
} }
vec_tmpmem_fractions->clear(); vec_tmpmem_fractions->clear();
} }

@ -66,23 +66,24 @@ struct Session{
void* memory_map; void* memory_map;
}; };
struct StoredProcedure{ struct StoredProcedure {
uint32_t cnt, postproc_modules; uint32_t cnt, postproc_modules;
char **queries; char **queries;
const char* name; const char* name;
void **__rt_loaded_modules; void **__rt_loaded_modules;
}; };
struct Context{
struct Context {
typedef int (*printf_type) (const char *format, ...); typedef int (*printf_type) (const char *format, ...);
void* module_function_maps = 0; void* module_function_maps = nullptr;
Config* cfg; Config* cfg;
int n_buffers, *sz_bufs; int n_buffers, *sz_bufs;
void **buffers; void **buffers;
void* alt_server = 0; void* alt_server = nullptr;
Log_level log_level = LOG_INFO; Log_level log_level = LOG_INFO;
Session current; Session current;
@ -115,6 +116,12 @@ struct Context{
}; };
struct StoredProcedurePayload {
StoredProcedure *p;
Context* cxt;
};
int execTriggerPayload(void*);
#ifdef _WIN32 #ifdef _WIN32
#define __DLLEXPORT__ __declspec(dllexport) __stdcall #define __DLLEXPORT__ __declspec(dllexport) __stdcall
@ -169,4 +176,79 @@ inline void AQ_ZeroMemory(_This_Struct& __val) {
memset(&__val, 0, sizeof(_This_Struct)); memset(&__val, 0, sizeof(_This_Struct));
} }
#ifdef __USE_STD_SEMAPHORE__
#include <semaphore>
class A_Semaphore {
private:
std::binary_semaphore native_handle;
public:
	A_Semaphore(bool v = false) : native_handle(v) {
	}
void acquire() {
native_handle.acquire();
}
void release() {
native_handle.release();
}
~A_Semaphore() { }
};
#else
#ifdef _WIN32
class A_Semaphore {
private:
void* native_handle;
public:
A_Semaphore(bool);
void acquire();
void release();
~A_Semaphore();
};
#else
#ifdef __APPLE__
#include <dispatch/dispatch.h>
class A_Semaphore {
private:
dispatch_semaphore_t native_handle;
public:
explicit A_Semaphore(bool v = false) {
native_handle = dispatch_semaphore_create(v);
}
void acquire() {
// puts("acquire");
dispatch_semaphore_wait(native_handle, DISPATCH_TIME_FOREVER);
}
void release() {
// puts("release");
dispatch_semaphore_signal(native_handle);
}
~A_Semaphore() {
}
};
#else
#include <semaphore.h>
class A_Semaphore {
private:
sem_t native_handle;
public:
A_Semaphore(bool v = false) {
		sem_init(&native_handle, 0, v); // pshared = 0, initial count = v
}
void acquire() {
sem_wait(&native_handle);
}
void release() {
sem_post(&native_handle);
}
~A_Semaphore() {
sem_destroy(&native_handle);
}
};
#endif // __APPLE__
#endif // _WIN32
#endif //__USE_STD_SEMAPHORE__
void print_monetdb_results(void* _srv, const char* sep, const char* end, uint32_t limit);
#endif #endif
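The A_Semaphore wrappers above are used in pairs elsewhere in the runtime (see the prompt/engine globals further down) to hand control back and forth between the input prompt and the query engine. Below is a minimal sketch of that handshake, assuming the class definitions above are reachable through libaquery.h; the loop bodies are placeholders, not the real server logic.

// Ping-pong between two threads using the A_Semaphore wrapper above.
// A sketch only: the real prompt/engine pair lives in the server loop.
#include <thread>
#include <cstdio>
#include "libaquery.h"   // assumed to provide A_Semaphore as defined above

A_Semaphore prompt{ true }, engine{ false };   // prompt goes first

void engine_loop() {
    for (int i = 0; i < 3; ++i) {
        engine.acquire();                      // wait until a query is handed over
        std::puts("engine: execute query");
        prompt.release();                      // give control back to the prompt
    }
}

int main() {
    std::thread t(engine_loop);
    for (int i = 0; i < 3; ++i) {
        prompt.acquire();                      // our turn: read and enqueue input
        std::puts("prompt: enqueue query");
        engine.release();                      // wake the engine
    }
    t.join();
}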

@ -6,17 +6,17 @@
struct Context; struct Context;
struct Server{ struct Server{
MYSQL *server = 0; MYSQL *server = nullptr;
Context *cxt = 0; Context *cxt = nullptr;
bool status = 0; bool status = false;
bool has_error = false; bool has_error = false;
char* query = 0; char* query = nullptr;
int type = 0; int type = 0;
void connect(Context* cxt, const char* host = "bill.local", void connect(Context* cxt, const char* host = "bill.local",
const char* user = "root", const char* passwd = "0508", const char* user = "root", const char* passwd = "0508",
const char* db_name = "db", const unsigned int port = 3306, const char* db_name = "db", const unsigned int port = 3306,
const char* unix_socket = 0, const unsigned long client_flag = 0 const char* unix_socket = nullptr, const unsigned long client_flag = 0
); );
void exec(const char* q); void exec(const char* q);
void close(); void close();

@ -5,11 +5,25 @@
#include <string> #include <string>
#include "monetdb_conn.h" #include "monetdb_conn.h"
#include "monetdbe.h" #include "monetdbe.h"
#include "table.h" #include "table.h"
#include <thread> #include <thread>
#undef ERROR #ifdef _WIN32
#undef static_assert #include "winhelper.h"
#else
#include <dlfcn.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <atomic>
#endif // _WIN32
#ifdef ERROR
#undef ERROR
#endif
#ifdef static_assert
#undef static_assert
#endif
constexpr const char* monetdbe_type_str[] = { constexpr const char* monetdbe_type_str[] = {
"monetdbe_bool", "monetdbe_int8_t", "monetdbe_int16_t", "monetdbe_int32_t", "monetdbe_int64_t", "monetdbe_bool", "monetdbe_int8_t", "monetdbe_int16_t", "monetdbe_int32_t", "monetdbe_int64_t",
@ -76,7 +90,7 @@ void Server::connect(Context *cxt){
char c[50]; char c[50];
std::cin.getline(c, 49); std::cin.getline(c, 49);
for(int i = 0; i < 50; ++i) { for(int i = 0; i < 50; ++i) {
if (!c[i] || c[i] == 'y' || c[i] == 'Y'){ if (!c[i] || c[i] == 'y' || c[i] == 'Y') {
monetdbe_close(*server); monetdbe_close(*server);
free(*server); free(*server);
this->server = nullptr; this->server = nullptr;
@ -110,7 +124,7 @@ void Server::exec(const char* q){
auto _res = static_cast<monetdbe_result*>(this->res); auto _res = static_cast<monetdbe_result*>(this->res);
monetdbe_cnt _cnt = 0; monetdbe_cnt _cnt = 0;
auto qresult = monetdbe_query(*server, const_cast<char*>(q), &_res, &_cnt); auto qresult = monetdbe_query(*server, const_cast<char*>(q), &_res, &_cnt);
if (_res != 0){ if (_res != nullptr){
this->cnt = _res->nrows; this->cnt = _res->nrows;
this->res = _res; this->res = _res;
} }
@ -152,6 +166,8 @@ void Server::print_results(const char* sep, const char* end){
col_data[i] = static_cast<char *>(cols[i]->data); col_data[i] = static_cast<char *>(cols[i]->data);
szs [i] = monetdbe_type_szs[cols[i]->type]; szs [i] = monetdbe_type_szs[cols[i]->type];
header_string = header_string + cols[i]->name + sep + '|' + sep; header_string = header_string + cols[i]->name + sep + '|' + sep;
if (err_msg) [[unlikely]]
puts(err_msg);
} }
if (const size_t l_sep = strlen(sep) + 1; header_string.size() >= l_sep) if (const size_t l_sep = strlen(sep) + 1; header_string.size() >= l_sep)
header_string.resize(header_string.size() - l_sep); header_string.resize(header_string.size() - l_sep);
@ -173,7 +189,7 @@ void Server::print_results(const char* sep, const char* end){
void Server::close(){ void Server::close(){
if(this->server){ if(this->server){
auto server = static_cast<monetdbe_database*>(this->server); auto server = static_cast<monetdbe_database*>(this->server);
monetdbe_close(*(server)); monetdbe_close(*server);
free(server); free(server);
this->server = nullptr; this->server = nullptr;
} }
@ -215,3 +231,46 @@ bool Server::havehge() {
return false; return false;
#endif #endif
} }
void ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){
auto server = static_cast<Server*>(cxt->alt_server);
void* handle = nullptr;
uint32_t procedure_module_cursor = 0;
for(uint32_t i = 0; i < p->cnt; ++i) {
switch(p->queries[i][0]){
case 'Q': {
server->exec(p->queries[i]);
}
break;
case 'P': {
auto c = code_snippet(dlsym(handle, p->queries[i]+1));
c(cxt);
}
break;
case 'N': {
if(procedure_module_cursor < p->postproc_modules)
handle = p->__rt_loaded_modules[procedure_module_cursor++];
}
break;
case 'O': {
uint32_t limit;
memcpy(&limit, p->queries[i] + 1, sizeof(uint32_t));
if (limit == 0)
continue;
print_monetdb_results(server, " ", "\n", limit);
}
break;
default:
printf("Warning Q%u: unrecognized command %c.\n",
i, p->queries[i][0]);
}
}
}
int execTriggerPayload(void* args) {
auto spp = (StoredProcedurePayload*)(args);
ExecuteStoredProcedureEx(spp->p, spp->cxt);
delete spp;
return 0;
}
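ExecuteStoredProcedureEx above dispatches on the first character of each entry in p->queries: 'Q' hands the string to the embedded server, 'P' resolves the following symbol in the currently selected post-processing module and calls it with the Context, 'N' advances to the next pre-loaded module, and 'O' prints results with a uint32_t row limit packed right after the tag. A hand-assembled procedure in that encoding could look like the sketch below; real procedures are produced by the AQuery compiler, and the SQL text and symbol name here are made up for illustration.

// Hand-built StoredProcedure in the encoding ExecuteStoredProcedureEx expects.
// Illustration only; field layout follows the StoredProcedure declaration above.
#include <cstring>
#include <cstdint>
#include "libaquery.h"   // assumed to declare StoredProcedure / Context

static char q_sql[]  = "QSELECT x1, x2 FROM source;"; // 'Q': forwarded to Server::exec
static char q_next[] = "N";                           // 'N': switch to the next loaded module
static char q_call[] = "Pdll_demo_postproc";          // 'P': call this symbol (name is hypothetical)
static char q_out[1 + sizeof(uint32_t)] = {'O'};      // 'O': print results, limit packed after tag
static char* queries[] = {q_sql, q_next, q_call, q_out};

StoredProcedure make_demo_procedure(void** loaded_modules) {
    const uint32_t limit = 10;
    std::memcpy(q_out + 1, &limit, sizeof limit);     // same encoding the 'O' case reads back
    StoredProcedure p{};
    p.cnt = 4;
    p.postproc_modules = 1;
    p.queries = queries;
    p.name = "demo_procedure";
    p.__rt_loaded_modules = loaded_modules;
    return p;
}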

@ -4,18 +4,18 @@
struct Context; struct Context;
struct Server{ struct Server{
void *server = 0; void *server = nullptr;
Context *cxt = 0; Context *cxt = nullptr;
bool status = 0; bool status = false;
char* query = 0; char* query = nullptr;
int type = 1; int type = 1;
void* res = 0; void* res = nullptr;
void* ret_col = 0; void* ret_col = nullptr;
long long cnt = 0; long long cnt = 0;
char* last_error = 0; char* last_error = nullptr;
Server(Context* cxt = nullptr); explicit Server(Context* cxt = nullptr);
void connect(Context* cxt); void connect(Context* cxt);
void exec(const char* q); void exec(const char* q);
void *getCol(int col_idx); void *getCol(int col_idx);
@ -24,7 +24,7 @@ struct Server{
static bool havehge(); static bool havehge();
void test(const char*); void test(const char*);
void print_results(const char* sep = " ", const char* end = "\n"); void print_results(const char* sep = " ", const char* end = "\n");
friend void print_monetdb_results(Server* srv, const char* sep, const char* end, int limit); friend void print_monetdb_results(void* _srv, const char* sep, const char* end, int limit);
~Server(); ~Server();
}; };
@ -34,4 +34,9 @@ struct monetdbe_table_data{
void* cols; void* cols;
}; };
size_t
monetdbe_get_size(void* dbhdl, const char *table_name);
void*
monetdbe_get_col(void* dbhdl, const char *table_name, uint32_t col_id);
#endif #endif

@ -0,0 +1,90 @@
// Non-standard Extensions for MonetDBe, may break concurrency control!
#include "monetdbe.h"
#include <stdint.h>
#include "mal_client.h"
#include "sql_mvc.h"
#include "sql_semantic.h"
#include "mal_exception.h"
typedef struct column_storage {
int refcnt;
int bid;
int ebid; /* extra bid */
int uibid; /* bat with positions of updates */
int uvbid; /* bat with values of updates */
storage_type st; /* ST_DEFAULT, ST_DICT, ST_FOR */
bool cleared;
bool merged; /* only merge changes once */
size_t ucnt; /* number of updates */
ulng ts; /* version timestamp */
} column_storage;
typedef struct segment {
BUN start;
BUN end;
	bool deleted;		/* we need to keep a dense segment set, 0 - end of last segment,
						   some segments may be deleted */
ulng ts; /* timestamp on this segment, ie tid of some active transaction or commit time of append/delete or
rollback time, ie ready for reuse */
ulng oldts; /* keep previous ts, for rollbacks */
	struct segment *next; /* usually one should be enough */
struct segment *prev; /* used in destruction list */
} segment;
/* container structure to allow sharing this structure */
typedef struct segments {
sql_ref r;
struct segment *h;
struct segment *t;
} segments;
typedef struct storage {
column_storage cs; /* storage on disk */
segments *segs; /* local used segements */
struct storage *next;
} storage;
typedef struct {
char language; /* 'S' or 's' or 'X' */
char depth; /* depth >= 1 means no output for trans/schema statements */
int remote; /* counter to make remote function names unique */
mvc *mvc;
char others[];
} backend;
typedef struct {
Client c;
char *msg;
monetdbe_data_blob blob_null;
monetdbe_data_date date_null;
monetdbe_data_time time_null;
monetdbe_data_timestamp timestamp_null;
str mid;
} monetdbe_database_internal;
size_t
monetdbe_get_size(monetdbe_database dbhdl, const char *table_name)
{
monetdbe_database_internal* hdl = (monetdbe_database_internal*)dbhdl;
backend* be = ((backend *)(((monetdbe_database_internal*)dbhdl)->c->sqlcontext));
mvc *m = be->mvc;
sql_table *t = find_table_or_view_on_scope(m, NULL, "sys", table_name, "CATALOG", false);
sql_column *col = ol_first_node(t->columns)->data;
sqlstore* store = m->store;
size_t sz = store->storage_api.count_col(m->session->tr, col, QUICK);
return sz;
}
void*
monetdbe_get_col(monetdbe_database dbhdl, const char *table_name, uint32_t col_id) {
monetdbe_database_internal* hdl = (monetdbe_database_internal*)dbhdl;
backend* be = ((backend *)(((monetdbe_database_internal*)dbhdl)->c->sqlcontext));
mvc *m = be->mvc;
sql_table *t = find_table_or_view_on_scope(m, NULL, "sys", table_name, "CATALOG", false);
sql_column *col = ol_fetch(t->columns, col_id);
sqlstore* store = m->store;
BAT *b = store->storage_api.bind_col(m->session->tr, col, QUICK);
BATiter iter = bat_iterator(b);
return iter.base;
}
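A short usage sketch for the two extension entry points above: it assumes a handle already opened with monetdbe_open(), a table named source whose first column holds doubles (matching the test script at the end of this change), and declares the functions with C linkage because monetdb_ext.c is compiled as C.

// Reading a column directly through the non-standard extension above -- a sketch.
#include <cstdio>
#include <cstdint>
#include "monetdbe.h"   // provides the monetdbe_database typedef

extern "C" {
size_t monetdbe_get_size(void* dbhdl, const char* table_name);
void*  monetdbe_get_col(void* dbhdl, const char* table_name, uint32_t col_id);
}

void dump_first_column(monetdbe_database db) {
    const size_t rows = monetdbe_get_size(db, "source");        // row count via count_col
    const auto*  x1   = static_cast<const double*>(
                            monetdbe_get_col(db, "source", 0)); // base pointer of column 0
    for (size_t i = 0; x1 != nullptr && i < rows; ++i)
        std::printf("%zu: %f\n", i, x1[i]);
}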

@ -6,7 +6,7 @@ template <class Comparator, typename T = uint32_t>
class priority_vector : public vector_type<T> { class priority_vector : public vector_type<T> {
const Comparator comp; const Comparator comp;
public: public:
priority_vector(Comparator comp = std::less<T>{}) : explicit priority_vector(Comparator comp = std::less<T>{}) :
comp(comp), vector_type<T>(0) {} comp(comp), vector_type<T>(0) {}
void emplace_back(T val) { void emplace_back(T val) {
vector_type<T>::emplace_back(val); vector_type<T>::emplace_back(val);

@ -20,10 +20,6 @@
#include <sys/mman.h> #include <sys/mman.h>
#include <atomic> #include <atomic>
// fast numeric to string conversion
#include "jeaiii_to_text.h"
#include "dragonbox/dragonbox_to_chars.h"
struct SharedMemory struct SharedMemory
{ {
std::atomic<bool> a; std::atomic<bool> a;
@ -41,69 +37,7 @@ struct SharedMemory
} }
}; };
#ifndef __USE_STD_SEMAPHORE__ #endif // _WIN32
#ifdef __APPLE__
#include <dispatch/dispatch.h>
class A_Semaphore {
private:
dispatch_semaphore_t native_handle;
public:
A_Semaphore(bool v = false) {
native_handle = dispatch_semaphore_create(v);
}
void acquire() {
// puts("acquire");
dispatch_semaphore_wait(native_handle, DISPATCH_TIME_FOREVER);
}
void release() {
// puts("release");
dispatch_semaphore_signal(native_handle);
}
~A_Semaphore() {
}
};
#else
#include <semaphore.h>
class A_Semaphore {
private:
sem_t native_handle;
public:
A_Semaphore(bool v = false) {
sem_init(&native_handle, v, 1);
}
void acquire() {
sem_wait(&native_handle);
}
void release() {
sem_post(&native_handle);
}
~A_Semaphore() {
sem_destroy(&native_handle);
}
};
#endif
#endif
#endif
#ifdef __USE_STD_SEMAPHORE__
#define __AQUERY_ITC_USE_SEMPH__
#include <semaphore>
class A_Semaphore {
private:
std::binary_semaphore native_handle;
public:
A_Semaphore(bool v = false) {
native_handle = std::binary_semaphore(v);
}
void acquire() {
native_handle.acquire();
}
void release() {
native_handle.release();
}
~A_Semaphore() { }
};
#endif
#ifdef __AQUERY_ITC_USE_SEMPH__ #ifdef __AQUERY_ITC_USE_SEMPH__
A_Semaphore prompt{ true }, engine{ false }; A_Semaphore prompt{ true }, engine{ false };
@ -225,9 +159,10 @@ inline constexpr static unsigned char monetdbe_type_szs[] = {
1 1
}; };
constexpr uint32_t output_buffer_size = 65536; constexpr uint32_t output_buffer_size = 65536;
void print_monetdb_results(Server* srv, const char* sep = " ", const char* end = "\n", void print_monetdb_results(void* _srv, const char* sep = " ", const char* end = "\n",
uint32_t limit = std::numeric_limits<uint32_t>::max()) { uint32_t limit = std::numeric_limits<uint32_t>::max()) {
if (!srv->haserror() && srv->cnt && limit){ auto srv = static_cast<Server *>(_srv);
if (!srv->haserror() && srv->cnt && limit) {
char buffer[output_buffer_size]; char buffer[output_buffer_size];
auto _res = static_cast<monetdbe_result*> (srv->res); auto _res = static_cast<monetdbe_result*> (srv->res);
const auto ncols = _res->ncols; const auto ncols = _res->ncols;
@ -255,7 +190,7 @@ void print_monetdb_results(Server* srv, const char* sep = " ", const char* end =
puts("Error: separator or end string too long"); puts("Error: separator or end string too long");
goto cleanup; goto cleanup;
} }
if (header_string.size() - l_sep - 1>= 0) if (header_string.size() >= l_sep + 1)
header_string.resize(header_string.size() - l_sep - 1); header_string.resize(header_string.size() - l_sep - 1);
header_string += end + std::string(header_string.size(), '=') + end; header_string += end + std::string(header_string.size(), '=') + end;
fputs(header_string.c_str(), stdout); fputs(header_string.c_str(), stdout);
@ -590,9 +525,9 @@ start:
break; break;
case 'D': // delete procedure case 'D': // delete procedure
break; break;
case 'S': //save procedure case 'S': // save procedure
break; break;
case 'L': //load procedure case 'L': // load procedure
if (!load_proc_fromfile(current_procedure)) { if (!load_proc_fromfile(current_procedure)) {
cxt->stored_proc.insert_or_assign(proc_name, current_procedure); cxt->stored_proc.insert_or_assign(proc_name, current_procedure);
} }

@ -4,21 +4,26 @@
#include <thread> #include <thread>
#include <cstdio> #include <cstdio>
#include <cstdlib> #include <cstdlib>
using namespace std;
FILE *fp; FILE *fp;
long long testing_throughput(uint32_t n_jobs, bool prompt = true){ long long testing_throughput(uint32_t n_jobs, bool prompt = true){
using namespace std::chrono_literals;
printf("Threadpool througput test with %u jobs. Press any key to start.\n", n_jobs); printf("Threadpool througput test with %u jobs. Press any key to start.\n", n_jobs);
auto tp = ThreadPool(thread::hardware_concurrency()); auto tp = ThreadPool(std::thread::hardware_concurrency());
getchar(); getchar();
auto i = 0u; auto i = 0u;
fp = fopen("tmp.tmp", "wb"); fp = fopen("tmp.tmp", "wb");
auto time = chrono::high_resolution_clock::now(); auto time = std::chrono::high_resolution_clock::now();
while(i++ < n_jobs) tp.enqueue_task({ [](void* f) {fprintf(fp, "%d ", *(int*)f); free(f); }, new int(i) }); while(i++ < n_jobs) {
payload_t payload;
payload.f = [](void* f) {fprintf(fp, "%d ", *(int*)f); free(f); return 0; };
payload.args = new int(i);
tp.enqueue_task(payload);
}
puts("done dispatching."); puts("done dispatching.");
while (tp.busy()) this_thread::sleep_for(1s); while (tp.busy()) std::this_thread::sleep_for(1s);
auto t = (chrono::high_resolution_clock::now() - time).count(); auto t = (std::chrono::high_resolution_clock::now() - time).count();
printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n", i, t, i/(double)(t)); printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n", i, t, i/(double)(t));
//this_thread::sleep_for(2s); //this_thread::sleep_for(2s);
fclose(fp); fclose(fp);
@ -27,26 +32,31 @@ long long testing_throughput(uint32_t n_jobs, bool prompt = true){
long long testing_transaction(uint32_t n_burst, uint32_t n_batch, long long testing_transaction(uint32_t n_burst, uint32_t n_batch,
uint32_t base_time, uint32_t var_time, bool prompt = true, FILE* _fp = stdout){ uint32_t base_time, uint32_t var_time, bool prompt = true, FILE* _fp = stdout){
using namespace std::chrono_literals;
printf("Threadpool transaction test: burst: %u, batch: %u, time: [%u, %u].\n" printf("Threadpool transaction test: burst: %u, batch: %u, time: [%u, %u].\n"
, n_burst, n_batch, base_time, var_time + base_time); , n_burst, n_batch, base_time, var_time + base_time);
if (prompt) { if (prompt) {
puts("Press any key to start."); puts("Press any key to start.");
getchar(); getchar();
} }
auto tp = ThreadPool(thread::hardware_concurrency()); auto tp = ThreadPool(std::thread::hardware_concurrency());
fp = _fp; fp = _fp;
auto i = 0u, j = 0u; auto i = 0u, j = 0u;
auto time = chrono::high_resolution_clock::now(); auto time = std::chrono::high_resolution_clock::now();
while(j++ < n_batch){ while(j++ < n_batch){
i = 0u; i = 0u;
while(i++ < n_burst) while(i++ < n_burst) {
tp.enqueue_task({ [](void* f) { fprintf(fp, "%d ", *(int*)f); free(f); }, new int(j) }); payload_t payload;
payload.f = [](void* f) { fprintf(fp, "%d ", *(int*)f); free(f); return 0; };
payload.args = new int(j);
tp.enqueue_task(payload);
}
fflush(stdout); fflush(stdout);
this_thread::sleep_for(chrono::microseconds(rand()%var_time + base_time)); std::this_thread::sleep_for(std::chrono::microseconds(rand()%var_time + base_time));
} }
puts("done dispatching."); puts("done dispatching.");
while (tp.busy()) this_thread::sleep_for(1s); while (tp.busy()) std::this_thread::sleep_for(1s);
auto t = (chrono::high_resolution_clock::now() - time).count(); auto t = (std::chrono::high_resolution_clock::now() - time).count();
printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n", j*i, t, j*i/(double)(t)); printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n", j*i, t, j*i/(double)(t));
return t; return t;
@ -58,17 +68,17 @@ long long testing_destruction(bool prompt = true){
puts("Press any key to start."); puts("Press any key to start.");
getchar(); getchar();
} }
auto time = chrono::high_resolution_clock::now(); auto time = std::chrono::high_resolution_clock::now();
for(int i = 0; i < 8; ++i) for(int i = 0; i < 8; ++i)
testing_transaction(0xfff, 0xff, 400, 100, false, fp); testing_transaction(0xfff, 0xff, 400, 100, false, fp);
for(int i = 0; i < 64; ++i) for(int i = 0; i < 64; ++i)
testing_transaction(0xff, 0xf, 60, 20, false, fp); testing_transaction(0xff, 0xf, 60, 20, false, fp);
for(int i = 0; i < 1024; ++i) { for(int i = 0; i < 1024; ++i) {
auto tp = new ThreadPool(256); auto tp = new ThreadPool(255);
delete tp; delete tp;
} }
return 0; return 0;
auto t = (chrono::high_resolution_clock::now() - time).count(); auto t = (std::chrono::high_resolution_clock::now() - time).count();
fclose(fp); fclose(fp);
return t; return t;
} }

@ -1,4 +1,5 @@
#include "threading.h" #include "threading.h"
#include "libaquery.h"
#include <thread> #include <thread>
#include <atomic> #include <atomic>
#include <mutex> #include <mutex>
@ -105,7 +106,8 @@ ThreadPool::ThreadPool(uint32_t n_threads)
current_payload = new payload_t[n_threads]; current_payload = new payload_t[n_threads];
for (uint32_t i = 0; i < n_threads; ++i){ for (uint32_t i = 0; i < n_threads; ++i){
atomic_init(tf + i, static_cast<unsigned char>(0b10)); // atomic_init(tf + i, static_cast<unsigned char>(0b10));
tf[i] = static_cast<unsigned char>(0b10);
th[i] = thread(&ThreadPool::daemon_proc, this, i); th[i] = thread(&ThreadPool::daemon_proc, this, i);
} }
@ -152,21 +154,50 @@ bool ThreadPool::busy(){
return true; return true;
} }
Trigger::Trigger(ThreadPool* tp){ IntervalBasedTriggerHost::IntervalBasedTriggerHost(ThreadPool* tp){
this->tp = tp;
this->triggers = new vector_type<IntervalBasedTrigger>;
trigger_queue_lock = new mutex();
this->now = std::chrono::high_resolution_clock::now().time_since_epoch().count();
}
void IntervalBasedTriggerHost::add_trigger(StoredProcedure *p, uint32_t interval) {
auto tr = IntervalBasedTrigger{.interval = interval, .time_remaining = 0, .sp = p};
auto vt_triggers = static_cast<vector_type<IntervalBasedTrigger> *>(this->triggers);
trigger_queue_lock->lock();
vt_triggers->emplace_back(tr);
trigger_queue_lock->unlock();
}
void IntervalBasedTriggerHost::tick() {
const auto current_time = std::chrono::high_resolution_clock::now().time_since_epoch().count();
    const auto delta_t = static_cast<uint32_t>((current_time - now) / 1000000); // millisecond precision
now = current_time;
auto vt_triggers = static_cast<vector_type<IntervalBasedTrigger> *>(this->triggers);
trigger_queue_lock->lock();
for(auto& t : *vt_triggers) {
if(t.tick(delta_t)) {
payload_t payload;
payload.f = execTriggerPayload;
payload.args = static_cast<void*>(new StoredProcedurePayload {t.sp, cxt});
tp->enqueue_task(payload);
}
}
trigger_queue_lock->unlock();
} }
void IntervalBasedTrigger::timer::reset(){ void IntervalBasedTrigger::reset() {
time_remaining = interval; time_remaining = interval;
} }
bool IntervalBasedTrigger::timer::tick(uint32_t t){ bool IntervalBasedTrigger::tick(uint32_t delta_t) {
if (time_remaining > t) { bool ret = false;
time_remaining -= t; if (time_remaining <= delta_t)
return false; ret = true;
} if (auto curr_dt = delta_t % interval; time_remaining <= curr_dt)
else{ time_remaining = interval + time_remaining - curr_dt;
time_remaining = interval - t%interval; else
return true; time_remaining = time_remaining - curr_dt;
} return ret;
} }
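The rewritten IntervalBasedTrigger::tick above takes the time delta_t elapsed since the last host tick, reports whether at least one firing is due (time_remaining <= delta_t), and re-arms time_remaining using delta_t modulo interval. The following self-contained mirror of that logic walks through a worked sequence of ticks; the values are illustrative and the struct only mimics the one declared in threading.h.

// Mirror of the trigger re-arming logic above, for illustration only.
#include <cassert>
#include <cstdint>

struct IntervalBasedTriggerSketch {
    uint32_t interval;        // in milliseconds
    uint32_t time_remaining;
    void reset() { time_remaining = interval; }
    bool tick(uint32_t delta_t) {
        const bool fired = time_remaining <= delta_t;   // at least one firing is due
        const auto curr_dt = delta_t % interval;        // fold delta into one period
        if (time_remaining <= curr_dt)
            time_remaining = interval + time_remaining - curr_dt;
        else
            time_remaining -= curr_dt;
        return fired;
    }
};

int main() {
    IntervalBasedTriggerSketch t{.interval = 400, .time_remaining = 0};
    t.reset();                 // armed: 400 ms until the first firing
    assert(!t.tick(150));      // not due yet; 250 ms left
    assert(t.tick(300));       // due; re-armed to 350 ms
    assert(t.tick(900));       // due again; re-armed to 250 ms
}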

@ -19,7 +19,7 @@ struct payload_t{
class ThreadPool{ class ThreadPool{
public: public:
ThreadPool(uint32_t n_threads = 0); explicit ThreadPool(uint32_t n_threads = 0);
void enqueue_task(const payload_t& payload); void enqueue_task(const payload_t& payload);
bool busy(); bool busy();
virtual ~ThreadPool(); virtual ~ThreadPool();
@ -39,29 +39,45 @@ private:
}; };
class Trigger{ #include <thread>
private: #include <mutex>
void* triggers; //min-heap by t-rem class A_Semaphore;
virtual void tick() = 0;
class TriggerHost {
protected:
void* triggers;
std::thread* handle;
ThreadPool *tp;
Context* cxt;
std::mutex* trigger_queue_lock;
virtual void tick() = 0;
public: public:
Trigger(ThreadPool* tp); TriggerHost() = default;
virtual ~TriggerHost() = default;
}; };
class IntervalBasedTrigger : public Trigger{ struct StoredProcedure;
struct IntervalBasedTrigger {
uint32_t interval; // in milliseconds
uint32_t time_remaining;
StoredProcedure* sp;
void reset();
bool tick(uint32_t t);
};
class IntervalBasedTriggerHost : public TriggerHost {
public: public:
struct timer{ explicit IntervalBasedTriggerHost(ThreadPool *tp);
uint32_t interval; // in milliseconds void add_trigger(StoredProcedure* stored_procedure, uint32_t interval);
uint32_t time_remaining; void remove_trigger(uint32_t tid);
void reset();
bool tick(uint32_t t);
};
void add_trigger();
private: private:
unsigned long long now;
void tick() override; void tick() override;
}; };
class CallbackBasedTrigger : public Trigger{ class CallbackBasedTriggerHost : public TriggerHost {
public: public:
void add_trigger(); void add_trigger();
private: private:

@ -16,18 +16,6 @@ struct SharedMemory
void FreeMemoryMap(); void FreeMemoryMap();
}; };
#ifndef __USE_STD_SEMAPHORE__
class A_Semaphore {
private:
void* native_handle;
public:
A_Semaphore(bool);
void acquire();
void release();
~A_Semaphore();
};
#endif
#endif // WIN32 #endif // WIN32
#endif // WINHELPER #endif // WINHELPER

@ -5,18 +5,20 @@ LOAD MODULE FROM "./libirf.so"
predict(X:vecvecdouble) -> vecint predict(X:vecvecdouble) -> vecint
); );
create table source(x1 double, x2 double, x3 double, x4 double, x5 int64); create table source(x1 double, x2 double, x3 double, x4 double, x5 int64);
load data infile "data/benchmark" into table source fields terminated by ","; -- Create trigger 1 ~~ to predict whenever sz(source) > ?
-- Create trigger 2 ~~ to auto feed ~
load data infile "data/benchmark" into table source fields terminated by ",";
create table sparse(x int); create table sparse(x int);
insert into sparse values (1); insert into sparse values (1);
insert into sparse values (1); insert into sparse values (1);
insert into sparse values (1); insert into sparse values (1);
insert into sparse values (1); insert into sparse values (1);
select newtree(6, 4, sparse.x, 0, 4, 2, 0, 400, 2147483647) from sparse select newtree(6, 4, sparse.x, 0, 4, 2, 0, 400, 2147483647) from sparse
select fit(pack(x1, x2, x3, x4), x5) from source select fit(pack(x1, x2, x3, x4), x5) from source
-- select pack(x1, x2, x3, x4) from source -- select pack(x1, x2, x3, x4) from source
select predict(pack(x1, x2, x3, x4)) from source select predict(pack(x1, x2, x3, x4)) from source