Interval based triggers

Monetdb get col/sz
Updated RF/dataproc
Bug fixes and improvements
master
Bill 2 years ago
parent cf8185c5f0
commit 906daf577b

4
.gitignore vendored

@ -59,6 +59,10 @@ data/benchmark
!nyctx100.csv
!network.csv
!test_complex.csv
data/electricity*
data/covtype*
data/phishing*
data/power*
*.out
*.asm
!mmw.so

@ -2,6 +2,7 @@ OS_SUPPORT =
MonetDB_LIB =
MonetDB_INC =
Defines =
CC = $(CXX) -xc
CXXFLAGS = --std=c++2a
ifeq ($(AQ_DEBUG), 1)
OPTFLAGS = -g3 #-fsanitize=address
@ -17,7 +18,7 @@ COMPILER = $(strip $(_COMPILER))
LIBTOOL = ar rcs
USELIB_FLAG = -Wl,--whole-archive,libaquery.a -Wl,-no-whole-archive
LIBAQ_SRC = server/monetdb_conn.cpp server/libaquery.cpp
LIBAQ_OBJ = monetdb_conn.o libaquery.o
LIBAQ_OBJ = monetdb_conn.o libaquery.o monetdb_ext.o
SEMANTIC_INTERPOSITION = -fno-semantic-interposition
RANLIB = ranlib
_LINKER_BINARY = $(shell `$(CXX) -print-prog-name=ld` -v 2>&1 | grep -q LLVM && echo lld || echo ld)
@ -43,7 +44,7 @@ else
LIBTOOL = gcc-ar rcs
endif
endif
OPTFLAGS += $(SEMANTIC_INTERPOSITION)
LINKFLAGS += $(SEMANTIC_INTERPOSITION)
ifeq ($(PCH), 1)
PCHFLAGS = -include server/pch.hpp
@ -82,7 +83,7 @@ else
MonetDB_INC += $(AQ_MONETDB_INC)
MonetDB_INC += -I/usr/local/include/monetdb -I/usr/include/monetdb
endif
MonetDB_LIB += -lmonetdbe
MonetDB_LIB += -lmonetdbe -lmonetdbsql -lbat
endif
ifeq ($(THREADING),1)
@ -128,6 +129,7 @@ pch:
$(CXX) -x c++-header server/pch.hpp $(FPIC) $(CXXFLAGS)
libaquery:
$(CXX) -c $(FPIC) $(PCHFLAGS) $(LIBAQ_SRC) $(OS_SUPPORT) $(CXXFLAGS) &&\
$(CC) -c server/monetdb_ext.c $(OPTFLAGS) $(MonetDB_INC) &&\
$(LIBTOOL) libaquery.a $(LIBAQ_OBJ) &&\
$(RANLIB) libaquery.a

@ -2,7 +2,7 @@
## GLOBAL CONFIGURATION FLAGS
version_string = '0.6.0a'
version_string = '0.7.0a'
add_path_to_ldpath = True
rebuild_backend = False
run_backend = True

@ -5,20 +5,18 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
#
# Bill Sun 2022 - 2023
from __future__ import absolute_import, division, unicode_literals
import json
from threading import Lock
from aquery_parser.sql_parser import scrub
from aquery_parser.parser import scrub
from aquery_parser.utils import ansi_string, simple_op, normal_op
import aquery_parser.parser
parse_locker = Lock() # ENSURE ONLY ONE PARSING AT A TIME
common_parser = None
mysql_parser = None
sqlserver_parser = None
SQL_NULL = {"null": {}}
@ -33,44 +31,10 @@ def parse(sql, null=SQL_NULL, calls=simple_op):
with parse_locker:
if not common_parser:
common_parser = sql_parser.common_parser()
common_parser = aquery_parser.parser.common_parser()
result = _parse(common_parser, sql, null, calls)
return result
def parse_mysql(sql, null=SQL_NULL, calls=simple_op):
"""
PARSE MySQL ASSUME DOUBLE QUOTED STRINGS ARE LITERALS
:param sql: String of SQL
:param null: What value to use as NULL (default is the null function `{"null":{}}`)
:return: parse tree
"""
global mysql_parser
with parse_locker:
if not mysql_parser:
mysql_parser = sql_parser.mysql_parser()
return _parse(mysql_parser, sql, null, calls)
def parse_sqlserver(sql, null=SQL_NULL, calls=simple_op):
"""
PARSE MySQL ASSUME DOUBLE QUOTED STRINGS ARE LITERALS
:param sql: String of SQL
:param null: What value to use as NULL (default is the null function `{"null":{}}`)
:return: parse tree
"""
global sqlserver_parser
with parse_locker:
if not sqlserver_parser:
sqlserver_parser = sql_parser.sqlserver_parser()
return _parse(sqlserver_parser, sql, null, calls)
parse_bigquery = parse_mysql
def _parse(parser, sql, null, calls):
utils.null_locations = []
utils.scrub_op = calls
@ -85,4 +49,4 @@ def _parse(parser, sql, null, calls):
_ = json.dumps
__all__ = ["parse", "format", "parse_mysql", "parse_bigquery", "normal_op", "simple_op"]
__all__ = ["parse", "format", "normal_op", "simple_op"]

@ -5,7 +5,7 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
#
# Bill Sun 2022 - 2023
# SQL CONSTANTS
from mo_parsing import *

@ -5,7 +5,7 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
#
# Bill Sun 2022 - 2023
from sre_parse import WHITESPACE
@ -28,37 +28,12 @@ simple_ident = Regex(simple_ident.__regex__()[1])
def common_parser():
combined_ident = Combine(delimited_list(
ansi_ident | mysql_backtick_ident | simple_ident, separator=".", combine=True,
)).set_parser_name("identifier")
return parser(ansi_string | mysql_doublequote_string, combined_ident)
def mysql_parser():
mysql_string = ansi_string | mysql_doublequote_string
mysql_ident = Combine(delimited_list(
mysql_backtick_ident | sqlserver_ident | simple_ident,
separator=".",
combine=True,
)).set_parser_name("mysql identifier")
return parser(mysql_string, mysql_ident)
def sqlserver_parser():
combined_ident = Combine(delimited_list(
ansi_ident
| mysql_backtick_ident
| sqlserver_ident
| Word(FIRST_IDENT_CHAR, IDENT_CHAR),
separator=".",
combine=True,
ansi_ident | aquery_backtick_ident | simple_ident, separator=".", combine=True,
)).set_parser_name("identifier")
return parser(ansi_string, combined_ident, sqlserver=True)
return parser(ansi_string | aquery_doublequote_string, combined_ident)
def parser(literal_string, ident, sqlserver=False):
def parser(literal_string, ident):
with Whitespace() as engine:
engine.add_ignore(Literal("--") + restOfLine)
engine.add_ignore(Literal("#") + restOfLine)
@ -184,8 +159,6 @@ def parser(literal_string, ident, sqlserver=False):
)
)
if not sqlserver:
# SQL SERVER DOES NOT SUPPORT [] FOR ARRAY CONSTRUCTION (USED FOR IDENTIFIERS)
create_array = (
Literal("[") + delimited_list(Group(expr))("args") + Literal("]")
| create_array

@ -5,7 +5,7 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
#
# Bill Sun 2022 - 2023
# KNOWN TYPES

@ -5,7 +5,7 @@
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# Contact: Kyle Lahnakoski (kyle@lahnakoski.com)
#
# Bill Sun 2022 - 2023
import ast
@ -610,9 +610,8 @@ hex_num = (
# STRINGS
ansi_string = Regex(r"\'(\'\'|[^'])*\'") / to_string
mysql_doublequote_string = Regex(r'\"(\"\"|[^"])*\"') / to_string
aquery_doublequote_string = Regex(r'\"(\"\"|[^"])*\"') / to_string
# BASIC IDENTIFIERS
ansi_ident = Regex(r'\"(\"\"|[^"])*\"') / unquote
mysql_backtick_ident = Regex(r"\`(\`\`|[^`])*\`") / unquote
sqlserver_ident = Regex(r"\[(\]\]|[^\]])*\]") / unquote
aquery_backtick_ident = Regex(r"\`(\`\`|[^`])*\`") / unquote

@ -0,0 +1,2 @@
make snippet_uselib
cp ./dll.so procedures/q70.so

@ -5,14 +5,14 @@ from typing import List
name : str = input('Filename (in path ./procedures/<filename>.aqp):')
def write():
s : str = input()
s : str = input('Enter queries: empty line to stop. \n')
qs : List[str] = []
while(len(s) and not s.startswith('S')):
qs.append(s)
s = input()
ms : int = int(input())
ms : int = int(input('number of modules:'))
with open(f'./procedures/{name}.aqp', 'wb') as fp:
fp.write(struct.pack("I", len(qs) + (ms > 0)))
@ -27,21 +27,29 @@ def write():
fp.write(b'\x00')
def read():
def read(cmd : str):
rc = len(cmd) > 1 and cmd[1] == 'c'
clip = ''
with open(f'./procedures/{name}.aqp', 'rb') as fp:
nq = struct.unpack("I", fp.read(4))[0]
ms = struct.unpack("I", fp.read(4))[0]
qs = fp.read().split(b'\x00')
print(f'Procedure {name}, {nq} queries, {ms} modules:')
for q in qs:
print(' ' + q.decode('utf-8'))
q = q.decode('utf-8').strip()
if q:
q = f'"{q}",' if rc else f'\t{q}'
print(q)
clip += q + '\n'
if rc and not input('copy to clipboard?').lower().startswith('n'):
import pyperclip
pyperclip.copy(clip)
if __name__ == '__main__':
while True:
cmd = input("r for read, w for write: ")
cmd = input("r for read, rc to read c_str, w for write: ")
if cmd.lower().startswith('r'):
read()
read(cmd.lower())
break
elif cmd.lower().startswith('w'):
write()

@ -0,0 +1,43 @@
import os
sep = os.sep
# toggles
dataset = 'power' # [covtype, electricity, mixed, phishing, power]
use_threadpool = False # True
# environments
input_prefix = f'data{sep}{dataset}_orig'
output_prefix = f'data{sep}{dataset}'
sep_field = ','
sep_subfield = ';'
lst_files = os.listdir(input_prefix)
lst_files.sort()
try:
os.mkdir(output_prefix)
except FileExistsError:
pass
def process(f : str):
filename = input_prefix + sep + f
ofilename = output_prefix + sep + f[:-3] + 'csv'
with open(filename, 'rb') as ifile:
icontents = ifile.read().decode('utf-8')
with open(ofilename, 'wb') as ofile:
for l in icontents.splitlines():
fields = l.strip().split(' ')
subfields = fields[:-1]
ol = ( # fields[0] + sep_field +
sep_subfield.join(subfields) +
sep_field + fields[-1] + '\n')
ofile.write(ol.encode('utf-8'))
if not use_threadpool:
for f in lst_files:
process(f)
elif __name__ == '__main__':
from multiprocessing import Pool
with Pool(8) as tp:
tp.map(process, lst_files)

@ -9,26 +9,29 @@ struct DR;
struct DT;
//enum Evaluation {gini, entropy, logLoss};
class DecisionTree{
class DecisionTree
{
public:
DT *DTree = nullptr;
int maxHeight;
double minIG;
long maxHeight;
long feature;
long maxFeature;
long seed;
bool isRF;
long classes;
int *Sparse;
double forgetRate;
double increaseRate;
double initialIR;
Evaluation evalue;
long Rebuild;
long roundNo;
long called;
long retain;
long lastT;
long lastAll;
DecisionTree(int hight, long f, int* sparse, double forget, long maxFeature, long noClasses, Evaluation e, long r, long rb);
DecisionTree(long f, int *sparse, double forget, long maxFeature, long noClasses, Evaluation e);
void Stablelize();
@ -38,9 +41,9 @@ minEval findMinGiniDense(double** data, long* result, long* totalT, long size, l
minEval findMinGiniSparse(double **data, long *result, long *totalT, long size, long col, DT *current);
minEval incrementalMinGiniDense(double** data, long* result, long size, long col, long*** count, double** record, long* max, long newCount, long forgetSize, bool isRoot);
minEval incrementalMinGiniDense(double **data, long *result, long size, long col, long ***count, double **record, long *max, long newCount, long forgetSize, double **forgottenData, long *forgottenClass);
minEval incrementalMinGiniSparse(double** dataNew, long* resultNew, long sizeNew, long sizeOld, DT* current, long col, long forgetSize, bool isRoot);
minEval incrementalMinGiniSparse(double **dataNew, long *resultNew, long sizeNew, long sizeOld, DT *current, long col, long forgetSize, double **forgottenData, long *forgottenClass);
long *fitThenPredict(double **trainData, long *trainResult, long trainSize, double **testData, long testSize);

@ -26,7 +26,7 @@ minEval giniSparse(double** data, long* result, long* d, long size, long col, lo
double gini1, gini2;
double c;
long l, r;
for(i=0; i<size; i++){
for(i=0; i<size-1; i++){
c = data[d[i]][col];
if(c==max)break;
count[result[d[i]]]++;
@ -62,7 +62,7 @@ minEval entropySparse(double** data, long* result, long* d, long size, long col,
double entropy1, entropy2;
double c;
long l, r;
for(i=0; i<size; i++){
for(i=0; i<size-1; i++){
c = data[d[i]][col];
if(c==max)break;
count[result[d[i]]]++;
@ -73,8 +73,8 @@ minEval entropySparse(double** data, long* result, long* d, long size, long col,
for(j=0;j<classes;j++){
l = count[j];
r = totalT[j]-l;
entropy1 -= ((double)l/total)*log((double)l/total);
entropy2 -= ((double)r/(size-total))*log((double)r/(size-total));
if(l!=0)entropy1 -= ((double)l/total)*log((double)l/total);
if(r!=0)entropy2 -= ((double)r/(size-total))*log((double)r/(size-total));
}
entropy1 = entropy1*total/size + entropy2*(size-total)/size;
if(ret.eval>entropy1){
@ -140,8 +140,8 @@ minEval entropySparseIncremental(long sizeTotal, long classes, double* newSorted
for(j=0;j<classes;j++){
l = count[j];
r = T[j]-l;
e1 -= ((double)l/total)*log((double)l/total);
e2 -= ((double)r/(sizeTotal-total))*log((double)r/(sizeTotal-total));
if(l!=0)e1 -= ((double)l/total)*log((double)l/total);
if(r!=0)e2 -= ((double)r/(sizeTotal-total))*log((double)r/(sizeTotal-total));
}
e1 = e1*total/sizeTotal + e2*(sizeTotal-total)/sizeTotal;
if(ret.eval>e1){
@ -159,9 +159,9 @@ minEval giniDense(long max, long size, long classes, long** rem, long* d, double
double gini1, gini2;
long *t, *t2, *r, *r2, i, j;
for(i=0;i<max;i++){
t = rem[d[i]];
t = rem[i];
if(i>0){
t2 = rem[d[i-1]];
t2 = rem[i-1];
for(j=0;j<=classes;j++){
t[j]+=t2[j];
}
@ -179,7 +179,7 @@ minEval giniDense(long max, long size, long classes, long** rem, long* d, double
gini1 = (gini1*t[classes])/size + (gini2*(size-t[classes]))/size;
if(gini1<ret.eval){
ret.eval = gini1;
ret.value = record[d[i]];
ret.value = record[i];
ret.left = t[classes];
}
}
@ -193,9 +193,9 @@ minEval entropyDense(long max, long size, long classes, long** rem, long* d, dou
double entropy1, entropy2;
long *t, *t2, *r, *r2, i, j;
for(i=0;i<max;i++){
t = rem[d[i]];
t = rem[i];
if(i>0){
t2 = rem[d[i-1]];
t2 = rem[i-1];
for(j=0;j<=classes;j++){
t[j]+=t2[j];
}
@ -207,70 +207,14 @@ minEval entropyDense(long max, long size, long classes, long** rem, long* d, dou
long l, r;
l = t[j];
r = totalT[j]-l;
entropy1 -= ((double)l/t[classes])*log((double)l/t[classes]);
entropy2 -= ((double)r/(size-t[classes]))*log((double)r/(size-t[classes]));
if(l!=0)entropy1 -= ((double)l/t[classes])*log((double)l/t[classes]);
if(r!=0)entropy2 -= ((double)r/(size-t[classes]))*log((double)r/(size-t[classes]));
}
entropy1 = entropy1*t[classes]/size + entropy2*(size-t[classes])/size;
if(entropy1<ret.eval){
ret.eval = entropy1;
ret.value = record[d[i]];
ret.left = t[classes];
}
}
return ret;
}
minEval giniDenseIncremental(long max, double* record, long** count, long classes, long newSize, long* T){
double gini1, gini2;
minEval ret;
long i, j;
ret.eval = DBL_MAX;
for(i=0; i<max; i++){
if(count[i][classes]==newSize){
continue;
}
gini1 = 1.0;
gini2 = 1.0;
for(j=0;j<classes;j++){
long l, r;
l = count[i][j];
r = T[j]-l;
gini1 -= pow((double)l/count[i][classes], 2);
gini2 -= pow((double)r/(newSize-count[i][classes]), 2);
}
gini1 = gini1*count[i][classes]/newSize + gini2*((newSize-count[i][classes]))/newSize;
if(gini1<ret.eval){
ret.eval = gini1;
ret.value = record[i];
}
}
return ret;
}
minEval entropyDenseIncremental(long max, double* record, long** count, long classes, long newSize, long* T){
double entropy1, entropy2;
minEval ret;
long i, j;
ret.eval = DBL_MAX;
for(i=0; i<max; i++){
if(count[i][classes]==newSize or count[i][classes]==0){
continue;
}
entropy1 = 0;
entropy2 = 0;
for(j=0;j<classes;j++){
long l, r;
l = count[i][j];
r = T[j]-l;
entropy1 -= ((double)l/count[i][classes])*log((double)l/count[i][classes]);
entropy2 -= (double)r/(newSize-count[i][classes])*log((double)r/(newSize-count[i][classes]));
}
entropy1 = entropy1*count[i][classes]/newSize + entropy2*((newSize-count[i][classes]))/newSize;
if(entropy1<ret.eval){
ret.eval = entropy1;
ret.value = record[i];
ret.left = t[classes];
}
}
return ret;

@ -17,8 +17,4 @@ minEval giniDense(long max, long size, long classes, long** rem, long* d, double
minEval entropyDense(long max, long size, long classes, long** rem, long* d, double* record, long* totalT);
minEval giniDenseIncremental(long max, double* record, long** count, long classes, long newSize, long* T);
minEval entropyDenseIncremental(long max, double* record, long** count, long classes, long newSize, long* T);
#endif

@ -2,7 +2,27 @@
#include <stdlib.h>
#include <stdio.h>
#include <ctime>
#include <math.h>
#include <algorithm>
#include <boost/math/distributions/students_t.hpp>
#include <random>
long poisson(int Lambda)
{
int k = 0;
long double p = 1.0;
long double l = exp(-Lambda);
srand((long)clock());
while(p>=l)
{
double u = (double)(rand()%10000)/10000;
p *= u;
k++;
}
if (k>11)k=11;
return k-1;
}
struct DT{
int height;
long* featureId;
@ -30,62 +50,126 @@ struct DT{
long size = 0;// Size of the dataset
};
RandomForest::RandomForest(long mTree, long actTree, long rTime, int h, long feature, int* s, double forg, long maxF, long noC, Evaluation eval, long r, long rb){
RandomForest::RandomForest(long mTree, long feature, int* s, double forg, long noC, Evaluation eval, bool b, double t){
srand((long)clock());
Rebuild = rb;
if(actTree<1)actTree=1;
noTree = actTree;
activeTree = actTree;
treePointer = 0;
if(mTree<actTree)mTree=activeTree;
bagging = b;
activeTree = mTree;
maxTree = mTree;
if(rTime<=0)rTime=1;
rotateTime = rTime;
timer = 0;
retain = r;
allT = new long[mTree];
tThresh=t;
lastT = -2;
lastAll = 0;
long i;
height = h;
f = feature;
sparse = new int[f];
for(i=0; i<f; i++)sparse[i]=s[i];
forget = forg;
maxFeature = maxF;
noClasses = noC;
e = eval;
minF = floor(sqrt((double)f))+2;
if(minF>f)minF=f;
DTrees = (DecisionTree**)malloc(mTree*sizeof(DecisionTree*));
for(i=0; i<mTree; i++){
if(i<actTree){
DTrees[i] = new DecisionTree(height, f, sparse, forget, maxFeature, noClasses, e, r, rb);
}
else{
DTrees[i]=nullptr;
}
for(i=0; i<maxTree; i++){
DTrees[i] = new DecisionTree(f, sparse, forget, minF+rand()%(f+1-minF), noClasses, e);
DTrees[i]->isRF=true;
}
}
void RandomForest::fit(double** data, long* result, long size){
if(timer==rotateTime and maxTree!=activeTree){
Rotate();
timer=0;
}
long i, j, k;
long i, j, k, l;
double** newData;
long* newResult;
for(i=0; i<activeTree; i++){
newData = new double*[size];
newResult = new long[size];
long localT = 0;
int stale = 0;
if(lastT==-2){
lastT=-1;
}else{
for(i=0; i<maxTree; i++)allT[i] = 0;
for(i=0; i<size; i++){
if(Test(data[i], result[i])==result[i])localT++;
}
long localAll = size;
if(lastT>=0){
double lastSm = (double)lastT/lastAll;
double localSm = (double)localT/localAll;
double lastSd = sqrt(pow((1.0-lastSm),2)*lastT+pow(lastSm,2)*(lastAll-lastT)/(lastAll-1));
double localSd = sqrt(pow((1.0-localSm),2)*localT+pow(localSm,2)*(localAll-localT)/(localAll-1));
double v = lastAll+localAll-2;
double sp = sqrt(((lastAll-1) * lastSd * lastSd + (localAll-1) * localSd * localSd) / v);
double q;
double t = lastSm-localSm;
if(sp==0){q = 1;}
else{
t = t/(sp*sqrt(1.0/lastAll+1.0/localAll));
boost::math::students_t dist(v);
double c = cdf(dist, t);
q = cdf(complement(dist, fabs(t)));
}
if(q<=tThresh){
lastT += localT;
lastAll += localAll;
}else if(t<0){
lastT = localT;
lastAll = localAll;
}else{
double newAcc = (double)localT/localAll;
double lastAcc= (double)lastT/lastAll;
stale = floor((newAcc-lastAcc)/(lastAcc)*maxTree);
lastT = localT;
lastAll = localAll;
}
}else{
lastT = localT;
lastAll = localAll;
}
}
Rotate(stale);
for(i=0; i<maxTree; i++){
long times;
if(bagging)times = poisson(6);
else times=1;
if(times==0)continue;
newData = (double**)malloc(sizeof(double*)*size*times);
newResult = (long*)malloc(sizeof(long)*size*times);
long c = 0;
for(j = 0; j<size*times; j++){
long jj;
if(bagging) jj = rand()%size;
else jj=j;
newData[j] = (double*)malloc((f+1)*sizeof(double));
for(l=0; l<f; l++){
newData[j][l] = data[jj][l];
}
newData[j][f] = 0;
newResult[j] = result[jj];
}
DTrees[i]->fit(newData, newResult, size*times);
}
/*for(i=0; i<maxTree; i++){
//backupTrees[i]->retain = 10*size;
//if(backupTrees[i]==nullptr) continue;
long times;
//times = poisson(posMean);
//if(times==0)continue;
times=1;
newData = (double**)malloc(sizeof(double*)*size*times);
newResult = (long*)malloc(sizeof(long)*size*times);
long c = 0;
for(j = 0; j<size; j++){
newData[j] = new double[f];
for(k=0; k<f; k++){
newData[j][k] = data[j][k];
long jj = rand()%size;
jj=j;
for(k=0; k<times; k++){
newData[j*times+k] = (double*)malloc((f+1)*sizeof(double));
for(l=0; l<f; l++){
newData[j*times+k][l] = data[jj][l];
}
newResult[j] = result[j];
newData[j*times+k][f] = 0;
newResult[j*times+k] = result[jj];
}
DTrees[(i+treePointer)%maxTree]->fit(newData, newResult, size);
}
timer++;
backupTrees[i]->fit(newData, newResult, size*times);
}*/
}
long* RandomForest::fitThenPredict(double** trainData, long* trainResult, long trainSize, double** testData, long testSize){
@ -97,36 +181,80 @@ long* RandomForest::fitThenPredict(double** trainData, long* trainResult, long t
return testResult;
}
void RandomForest::Rotate(){
if(noTree==maxTree){
DTrees[(treePointer+activeTree)%maxTree]->Free();
delete DTrees[(treePointer+activeTree)%maxTree];
}else{
noTree++;
void RandomForest::Rotate(long stale){
long i, j, k;
long minIndex = -1;
if(stale>=0)return;
else{
stale = std::min(stale, maxTree);
stale*=-1;
while(stale>0){
long currentMin = 2147483647;
for(i = 0; i<maxTree; i++){
if(allT[i]<currentMin){
currentMin=allT[i];
minIndex=i;
}
}
stale--;
if(minIndex<0)break;
allT[minIndex] = 2147483647;
double** newData;
long* newResult;
long size = 0;
long lastT2 = 0;
size = DTrees[minIndex]->DTree->size;
newData = (double**)malloc(sizeof(double*)*size);
newResult = (long*)malloc(sizeof(long)*size);
for(j = 0; j<size; j++){
newData[j] = (double*)malloc(sizeof(double)*(f+1));
for(k=0; k<f; k++){
newData[j][k] = DTrees[minIndex]->DTree->dataRecord[j][k];
}
newData[j][f] = 0;
newResult[j] = DTrees[minIndex]->DTree->resultRecord[j];
}
DTrees[minIndex]->Stablelize();
DTrees[minIndex]->Free();
delete DTrees[minIndex];
DTrees[minIndex] = new DecisionTree(f, sparse, forget, minF+rand()%(f+1-minF), noClasses, e);
DTrees[minIndex]->isRF=true;
DTrees[minIndex]->fit(newData, newResult, size);
for(j=0; j<size; j++){
if(DTrees[minIndex]->Test(newData[j], DTrees[minIndex]->DTree)==newResult[j])lastT2++;
}
DTrees[minIndex]->lastAll=size;
DTrees[minIndex]->lastT=lastT2;
}
DTrees[(treePointer+activeTree)%maxTree] = new DecisionTree(height, f, sparse, forget, maxFeature, noClasses, e, retain, Rebuild);
long size = DTrees[(treePointer+activeTree-1)%maxTree]->DTree->size;
double** newData = new double*[size];
long* newResult = new long[size];
for(long j = 0; j<size; j++){
newData[j] = new double[f];
for(long k=0; k<f; k++){
newData[j][k] = DTrees[(treePointer+activeTree-1)%maxTree]->DTree->dataRecord[j][k];
}
newResult[j] = DTrees[(treePointer+activeTree-1)%maxTree]->DTree->resultRecord[j];
}
DTrees[(treePointer+activeTree)%maxTree]->fit(newData, newResult, size);
DTrees[treePointer]->Stablelize();
if(++treePointer==maxTree)treePointer=0;
long RandomForest::Test(double* data, long result){
long i;
long predict[noClasses];
for(i=0; i<noClasses; i++){
predict[i]=0;
}
for(i=0; i<maxTree; i++){
long tmp = DTrees[i]->Test(data, DTrees[i]->DTree);
predict[tmp]++;
if(tmp==result)allT[i]++;
}
long ret = 0;
for(i=1; i<noClasses; i++){
if(predict[i]>predict[ret])ret = i;
}
return ret;
}
long RandomForest::Test(double* data){
long i;
long predict[noClasses];
for(i=0; i<noClasses; i++)predict[i]=0;
for(i=0; i<noTree; i++){
for(i=0; i<maxTree; i++){
predict[DTrees[i]->Test(data, DTrees[i]->DTree)]++;
}

@ -14,33 +14,34 @@ struct DT;
class RandomForest{
public:
long noTree;
long maxTree;
long activeTree;
long treePointer;
long rotateTime;
long timer;
long retain;
DecisionTree** DTrees = nullptr;
long* allT;
double tThresh;
DecisionTree** DTrees;
DecisionTree** backupTrees;
long height;
long Rebuild;
bool bagging;
long f;
int* sparse;
double forget;
long maxFeature;
long noClasses;
Evaluation e;
long lastT;
long lastAll;
int minF;
RandomForest(long maxTree, long activeTree, long rotateTime, int height, long f, int* sparse, double forget, long maxFeature=0, long noClasses=2, Evaluation e=Evaluation::gini, long r=-1, long rb=2147483647);
RandomForest(long maxTree, long f, int* sparse, double forget, long noClasses=2, Evaluation e=Evaluation::entropy, bool b=false, double tThresh=0.05);
void fit(double** data, long* result, long size);
long* fitThenPredict(double** trainData, long* trainResult, long trainSize, double** testData, long testSize);
void Rotate();
void Rotate(long stale);
long Test(double* data);
long Test(double* data, long result);
};
#endif

@ -11,6 +11,7 @@
std::random_device rd;
std::mt19937 g(rd());
struct minEval{
double value;
int* values;
@ -23,10 +24,11 @@ struct minEval{
};
struct DT{
int height;
long height;
long* featureId;
DT* left = nullptr;
DT* right = nullptr;
bool created;
// split info
bool terminate;
@ -47,19 +49,22 @@ struct DT{
double** dataRecord = nullptr;// Record the data
long* resultRecord = nullptr;// Record the result
long size = 0;// Size of the dataset
};
long seed = (long)clock();
long* Rands(long feature, long maxFeature){
//srand(seed++);
srand(seed);
long i;
long* ret = (long*) malloc(feature*sizeof(long));
for(i =0; i<feature; i++)ret[i] = i;
if(maxFeature==feature){
return ret;
}
std::shuffle(ret, &ret[feature], g);
std::shuffle(ret, ret+feature, g);
long* ret2 = (long*) malloc(maxFeature*sizeof(long));
for(i=0; i<maxFeature; i++)ret2[i] = ret[i];
for(i=0; i<maxFeature; i++){
ret2[i] = ret[i];
}
free(ret);
return ret2;
}
@ -67,9 +72,8 @@ double getRand(){
return (double) rand() / RAND_MAX;
}
void createTree(DT* t, long currentHeight, long height, long f, long maxF, long classes){
srand(seed);
void createNode(DT* t, long currentHeight, long f, long classes){
t->created = true;
long i;
t->count = (long***)malloc(f*sizeof(long**));
for(i=0; i<f; i++)t->count[i]=nullptr;
@ -77,8 +81,6 @@ void createTree(DT* t, long currentHeight, long height, long f, long maxF, long
for(i=0; i<f; i++)t->record[i]=nullptr;
t->max = (long*)malloc(f*sizeof(long));
t->max[0] = -1;
t->featureId = Rands(f, maxF);
//t->T = (long*)malloc(classes*sizeof(long));
t->sortedData = (double**) malloc(f*sizeof(double*));
for(i=0; i<f; i++)t->sortedData[i]=nullptr;
t->sortedResult = (long**) malloc(f*sizeof(long*));
@ -88,20 +90,18 @@ void createTree(DT* t, long currentHeight, long height, long f, long maxF, long
t->height = currentHeight;
t->feature = -1;
t->size = 0;
if(currentHeight>height){
t->right = nullptr;
t->left = nullptr;
return;
}
t->left = (DT*)malloc(sizeof(DT));
t->right = (DT*)malloc(sizeof(DT));
createTree(t->left, currentHeight+1, height, f, maxF, classes);
createTree(t->right, currentHeight+1, height, f, maxF, classes);
t->left->created = false;
t->right->created = false;
t->left->height = currentHeight+1;
t->right->height = currentHeight+1;
}
void stableTree(DT* t, long f){
long i, j;
if(not t->created)return;
for(i=0; i<f; i++){
if(t->count[i]==nullptr)continue;
for(j=0; j<t->max[i]; j++){
@ -116,7 +116,6 @@ void stableTree(DT* t, long f){
}
free(t->record);
free(t->max);
free(t->featureId);
for(i=0; i<f; i++){
if(t->sortedData[i]==nullptr)continue;
free(t->sortedData[i]);
@ -126,25 +125,28 @@ void stableTree(DT* t, long f){
if(t->sortedResult[i]==nullptr)continue;
free(t->sortedResult[i]);
}
free(t->sortedResult);
free(t->dataRecord);
free(t->resultRecord);
free(t->sortedResult);
if(t->right!=nullptr)stableTree(t->right, f);
if(t->left!=nullptr)stableTree(t->left, f);
}
void freeTree(DT* t){
if(t->left != nullptr)freeTree(t->left);
if(t->right != nullptr)freeTree(t->right);
if(t->created){
freeTree(t->left);
freeTree(t->right);
}
free(t);
}
DecisionTree::DecisionTree(int height, long f, int* sparse, double forget=0.1, long maxF=0, long noClasses=2, Evaluation e=Evaluation::gini, long r=-1, long rb=1){
DecisionTree::DecisionTree(long f, int* sparse, double rate, long maxF, long noClasses, Evaluation e){
evalue = e;
called = 0;
long i;
// Max tree height
maxHeight = height;
initialIR = rate;
increaseRate = rate;
isRF = false;
// Number of features
feature = f;
// If each feature is sparse or dense, 0 for dense, 1 for sparse, >2 for number of category
@ -157,40 +159,69 @@ DecisionTree::DecisionTree(int height, long f, int* sparse, double forget=0.1, l
DTree->feature = -1;
// The number of feature that is considered in each node
if(maxF>=f){
maxFeature = f;
maxF = f;
}else if(maxF<=0){
maxFeature = (long)round(sqrt(f));
}else{
maxFeature = maxF;
maxF = (long)round(sqrt(f));
}
forgetRate = std::min(1.0, forget);
retain = r;
createTree(DTree, 0, maxHeight, f, maxFeature, noClasses);
// Randomly generate the features
//DTree->featureId = Rands();
//DTree->sorted = (long**) malloc(f*sizeof(long*));
maxFeature = maxF;
forgetRate = -10.0;
retain = 0;
DTree->featureId = Rands(f, maxF);
DTree->terminate = true;
DTree->result = 0;
DTree->size = 0;
createNode(DTree, 0, f, noClasses);
// Number of classes of this dataset
Rebuild = rb;
roundNo = 0;
Rebuild = 2147483647;
roundNo = 64;
classes = std::max(noClasses, (long)2);
//DTree->T = (long*) malloc(noClasses*sizeof(long));
/*for(long i = 0; i<noClasses; i++){
DTree->T[i]=0;
}*/
// last Acc
lastAll = classes;
lastT = 1;
}
void DecisionTree::Stablelize(){
free(Sparse);
stableTree(DTree, feature);
long i, j;
DT* t = DTree;
long f = feature;
for(i=0; i<f; i++){
if(t->count[i]==nullptr)continue;
for(j=0; j<t->max[i]; j++){
free(t->count[i][j]);
}
free(t->count[i]);
}
free(t->count);
for(i=0; i<f; i++){
if(t->record[i]==nullptr)continue;
free(t->record[i]);
}
free(t->record);
free(t->max);
free(t->featureId);
for(i=0; i<f; i++){
if(t->sortedData[i]==nullptr)continue;
free(t->sortedData[i]);
}
free(t->sortedData);
for(i=0; i<f; i++){
if(t->sortedResult[i]==nullptr)continue;
free(t->sortedResult[i]);
}
free(t->sortedResult);
if(DTree->right!=nullptr)stableTree(t->right, feature);
if(DTree->left!=nullptr)stableTree(t->left, feature);
}
void DecisionTree::Free(){
free(DTree->dataRecord);
free(DTree->resultRecord);
freeTree(DTree);
}
minEval DecisionTree::incrementalMinGiniSparse(double** dataTotal, long* resultTotal, long sizeTotal, long sizeNew, DT* current, long col, long forgetSize, bool isRoot){
minEval DecisionTree::incrementalMinGiniSparse(double** dataTotal, long* resultTotal, long sizeTotal, long sizeNew, DT* current, long col, long forgetSize, double** forgottenData, long* forgottenClass){
long i, j;
if(isRoot){sizeNew=sizeTotal-forgetSize;}
long newD[sizeNew];
for(i=0; i<sizeNew; i++)newD[i]=i;
long T[classes];
@ -201,14 +232,28 @@ minEval DecisionTree::incrementalMinGiniSparse(double** dataTotal, long* resultT
long p1=0, p2=0;
double* oldData = current->sortedData[col];
long* oldResult = current->sortedResult[col];
long tmp2 = forgetSize;
long* allForget = (long*)malloc(sizeof(long)*classes);
for(i=0; i<classes; i++)allForget[i]=0;
for(i=0; i<sizeTotal; i++){
bool meet = false;
if(p1==sizeNew){
j = oldResult[p2];
if(allForget[j]!=forgottenClass[j]){
if(oldData[p2]==forgottenData[j][allForget[j]]){
allForget[j]++;
i--;
meet = true;
}
}
if(not meet){
newSortedData[i] = oldData[p2];
newSortedResult[i] = oldResult[p2];
T[newSortedResult[i]]++;
}
p2++;
}
else if(p2==sizeTotal-sizeNew){
else if(p2==sizeTotal-sizeNew+forgetSize){
newSortedData[i] = dataTotal[newD[p1]][col];
newSortedResult[i] = resultTotal[newD[p1]];
T[newSortedResult[i]]++;
@ -220,17 +265,27 @@ minEval DecisionTree::incrementalMinGiniSparse(double** dataTotal, long* resultT
T[newSortedResult[i]]++;
p1++;
}else{
j = oldResult[p2];
if(allForget[j]!=forgottenClass[j]){
if(oldData[p2]==forgottenData[j][allForget[j]]){
allForget[j]++;
i--;
meet = true;
}
}
if(not meet){
newSortedData[i] = oldData[p2];
newSortedResult[i] = oldResult[p2];
T[newSortedResult[i]]++;
}
p2++;
}
}
free(allForget);
current->sortedData[col] = newSortedData;
current->sortedResult[col] = newSortedResult;
free(oldData);
free(oldResult);
minEval ret;
if(evalue == Evaluation::gini){
ret = giniSparseIncremental(sizeTotal, classes, newSortedData, newSortedResult, T);
@ -240,28 +295,43 @@ minEval DecisionTree::incrementalMinGiniSparse(double** dataTotal, long* resultT
ret.values = nullptr;
return ret;
}
minEval DecisionTree::incrementalMinGiniDense(double** data, long* result, long size, long col, long*** count, double** record, long* max, long newSize, long forgetSize, bool isRoot){
minEval DecisionTree::incrementalMinGiniDense(double** data, long* result, long size, long col, long*** count, double** record, long* max, long newSize, long forgetSize, double** forgottenData, long* forgottenClass){
// newSize is before forget
long low = 0;
if(isRoot)size=newSize-forgetSize;
long i, j, k;
//if(isRoot)
long i, j, k, tmp;
long newMax = 0;
long maxLocal = max[col];
long **newCount=(long**)malloc(size*sizeof(long*));
for(i=0;i<size;i++){
newCount[i] = (long*)malloc((classes+1)*sizeof(long));
for(j=0;j<= classes;j++)newCount[i][j]=0;
}
double newRecord[size];
bool find;
long tmp3 = newSize-size;
long tmp4 = forgetSize;
// find total count for each class
long T[classes];
for(i=0;i<classes;i++)T[i]=0;
long tmp2=0;
long* allForget = new long[classes];
for(i=0;i<classes;i++){
T[i]=0;
allForget[i]=0;
}
// forget
for(i=0;i<max[col];i++){
for(j=0;j<classes;j++){
if(isRoot)count[col][i][j]=0;
else if(T[j]<count[col][i][j])T[j]=count[col][i][j];
tmp = count[col][i][j];
tmp2+=tmp;
for(k=0; k<tmp; k++){
if(allForget[j]==forgottenClass[j])break;
if(record[col][i]==forgottenData[j][allForget[j]]){
forgetSize--;
count[col][i][j]--;
count[col][i][classes]--;
allForget[j]++;
}else{
break;
}
}
T[j]+=count[col][i][j];
}
}
@ -274,9 +344,6 @@ minEval DecisionTree::incrementalMinGiniDense(double** data, long* result, long
count[col][j][result[i]]++;
count[col][j][classes] ++;
find = true;
}else if(data[i][col]<record[col][j]){
count[col][j][result[i]]++;
count[col][j][classes] ++;
}
}
for(j=0;j<newMax;j++){
@ -284,65 +351,64 @@ minEval DecisionTree::incrementalMinGiniDense(double** data, long* result, long
newCount[j][result[i]]++;
newCount[j][classes] ++;
find = true;
} else if(data[i][col]<newRecord[j]){
newCount[j][result[i]]++;
newCount[j][classes] ++;
}
}
if(not find){
newRecord[newMax] = data[i][col];
double currentMinMax = -1*DBL_MAX;
for(j=0;j<max[col];j++){
if(record[col][j]<newRecord[newMax] and record[col][j]>currentMinMax){
currentMinMax = record[col][j];
for(k=0;k<=classes;k++)newCount[newMax][k]=count[col][j][k];
}
}
for(j=0;j<newMax;j++){
if(newRecord[j]<newRecord[newMax] and currentMinMax<newRecord[j]){
currentMinMax = newRecord[j];
for(k=0;k<=classes;k++)newCount[newMax][k]=newCount[j][k];
}
}
if(currentMinMax== -1*DBL_MAX){
for(k=0;k<=classes;k++)newCount[newMax][k]=0;
}
newCount[newMax] = (long*)malloc((classes+1)*sizeof(long));
for(j=0;j<= classes;j++)newCount[newMax][j]=0;
newCount[newMax][result[i]]++;
newCount[newMax][classes]++;
newRecord[newMax] = data[i][col];
newMax++;
}
}
// Updata new count and record
long* d;
if(newMax>0){
d = (long*)malloc(sizeof(long)*newMax);
for(i=0;i<newMax;i++)d[i]=i;
std::sort(d, d+newMax, [&newRecord](long l, long r){return newRecord[l]<newRecord[r];});
max[col]+=newMax;
long** updateCount = (long**)malloc(max[col]*sizeof(long*));
double* updateRecord = (double*)malloc(max[col]*sizeof(double));
j = 0;
k = 0;
for(i=0; i<max[col]; i++){
if(i>=newMax){
updateCount[i] = count[col][i-newMax];
updateRecord[i] = record[col][i-newMax];
if(k==max[col]-newMax){
updateCount[i] = newCount[j];
updateRecord[i] = newRecord[j];
j++;
}
else if(j==newMax){
updateCount[i] = count[col][k];
updateRecord[i] = record[col][k];
k++;
}
else if(newRecord[j]>record[col][k]){
updateCount[i] = count[col][k];
updateRecord[i] = record[col][k];
k++;
}
else{
updateCount[i] = newCount[i];
updateRecord[i] = newRecord[i];
updateCount[i] = newCount[j];
updateRecord[i] = newRecord[j];
j++;
}
}
free(count[col]);
free(record[col]);
count[col]=updateCount;
record[col]=updateRecord;
}
for(i=newMax; i<size; i++){
free(newCount[i]);
free(d);
}
free(newCount);
//calculate gini
minEval ret;
if(evalue==Evaluation::gini){
ret = giniDenseIncremental(max[col], record[col], count[col], classes, newSize, T);
ret = giniDense(max[col], newSize, classes, count[col], d, record[col], T);
}else if(evalue==Evaluation::entropy or evalue==Evaluation::logLoss){
ret = entropyDenseIncremental(max[col], record[col], count[col], classes, newSize, T);
ret = entropyDense(max[col], newSize, classes, count[col], d, record[col], T);
}
ret.values = nullptr;
return ret;
@ -353,7 +419,6 @@ minEval DecisionTree::findMinGiniSparse(double** data, long* result, long* total
long* d = (long*)malloc(size*sizeof(long));
for(i=0; i<size; i++)d[i]=i;
std::sort(d, d+size, [&data, col](long l, long r){return data[l][col]<data[r][col];});
minEval ret;
if(evalue == Evaluation::gini){
ret = giniSparse(data, result, d, size, col, classes, totalT);
@ -378,7 +443,6 @@ minEval DecisionTree::findMinGiniDense(double** data, long* result, long* totalT
long low = 0;
long i, j, k, max=0;
long** count = (long**)malloc(size*sizeof(long*));
// size2 and count2 are after forget
double* record = (double*)malloc(size*sizeof(double));
bool find;
for(i=0;i<size;i++){
@ -402,20 +466,24 @@ minEval DecisionTree::findMinGiniDense(double** data, long* result, long* totalT
max++;
}
}
long d[max];
for(i=0;i<max;i++){
d[i] = i;
}
std::sort(d, d+max, [&record](long l, long r){return record[l]<record[r];});
long** rem = (long**)malloc(max*sizeof(long*));
double* record2 = (double*)malloc(max*sizeof(double));
for(i=0;i<max;i++){
rem[i] = count[i];
record2[i] = record[i];
rem[i] = count[d[i]];
record2[i] = record[d[i]];
}
free(count);
free(record);
long d[max];
for(i=0;i<max;i++){
d[i] = i;
}
std::sort(d, d+max, [&record2](long l, long r){return record2[l]<record2[r];});
minEval ret;
if(evalue == Evaluation::gini){
ret = giniDense(max, size, classes, rem, d, record2, totalT);
@ -429,23 +497,126 @@ minEval DecisionTree::findMinGiniDense(double** data, long* result, long* totalT
return ret;
}
double xxx;
void DecisionTree::fit(double** data, long* result, long size){
roundNo++;
double isUp = -1.0;
long localT = 0;
long localAll = 0;
if(DTree->size==0){
retain = size;
maxHeight = (long)log2((double)retain);
maxHeight = std::max(maxHeight, (long)1);
Update(data, result, size, DTree);
}else{
IncrementalUpdate(data, result, size, DTree);
if(forgetRate<=0){
for(long j=0; j<size; j++){
if(Test(data[j], DTree)==result[j])localT++;
localAll++;
}
double guessAcc;
guessAcc = 1.0/classes;
if(forgetRate==0.0){
double lastSm = (double)lastT/lastAll;
double localSm = (double)localT/localAll;
/*long guesses[classes], i;
for(i=0; i<classes; i++)guesses[i]=0;
for(i=0; i<DTree->size; i++){
guesses[DTree->resultRecord[i]]++;
}
for(i=0; i<size; i++){
guessAcc += (double)guesses[result[i]]/DTree->size/size;
}*/
if(localSm <= guessAcc){
//if(localSm <= 1.0/classes){
lastT = localT;
lastAll = localAll;
retain = size;
//increaseRate = 1.0-localSm;
}
else if(lastSm <= guessAcc){
//else if(lastSm <= 1.0/classes){
lastT = localT;
lastAll = localAll;
//forgetRate=-5.0;
retain += size;
//increaseRate -= localSm;
//increaseRate = initialIR;
//increaseRate -= localSm;
//increaseRate /= (double)localSm-1.0/classes;
}
else if(lastSm == localSm){
lastT += localT;
lastAll += localAll;
retain+=(long)round(increaseRate*size);
//increaseRate*=increaseRate;
//retain = (long)((double)retain*isUp+0.25*size);
}
else{
/*double lastSd = sqrt(pow((1.0-lastSm),2)*lastT+pow(lastSm,2)*(lastAll-lastT)/(lastAll-1));
double localSd = sqrt(pow((1.0-localSm),2)*localT+pow(localSm,2)*(localAll-localT)/(localAll-1));
double v = lastAll+localAll-2;
double sp = sqrt(((lastAll-1) * lastSd * lastSd + (localAll-1) * localSd * localSd) / v);
double q;
//double t=lastSm-localSm;
if(sp==0)q=1.0;
else if(lastAll+lastAll<2000){
q = abs(lastSm-localSm);
}
/*
if(Rebuild and called==10){
called = 0;
Rebuild = false;
}else if(Rebuild){
called = 11;
else{
double t = t/(sp*sqrt(1.0/lastAll+1.0/localAll));
boost::math::students_t dist(v);
double c = cdf(dist, t);
q = cdf(complement(dist, fabs(t)));
}*/
isUp = ((double)localSm-guessAcc)/((double)lastSm-guessAcc);
//isUp = ((double)localSm-1.0/classes)/((double)lastSm-1.0/classes);
increaseRate = increaseRate/isUp;
//increaseRate += increaseRate*factor;
if(isUp>=1.0)isUp=pow(isUp, 2);
else{
isUp=pow(isUp, 3-isUp);
}
retain = std::min((long)round(retain*isUp+increaseRate*size), retain+size);
//double factor = ((lastSm-localSm)/localSm)*abs((lastSm-localSm)/localSm)*increaseRate;
//retain += std::min((long)round(factor*retain+increaseRate*size), size);
lastT = localT;
lastAll = localAll;
}
//printf(" %f, %f, %f\n", increaseRate, localSm, lastSm);
}else{
called++;
long i;
retain = DTree->size+size;
/*double guessAcc=0.0;
long guesses[classes];
for(i=0; i<classes; i++)guesses[i]=0;
for(i=0; i<DTree->size; i++){
guesses[DTree->resultRecord[i]]++;
}
for(i=0; i<size; i++){
guessAcc += (double)guesses[result[i]]/DTree->size/size;
}*/
while(retain>=roundNo){
if((double)localT/localAll>guessAcc){
forgetRate+=5.0;
}
roundNo*=2;
}
if((double)localT/localAll<=guessAcc){
forgetRate=-10.0;
}
if(forgetRate>=0){
forgetRate=0.0;
}
lastT = localT;
lastAll = localAll;
}
}
//if(increaseRate>initialIR)increaseRate=initialIR;
//printf("%f\n", increaseRate);
if(retain<size)retain=size;
maxHeight = (long)log2((double)retain);
maxHeight = std::max(maxHeight, (long)1);
IncrementalUpdate(data, result, size, DTree);
}
}
long* DecisionTree::fitThenPredict(double** trainData, long* trainResult, long trainSize, double** testData, long testSize){
@ -461,12 +632,28 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT*
long i, j;
long low = 0;
long forgetSize=0;
if(retain>0 and current->size+size>retain) forgetSize = std::min(current->size+size - retain, current->size);
else if(retain<0) forgetSize = (long)current->size*forgetRate;
long* index = new long[current->size];
long* index;
bool forgetOld = false;
index = (long*)malloc(sizeof(long)*current->size);
if(current->size+size>retain and current->height==0) {
forgetSize = std::min(current->size+size - retain, current->size);
}
if(forgetSize==current->size){
Update(data, result, size, current);
return;
}
double** dataNew;
long* resultNew;
double*** forgottenData = (double***)malloc(feature*sizeof(double**));
long* forgottenClass = (long*)malloc(classes*sizeof(long));
for(i=0;i<classes;i++)forgottenClass[i]=0;
if(current->height == 0){
for(i=0; i<feature; i++){
forgottenData[i] = (double**)malloc(classes*sizeof(double*));
for(j=0; j<classes; j++){
forgottenData[i][j] = (double*)malloc(forgetSize*sizeof(double));
}
}
dataNew = (double**)malloc((size+current->size-forgetSize)*sizeof(double*));
resultNew = (long*)malloc((size+current->size-forgetSize)*sizeof(long));
for(i=0;i<size;i++){
@ -476,47 +663,110 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT*
for(i=0; i<current->size; i++){
index[i] = i;
}
std::shuffle(index, index+current->size, g);
if(isRF)std::shuffle(index, index+current->size, g);
long x = 0;
for(i=0;i<current->size;i++){
if(i>=current->size-forgetSize){
current->dataRecord[index[i]][feature-1] = DBL_MAX;
for(j=0; j<feature; j++){
forgottenData[j][current->resultRecord[index[i]]][forgottenClass[current->resultRecord[index[i]]]]=current->dataRecord[index[i]][j];
}
forgottenClass[current->resultRecord[index[i]]]++;
current->dataRecord[index[i]][feature] = DBL_MAX;
}else{
dataNew[i+size] = current->dataRecord[index[i]];
resultNew[i+size] = current->resultRecord[index[i]];
}
}
for(i=0; i<feature; i++){
for(j=0; j<classes; j++){
std::sort(forgottenData[i][j], forgottenData[i][j]+forgottenClass[j]);
}
}
}else{
forgetSize = 0;
dataNew = (double**)malloc((size+current->size)*sizeof(double*));
resultNew = (long*)malloc((size+current->size)*sizeof(long));
long xxx[current->size];
for(i=0;i<size;i++){
dataNew[i] = data[i];
resultNew[i] = result[i];
}
for(i=0;i<current->size;i++){
if(current->dataRecord[i][feature-1] == DBL_MAX){
if(current->dataRecord[i][feature] == DBL_MAX){
xxx[forgetSize]=i;
forgetSize++;
continue;
forgottenClass[current->resultRecord[i]]++;
}else{
dataNew[i+size-forgetSize] = current->dataRecord[i];
resultNew[i+size-forgetSize] = current->resultRecord[i];
}
}
if(forgetSize==current->size){
free(forgottenData);
free(forgottenClass);
if(size!=0){
free(dataNew);
free(resultNew);
Update(data, result, size, current);
}else{
// if a node have no new data and forget all old data, just keep old data
return;
}
return;
}
for(i=0; i<feature; i++){
forgottenData[i] = (double**)malloc(classes*sizeof(double*));
for(j=0; j<classes; j++){
forgottenData[i][j] = (double*)malloc(std::max(forgottenClass[j], (long)1)*sizeof(double));
}
}
long* k = (long*)malloc(sizeof(long)*classes);
for(i=0; i<classes; i++)k[i]=0;
for(i=0;i<forgetSize;i++){
long tmp = xxx[i];
for(j=0; j<feature; j++){
forgottenData[j][current->resultRecord[tmp]][k[current->resultRecord[tmp]]]=current->dataRecord[tmp][j];
}
k[current->resultRecord[tmp]]++;
}
free(k);
for(i=0; i<feature; i++){
for(j=0; j<classes; j++){
std::sort(forgottenData[i][j], forgottenData[i][j]+forgottenClass[j]);
}
}
}
free(data);
free(result);
current->size -= forgetSize;
current->size += size;
// end condition
if(current->terminate or roundNo%Rebuild==0){
if(current->terminate or current->height==maxHeight or current->size==1){
for(i=0;i<feature;i++){
for(j=0; j<classes; j++){
free(forgottenData[i][j]);
}
free(forgottenData[i]);
}
free(forgottenData);
free(forgottenClass);
if(current->height == 0){
for(i=0; i<forgetSize; i++){
free(current->dataRecord[index[current->size-size+i]]);
}
}
delete(index);
free(index);
Update(dataNew, resultNew, current->size, current);
return;
}else if(size==0){
for(i=0;i<feature;i++){
for(j=0; j<classes; j++){
free(forgottenData[i][j]);
}
free(forgottenData[i]);
}
free(forgottenData);
free(forgottenClass);
Update(dataNew, resultNew, current->size, current);
return;
}
@ -525,24 +775,47 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT*
long cFeature;
cMin.eval = DBL_MAX;
cMin.values = nullptr;
// TODO
long T[classes];
double HY=0;
for(i=0;i<classes;i++){
T[i] = 0;
}
for(i=0;i<size;i++){
j = resultNew[i];
T[j]++;
}
for(i=0;i<classes;i++){
if(evalue == Evaluation::entropy){
if(T[i]!=0)HY -= ((double)T[i]/size)*log2((double)T[i]/size);
}else{
HY += pow(((double)T[i]/size), 2);
}
}
for(i=0;i<maxFeature; i++){
if(Sparse[current->featureId[i]]==1){
c = incrementalMinGiniSparse(dataNew, resultNew, current->size+forgetSize, size, current, current->featureId[i], forgetSize, false);
long col = DTree->featureId[i];
if(Sparse[col]==1){
c = incrementalMinGiniSparse(dataNew, resultNew, current->size, size, current, col, forgetSize, forgottenData[col], forgottenClass);
}
else if(Sparse[current->featureId[i]]==0){
c = incrementalMinGiniDense(dataNew, resultNew, size, current->featureId[i], current->count, current->record, current->max, current->size+forgetSize, forgetSize, false);
else if(Sparse[col]==0){
c = incrementalMinGiniDense(dataNew, resultNew, size, col, current->count, current->record, current->max, current->size, forgetSize, forgottenData[col], forgottenClass);
}else{
//c = incrementalMinGiniCategorical();
}
if(c.eval<cMin.eval){
cMin.eval = c.eval;
cMin.value = c.value;
if(cMin.values != nullptr)free(cMin.values);
cMin.values = c.values;
cFeature = current->featureId[i];
}else if(c.values!=nullptr)free(c.values);
cFeature = col;
}
}
for(i=0;i<feature;i++){
for(j=0; j<classes; j++){
free(forgottenData[i][j]);
}
free(forgottenData[i]);
}
free(forgottenData);
free(forgottenClass);
if(cMin.eval==DBL_MAX){
current->terminate = true;
long t[classes];
@ -550,27 +823,23 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT*
t[i]=0;
}
for(i=low;i<low+size;i++){
t[result[i]]++;
t[resultNew[i]]++;
}
if(cMin.values!=nullptr)free(cMin.values);
current->result = std::distance(t, std::max_element(t, t+classes));
free(index);
free(current->dataRecord);
free(current->resultRecord);
current->dataRecord = dataNew;
current->resultRecord = resultNew;
return;
}
//diverse data
long ptL=0, ptR=0;
double* t;
long currentSize = current->size;
//TODO:Discrete
// Same diverse point as last time
if(current->dpoint==cMin.value and current->feature==cFeature){
long xxx = current->left->size;
/*for(i=0; i<size; i++){
if(dataNew[i][current->feature]<=current->dpoint){
ptL++;
}else{
ptR++;
}
}*/
ptL = size;
ptR = size;
long* resultL = (long*)malloc((ptL)*sizeof(long));
@ -598,7 +867,7 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT*
free(current->dataRecord[index[current->size-size+i]]);
}
}
delete(index);
free(index);
free(current->dataRecord);
free(current->resultRecord);
current->dataRecord = dataNew;
@ -636,21 +905,23 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT*
Update(dataL, resultL, ptL, current->left);
Update(dataR, resultR, ptR, current->right);
// TODO: free memeory
if(current->height == 0){
for(i=0; i<forgetSize; i++){
free(current->dataRecord[index[current->size-size+i]]);
}
}
delete(index);
free(index);
free(current->dataRecord);
free(current->resultRecord);
current->dataRecord = dataNew;
current->resultRecord = resultNew;
}
void DecisionTree::Update(double** data, long* result, long size, DT* current){
if(not current->created)createNode(current, current->height, feature, classes);
long low = 0;
long i, j;
double HY = 0;
// end condition
if(current->dataRecord!=nullptr)free(current->dataRecord);
current->dataRecord = data;
@ -663,7 +934,7 @@ void DecisionTree::Update(double** data, long* result, long size, DT* current){
for(i=0;i<classes;i++){
t[i]=0;
}
for(i=low;i<low+size;i++){
for(i=0;i<size;i++){
t[result[i]]++;
}
current->result = std::distance(t, std::max_element(t, t+classes));
@ -683,19 +954,25 @@ void DecisionTree::Update(double** data, long* result, long size, DT* current){
current->result = i;
return;
}
if(evalue == Evaluation::entropy){
if(T[i]!=0)HY -= ((double)T[i]/size)*log2((double)T[i]/size);
}else{
HY += pow(((double)T[i]/size), 2);
}
}
// find min Evaluation
minEval c, cMin;
long cFeature, oldMax, col, left=0;
cMin.eval = DBL_MAX;
cMin.values = nullptr;
//TODO
cFeature = -1;
//TODO: categorical
for(i=0;i<maxFeature; i++){
col = current->featureId[i];
if(Sparse[current->featureId[i]]==1){
col = DTree->featureId[i];
if(Sparse[col]==1){
c = findMinGiniSparse(data, result, T, size, col, current);
}
else if(Sparse[current->featureId[i]]==0){
else if(Sparse[col]==0){
c = findMinGiniDense(data, result, T, size, col);
if(current->count[col]!=nullptr){
for(j=0; j<current->max[col]; j++){
@ -715,32 +992,37 @@ void DecisionTree::Update(double** data, long* result, long size, DT* current){
if(cMin.values!=nullptr)free(cMin.values);
cMin.values = c.values;
cMin.value = c.value;
cFeature = current->featureId[i];
cFeature = col;
left = c.left;
}else if(c.values!=nullptr){
free(c.values);
}
}
if(cMin.eval == DBL_MAX){
current->terminate = true;
long max = 0;
long maxs[classes];
long count = 0;
for(i=1;i<classes;i++){
if(T[max]<T[i])max=i;
if(T[max]<T[i]){
max=i;
}
}
if(cMin.values!=nullptr)free(cMin.values);
current->result = max;
return;
}
//printf(" %f\n", HY-cMin.eval);
//diverse data
current->terminate = false;
current->feature = cFeature;
current->dpoint = cMin.value;
long ptL=0, ptR=0;
//TODO:Discrete
long* resultL = new long[left];
long* resultR = new long[size-left];
double** dataL = new double*[left];
double** dataR = new double*[size-left];
//TODO: categorical
long* resultL = new long[size];
long* resultR = new long[size];
double** dataL = new double*[size];
double** dataR = new double*[size];
for(i=low; i<low+size; i++){
if(data[i][current->feature]<=current->dpoint){
dataL[ptL] = data[i];
@ -757,14 +1039,12 @@ void DecisionTree::Update(double** data, long* result, long size, DT* current){
}
long DecisionTree::Test(double* data, DT* root){
    // Classify one sample: descend the tree until a terminal node (or the
    // height cap) is reached and return that node's predicted class label.
    // A duplicated, shadowed `if(root->terminate)` check (merge leftover)
    // was removed; the combined condition below already covers it.
    if(root->terminate or root->height == maxHeight)return root->result;
    // Left subtree holds samples with feature value <= the split point.
    if(data[root->feature]<=root->dpoint)return Test(data, root->left);
    return Test(data, root->right);
}
void DecisionTree::print(DT* root){
int x;
//std::cin>>x;
if(root->terminate){
printf("%ld", root->result);
return;

@ -1,38 +1,46 @@
#include "DecisionTree.h"
#include "aquery.h"
// __AQ_NO_SESSION__
#include "../server/table.h"
#include "aquery.h"
DecisionTree *dt = nullptr;
__AQEXPORT__(bool) newtree(int height, long f, ColRef<int> sparse, double forget, long maxf, long noclasses, Evaluation e, long r, long rb){
if(sparse.size!=f)return 0;
__AQEXPORT__(bool)
newtree(int height, long f, ColRef<int> sparse, double forget, long maxf, long noclasses, Evaluation e, long r, long rb)
{
if (sparse.size != f)
return false;
int *issparse = (int *)malloc(f * sizeof(int));
for(long i=0; i<f; i++){
for (long i = 0; i < f; i++)
issparse[i] = sparse.container[i];
}
if(maxf<0)maxf=f;
dt = new DecisionTree(height, f, issparse, forget, maxf, noclasses, e, r, rb);
return 1;
}
__AQEXPORT__(bool) fit(ColRef<ColRef<double>> X, ColRef<int> y){
if(X.size != y.size)return 0;
double** data = (double**)malloc(X.size*sizeof(double*));
long* result = (long*)malloc(y.size*sizeof(long));
for(long i=0; i<X.size; i++){
data[i] = X.container[i].container;
result[i] = y.container[i];
}
data[pt] = (double*)malloc(X.size*sizeof(double));
for(j=0; j<X.size; j++){
data[pt][j]=X.container[j];
}
result[pt] = y;
pt ++;
return 1;
if (maxf < 0)
maxf = f;
dt = new DecisionTree(f, issparse, forget, maxf, noclasses, e);
return true;
}
__AQEXPORT__(bool) fit(vector_type<vector_type<double>> v, vector_type<long> res){
// size_t pt = 0;
// __AQEXPORT__(bool) fit(ColRef<ColRef<double>> X, ColRef<int> y){
// if(X.size != y.size)return 0;
// double** data = (double**)malloc(X.size*sizeof(double*));
// long* result = (long*)malloc(y.size*sizeof(long));
// for(long i=0; i<X.size; i++){
// data[i] = X.container[i].container;
// result[i] = y.container[i];
// }
// data[pt] = (double*)malloc(X.size*sizeof(double));
// for(uint32_t j=0; j<X.size; j++){
// data[pt][j]=X.container[j];
// }
// result[pt] = y;
// pt ++;
// return 1;
// }
__AQEXPORT__(bool)
fit(vector_type<vector_type<double>> v, vector_type<long> res)
{
double **data = (double **)malloc(v.size * sizeof(double *));
for (int i = 0; i < v.size; ++i)
data[i] = v.container[i].container;
@ -40,24 +48,18 @@ __AQEXPORT__(bool) fit(vector_type<vector_type<double>> v, vector_type<long> res
return true;
}
// Run the trained decision tree on every row of `v` and return the
// predicted class labels wrapped in the engine's C storage struct.
// (A merge artifact had left the old braced `for(...){` loop header
// interleaved with the new brace-less one, creating an accidental
// nested loop; this restores the intended single loop.)
__AQEXPORT__(vectortype_cstorage)
predict(vector_type<vector_type<double>> v)
{
    // One prediction per input row.
    int *result = (int *)malloc(v.size * sizeof(int));
    for (long i = 0; i < v.size; i++)
        result[i] = dt->Test(v.container[i].container, dt->DTree);
    // Heap-allocate the wrapper: ownership of both the wrapper and the
    // result buffer transfers to the caller through the returned storage.
    auto container = (vector_type<int> *)malloc(sizeof(vector_type<int>));
    container->size = v.size;
    container->capacity = 0;
    container->container = result;
    auto ret = vectortype_cstorage{.container = container, .size = 1, .capacity = 0};
    return ret;
}

@ -31,12 +31,14 @@ void print<__int128_t>(const __int128_t& v, const char* delimiter){
s[40] = 0;
std::cout<< get_int128str(v, s+40)<< delimiter;
}
// Specialization of print for unsigned 128-bit integers: formats the value
// right-aligned into a stack buffer via get_uint128str and writes it to
// stdout followed by `delimiter`.
template <>
void print<__uint128_t>(const __uint128_t& v, const char* delimiter){
    char digits[41];          // 39 decimal digits max for 2^128-1, plus slack
    digits[40] = 0;           // NUL terminator; formatting fills backwards
    std::cout << get_uint128str(v, digits + 40) << delimiter;
}
std::ostream& operator<<(std::ostream& os, __int128 & v)
{
print(v);
@ -76,6 +78,7 @@ char* intToString(T val, char* buf){
return buf;
}
// Advance `buf` past any leading non-digit characters, leaving it on the
// first decimal digit or on the terminating NUL if none is found.
void skip(const char*& buf){
    for (; *buf; ++buf)
        if (*buf >= '0' && *buf <= '9')
            break;
}
@ -264,7 +267,7 @@ std::string base62uuid(int l) {
static uniform_int_distribution<uint64_t> u(0x10000, 0xfffff);
uint64_t uuid = (u(engine) << 32ull) +
(std::chrono::system_clock::now().time_since_epoch().count() & 0xffffffff);
//printf("%llu\n", uuid);
string ret;
while (uuid && l-- >= 0) {
ret = string("") + base62alp[uuid % 62] + ret;
@ -525,8 +528,8 @@ void ScratchSpace::cleanup(){
static_cast<vector_type<void*>*>(temp_memory_fractions);
if (vec_tmpmem_fractions->size) {
for(auto& mem : *vec_tmpmem_fractions){
free(mem);
//GC::gc_handle->reg(mem);
//free(mem);
GC::gc_handle->reg(mem);
}
vec_tmpmem_fractions->clear();
}

@ -73,16 +73,17 @@ struct StoredProcedure{
void **__rt_loaded_modules;
};
struct Context {
typedef int (*printf_type) (const char *format, ...);
void* module_function_maps = 0;
void* module_function_maps = nullptr;
Config* cfg;
int n_buffers, *sz_bufs;
void **buffers;
void* alt_server = 0;
void* alt_server = nullptr;
Log_level log_level = LOG_INFO;
Session current;
@ -115,6 +116,12 @@ struct Context{
};
struct StoredProcedurePayload {
StoredProcedure *p;
Context* cxt;
};
int execTriggerPayload(void*);
#ifdef _WIN32
#define __DLLEXPORT__ __declspec(dllexport) __stdcall
@ -169,4 +176,79 @@ inline void AQ_ZeroMemory(_This_Struct& __val) {
memset(&__val, 0, sizeof(_This_Struct));
}
#ifdef __USE_STD_SEMAPHORE__
#include <semaphore>
class A_Semaphore {
private:
std::binary_semaphore native_handle;
public:
A_Semaphore(bool v = false) {
native_handle = std::binary_semaphore(v);
}
void acquire() {
native_handle.acquire();
}
void release() {
native_handle.release();
}
~A_Semaphore() { }
};
#else
#ifdef _WIN32
// Win32 variant: native_handle stores the OS handle as void* to avoid
// pulling <windows.h> into this header. Member functions are declared
// here and defined out-of-line in a platform helper translation unit
// (presumably winhelper -- confirm against the build).
class A_Semaphore {
private:
void* native_handle;
public:
A_Semaphore(bool);
void acquire();
void release();
~A_Semaphore();
};
#else
#ifdef __APPLE__
#include <dispatch/dispatch.h>
// macOS variant built on Grand Central Dispatch semaphores.
// `v` is the initial permit count (true = one permit available).
// NOTE(review): the destructor never calls dispatch_release(), so the
// dispatch object is never freed -- tolerable if instances live for the
// whole process (as the global prompt/engine semaphores appear to), but
// confirm before using this for short-lived semaphores.
class A_Semaphore {
private:
dispatch_semaphore_t native_handle;
public:
explicit A_Semaphore(bool v = false) {
native_handle = dispatch_semaphore_create(v);
}
void acquire() {
// puts("acquire");
dispatch_semaphore_wait(native_handle, DISPATCH_TIME_FOREVER);
}
void release() {
// puts("release");
dispatch_semaphore_signal(native_handle);
}
~A_Semaphore() {
}
};
#else
#include <semaphore.h>
// POSIX variant built on an unnamed sem_t.
// `v` is the initial state: true = released (count 1), false = empty.
class A_Semaphore {
private:
	sem_t native_handle;
public:
	A_Semaphore(bool v = false) {
		// sem_init(sem, pshared, value): pshared = 0 (process-local
		// semaphore), initial count = v. The original call had the
		// last two arguments swapped, which made every semaphore
		// start at count 1 regardless of `v`.
		sem_init(&native_handle, 0, v ? 1 : 0);
	}
	void acquire() {
		sem_wait(&native_handle);
	}
	void release() {
		sem_post(&native_handle);
	}
	~A_Semaphore() {
		sem_destroy(&native_handle);
	}
};
#endif // __APPLE__
#endif // _WIN32
#endif //__USE_STD_SEMAPHORE__
void print_monetdb_results(void* _srv, const char* sep, const char* end, uint32_t limit);
#endif

@ -5,9 +5,19 @@
#include <string>
#include "monetdb_conn.h"
#include "monetdbe.h"
#include "table.h"
#include <thread>
#ifdef _WIN32
#include "winhelper.h"
#else
#include <dlfcn.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <atomic>
#endif // _WIN32
#undef ERROR
#undef static_assert
@ -215,3 +225,46 @@ bool Server::havehge() {
return false;
#endif
}
// Replay a stored procedure's command stream against the embedded server
// held in cxt->alt_server. Each entry in p->queries is dispatched on its
// first byte:
//   'Q' -- SQL text, executed directly on the server
//   'P' -- name of a post-processing entry point; resolved with dlsym in
//          the most recently selected module and invoked with cxt
//   'N' -- advance to the next pre-loaded runtime module
//   'O' -- print up to `limit` rows of the current result set; the 4-byte
//          limit follows the tag, and 0 suppresses output entirely
// Unknown tags produce a warning but do not abort the stream.
// NOTE(review): a 'P' command seen before any 'N' calls dlsym on a null
// handle (which searches the main program on POSIX) -- confirm intended.
void ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){
auto server = static_cast<Server*>(cxt->alt_server);
void* handle = nullptr;
uint32_t procedure_module_cursor = 0;
for(uint32_t i = 0; i < p->cnt; ++i) {
switch(p->queries[i][0]){
case 'Q': {
server->exec(p->queries[i]);
}
break;
case 'P': {
// Skip the tag byte; the remainder is the symbol name.
auto c = code_snippet(dlsym(handle, p->queries[i]+1));
c(cxt);
}
break;
case 'N': {
if(procedure_module_cursor < p->postproc_modules)
handle = p->__rt_loaded_modules[procedure_module_cursor++];
}
break;
case 'O': {
uint32_t limit;
// memcpy avoids an unaligned read of the embedded limit.
memcpy(&limit, p->queries[i] + 1, sizeof(uint32_t));
if (limit == 0)
continue;
print_monetdb_results(server, " ", "\n", limit);
}
break;
default:
printf("Warning Q%u: unrecognized command %c.\n",
i, p->queries[i][0]);
}
}
}
int execTriggerPayload(void* args) {
auto spp = (StoredProcedurePayload*)(args);
ExecuteStoredProcedureEx(spp->p, spp->cxt);
delete spp;
return 0;
}

@ -24,7 +24,7 @@ struct Server{
static bool havehge();
void test(const char*);
void print_results(const char* sep = " ", const char* end = "\n");
friend void print_monetdb_results(Server* srv, const char* sep, const char* end, int limit);
friend void print_monetdb_results(void* _srv, const char* sep, const char* end, int limit);
~Server();
};

@ -0,0 +1,93 @@
#include "monetdbe.h"
#include <stdint.h>
#include "mal_client.h"
#include "sql_mvc.h"
#include "sql_semantic.h"
#include "mal_exception.h"
/* Hand-copied mirror of MonetDB's internal column_storage struct so this
 * extension can be compiled without MonetDB's private headers.
 * NOTE(review): field order, types and sizes must stay byte-identical to
 * the MonetDB build being linked against -- re-verify on every upgrade. */
typedef struct column_storage {
int refcnt;
int bid;
int ebid; /* extra bid */
int uibid; /* bat with positions of updates */
int uvbid; /* bat with values of updates */
storage_type st; /* ST_DEFAULT, ST_DICT, ST_FOR */
bool cleared;
bool merged; /* only merge changes once */
size_t ucnt; /* number of updates */
ulng ts; /* version timestamp */
} column_storage;
/* Hand-copied mirrors of MonetDB's internal segment/segments/storage
 * structs (private storage layer). Only needed so `struct storage` has
 * the right size/layout when reached through column_storage.
 * NOTE(review): must stay byte-identical to the linked MonetDB build. */
typedef struct segment {
BUN start;
BUN end;
bool deleted; /* we need to keep a dense segment set, 0 - end of last segemnt,
some segments maybe deleted */
ulng ts; /* timestamp on this segment, ie tid of some active transaction or commit time of append/delete or
rollback time, ie ready for reuse */
ulng oldts; /* keep previous ts, for rollbacks */
struct segment *next; /* usualy one should be enough */
struct segment *prev; /* used in destruction list */
} segment;
/* container structure to allow sharing this structure */
typedef struct segments {
sql_ref r;
struct segment *h;
struct segment *t;
} segments;
typedef struct storage {
column_storage cs; /* storage on disk */
segments *segs; /* local used segements */
struct storage *next;
} storage;
/* Prefix mirror of MonetDB's internal SQL `backend` struct: only the
 * fields up to and including `mvc` are declared (the only one read here
 * is `mvc`); the flexible array member stands in for all trailing fields.
 * NOTE(review): the leading field order must match the linked MonetDB's
 * definition exactly -- re-verify on upgrades. */
typedef struct {
char language; /* 'S' or 's' or 'X' */
char depth; /* depth >= 1 means no output for trans/schema statements */
int remote; /* counter to make remote function names unique */
mvc *mvc;
char others[];
} backend;
/* Mirror of monetdbe's internal database handle; a monetdbe_database is
 * really a pointer to this. Only the leading Client pointer is accessed
 * here (to reach c->sqlcontext).
 * NOTE(review): layout must match the monetdbe.c of the linked MonetDB
 * version. */
typedef struct {
Client c;
char *msg;
monetdbe_data_blob blob_null;
monetdbe_data_date date_null;
monetdbe_data_time time_null;
monetdbe_data_timestamp timestamp_null;
str mid;
} monetdbe_database_internal;
/* Return the row count of (schema_name, table_name) by asking the storage
 * API for the count of the table's first column. A read transaction is
 * opened and rolled back around the lookup. Returns 0 when the table
 * cannot be resolved (the original dereferenced a NULL sql_table in that
 * case). An unused local (`hdl`) was also removed. */
size_t
monetdbe_get_size(monetdbe_database dbhdl, const char* schema_name, const char *table_name)
{
	backend* be = (backend *)(((monetdbe_database_internal*)dbhdl)->c->sqlcontext);
	mvc *m = be->mvc;
	size_t sz = 0;
	mvc_trans(m);
	sql_table *t = find_table_or_view_on_scope(m, NULL, schema_name, table_name, "CATALOG", false);
	if (t) {
		sql_column *col = ol_first_node(t->columns)->data;
		sqlstore* store = m->store;
		sz = store->storage_api.count_col(m->session->tr, col, QUICK);
	}
	/* Roll the session back in every path so the transaction never leaks. */
	mvc_cancel_session(m);
	return sz;
}
/* Return a pointer to the raw storage of column `col_id` of
 * (schema_name, table_name), or NULL when the table, column, or BAT
 * cannot be resolved (the original dereferenced NULL in those cases).
 * An unused local (`hdl`) was also removed.
 * NOTE(review): the returned pointer refers to BAT storage obtained from
 * bat_iterator() and is handed out after the session is cancelled --
 * confirm the storage remains valid for the caller's lifetime and
 * whether a matching bat_iterator_end()/BBP release is required. */
void*
monetdbe_get_col(monetdbe_database dbhdl, const char* schema_name, const char *table_name, uint32_t col_id) {
	backend* be = (backend *)(((monetdbe_database_internal*)dbhdl)->c->sqlcontext);
	mvc *m = be->mvc;
	void *data = NULL;
	mvc_trans(m);
	sql_table *t = find_table_or_view_on_scope(m, NULL, schema_name, table_name, "CATALOG", false);
	if (t) {
		sql_column *col = ol_fetch(t->columns, col_id);
		if (col) {
			sqlstore* store = m->store;
			BAT *b = store->storage_api.bind_col(m->session->tr, col, RDONLY);
			if (b) {
				BATiter iter = bat_iterator(b);
				data = iter.base;
			}
		}
	}
	/* Roll the session back in every path so the transaction never leaks. */
	mvc_cancel_session(m);
	return data;
}

@ -20,10 +20,6 @@
#include <sys/mman.h>
#include <atomic>
// fast numeric to string conversion
#include "jeaiii_to_text.h"
#include "dragonbox/dragonbox_to_chars.h"
struct SharedMemory
{
std::atomic<bool> a;
@ -41,69 +37,7 @@ struct SharedMemory
}
};
#ifndef __USE_STD_SEMAPHORE__
#ifdef __APPLE__
#include <dispatch/dispatch.h>
class A_Semaphore {
private:
dispatch_semaphore_t native_handle;
public:
A_Semaphore(bool v = false) {
native_handle = dispatch_semaphore_create(v);
}
void acquire() {
// puts("acquire");
dispatch_semaphore_wait(native_handle, DISPATCH_TIME_FOREVER);
}
void release() {
// puts("release");
dispatch_semaphore_signal(native_handle);
}
~A_Semaphore() {
}
};
#else
#include <semaphore.h>
class A_Semaphore {
private:
sem_t native_handle;
public:
A_Semaphore(bool v = false) {
sem_init(&native_handle, v, 1);
}
void acquire() {
sem_wait(&native_handle);
}
void release() {
sem_post(&native_handle);
}
~A_Semaphore() {
sem_destroy(&native_handle);
}
};
#endif
#endif
#endif
#ifdef __USE_STD_SEMAPHORE__
#define __AQUERY_ITC_USE_SEMPH__
#include <semaphore>
class A_Semaphore {
private:
std::binary_semaphore native_handle;
public:
A_Semaphore(bool v = false) {
native_handle = std::binary_semaphore(v);
}
void acquire() {
native_handle.acquire();
}
void release() {
native_handle.release();
}
~A_Semaphore() { }
};
#endif
#endif // _WIN32
#ifdef __AQUERY_ITC_USE_SEMPH__
A_Semaphore prompt{ true }, engine{ false };
@ -225,8 +159,9 @@ inline constexpr static unsigned char monetdbe_type_szs[] = {
1
};
constexpr uint32_t output_buffer_size = 65536;
void print_monetdb_results(Server* srv, const char* sep = " ", const char* end = "\n",
void print_monetdb_results(void* _srv, const char* sep = " ", const char* end = "\n",
uint32_t limit = std::numeric_limits<uint32_t>::max()) {
auto srv = static_cast<Server *>(_srv);
if (!srv->haserror() && srv->cnt && limit) {
char buffer[output_buffer_size];
auto _res = static_cast<monetdbe_result*> (srv->res);
@ -255,7 +190,7 @@ void print_monetdb_results(Server* srv, const char* sep = " ", const char* end =
puts("Error: separator or end string too long");
goto cleanup;
}
if (header_string.size() - l_sep - 1>= 0)
if (header_string.size() >= l_sep + 1)
header_string.resize(header_string.size() - l_sep - 1);
header_string += end + std::string(header_string.size(), '=') + end;
fputs(header_string.c_str(), stdout);

@ -4,21 +4,26 @@
#include <thread>
#include <cstdio>
#include <cstdlib>
using namespace std;
FILE *fp;
long long testing_throughput(uint32_t n_jobs, bool prompt = true){
using namespace std::chrono_literals;
printf("Threadpool througput test with %u jobs. Press any key to start.\n", n_jobs);
auto tp = ThreadPool(thread::hardware_concurrency());
auto tp = ThreadPool(std::thread::hardware_concurrency());
getchar();
auto i = 0u;
fp = fopen("tmp.tmp", "wb");
auto time = chrono::high_resolution_clock::now();
while(i++ < n_jobs) tp.enqueue_task({ [](void* f) {fprintf(fp, "%d ", *(int*)f); free(f); }, new int(i) });
auto time = std::chrono::high_resolution_clock::now();
while(i++ < n_jobs) {
payload_t payload;
payload.f = [](void* f) {fprintf(fp, "%d ", *(int*)f); free(f); return 0; };
payload.args = new int(i);
tp.enqueue_task(payload);
}
puts("done dispatching.");
while (tp.busy()) this_thread::sleep_for(1s);
auto t = (chrono::high_resolution_clock::now() - time).count();
while (tp.busy()) std::this_thread::sleep_for(1s);
auto t = (std::chrono::high_resolution_clock::now() - time).count();
printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n", i, t, i/(double)(t));
//this_thread::sleep_for(2s);
fclose(fp);
@ -27,26 +32,31 @@ long long testing_throughput(uint32_t n_jobs, bool prompt = true){
long long testing_transaction(uint32_t n_burst, uint32_t n_batch,
uint32_t base_time, uint32_t var_time, bool prompt = true, FILE* _fp = stdout){
using namespace std::chrono_literals;
printf("Threadpool transaction test: burst: %u, batch: %u, time: [%u, %u].\n"
, n_burst, n_batch, base_time, var_time + base_time);
if (prompt) {
puts("Press any key to start.");
getchar();
}
auto tp = ThreadPool(thread::hardware_concurrency());
auto tp = ThreadPool(std::thread::hardware_concurrency());
fp = _fp;
auto i = 0u, j = 0u;
auto time = chrono::high_resolution_clock::now();
auto time = std::chrono::high_resolution_clock::now();
while(j++ < n_batch){
i = 0u;
while(i++ < n_burst)
tp.enqueue_task({ [](void* f) { fprintf(fp, "%d ", *(int*)f); free(f); }, new int(j) });
while(i++ < n_burst) {
payload_t payload;
payload.f = [](void* f) { fprintf(fp, "%d ", *(int*)f); free(f); return 0; };
payload.args = new int(j);
tp.enqueue_task(payload);
}
fflush(stdout);
this_thread::sleep_for(chrono::microseconds(rand()%var_time + base_time));
std::this_thread::sleep_for(std::chrono::microseconds(rand()%var_time + base_time));
}
puts("done dispatching.");
while (tp.busy()) this_thread::sleep_for(1s);
auto t = (chrono::high_resolution_clock::now() - time).count();
while (tp.busy()) std::this_thread::sleep_for(1s);
auto t = (std::chrono::high_resolution_clock::now() - time).count();
printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n", j*i, t, j*i/(double)(t));
return t;
@ -58,17 +68,17 @@ long long testing_destruction(bool prompt = true){
puts("Press any key to start.");
getchar();
}
auto time = chrono::high_resolution_clock::now();
auto time = std::chrono::high_resolution_clock::now();
for(int i = 0; i < 8; ++i)
testing_transaction(0xfff, 0xff, 400, 100, false, fp);
for(int i = 0; i < 64; ++i)
testing_transaction(0xff, 0xf, 60, 20, false, fp);
for(int i = 0; i < 1024; ++i) {
auto tp = new ThreadPool(256);
auto tp = new ThreadPool(255);
delete tp;
}
return 0;
auto t = (chrono::high_resolution_clock::now() - time).count();
auto t = (std::chrono::high_resolution_clock::now() - time).count();
fclose(fp);
return t;
}

@ -1,4 +1,5 @@
#include "threading.h"
#include "libaquery.h"
#include <thread>
#include <atomic>
#include <mutex>
@ -105,7 +106,8 @@ ThreadPool::ThreadPool(uint32_t n_threads)
current_payload = new payload_t[n_threads];
for (uint32_t i = 0; i < n_threads; ++i){
atomic_init(tf + i, static_cast<unsigned char>(0b10));
// atomic_init(tf + i, static_cast<unsigned char>(0b10));
tf[i] = static_cast<unsigned char>(0b10);
th[i] = thread(&ThreadPool::daemon_proc, this, i);
}
@ -152,21 +154,50 @@ bool ThreadPool::busy(){
return true;
}
Trigger::Trigger(ThreadPool* tp){
// Construct a host for interval-based triggers bound to thread pool `tp`.
// Allocates the (type-erased) trigger list and its guarding mutex, and
// records the current clock reading as the baseline for the first tick().
// NOTE(review): `triggers` and `adding_trigger` are raw-`new`'d and no
// matching delete is visible in this view — confirm cleanup in the dtor.
IntervalBasedTriggerHost::IntervalBasedTriggerHost(ThreadPool* tp){
this->tp = tp;
// Stored as void* in TriggerHost; tick()/add_trigger() cast it back.
this->triggers = new vector_type<IntervalBasedTrigger>;
adding_trigger = new mutex();
// Baseline timestamp; tick() computes elapsed time relative to this.
this->now = std::chrono::high_resolution_clock::now().time_since_epoch().count();
}
void IntervalBasedTrigger::timer::reset(){
time_remaining = interval;
// Register stored procedure `p` to run every `interval` milliseconds.
// time_remaining starts at 0, so the trigger fires on the next tick().
void IntervalBasedTriggerHost::add_trigger(StoredProcedure *p, uint32_t interval) {
    IntervalBasedTrigger trig;
    trig.interval = interval;
    trig.time_remaining = 0;
    trig.sp = p;
    auto *trigger_list = static_cast<vector_type<IntervalBasedTrigger> *>(this->triggers);
    // Serialize with tick(), which iterates the list concurrently.
    adding_trigger->lock();
    trigger_list->emplace_back(trig);
    adding_trigger->unlock();
}
bool IntervalBasedTrigger::timer::tick(uint32_t t){
if (time_remaining > t) {
time_remaining -= t;
return false;
// One timer step: measure wall-clock time elapsed since the previous call
// and enqueue every trigger whose countdown has run out onto the pool.
// Fix: the body contained stray lines from the superseded
// IntervalBasedTrigger::timer::tick (an `else` branch referencing
// undeclared `time_remaining`/`interval` and a `return true;`), which
// unbalanced the braces; they are removed here.
void IntervalBasedTriggerHost::tick() {
    const auto current_time = std::chrono::high_resolution_clock::now().time_since_epoch().count();
    const auto delta_t = static_cast<uint32_t>((current_time - now) / 1000000); // milliseconds precision
    now = current_time;
    auto vt_triggers = static_cast<vector_type<IntervalBasedTrigger> *>(this->triggers);
    // Hold the lock across the scan so add_trigger() cannot mutate the list
    // (and invalidate iteration) while we walk it.
    adding_trigger->lock();
    for (auto& t : *vt_triggers) {
        if (t.tick(delta_t)) {
            // Trigger is due: hand its stored procedure to the thread pool.
            payload_t payload;
            payload.f = execTriggerPayload;
            // Heap-allocated; presumably freed by execTriggerPayload — TODO confirm.
            payload.args = static_cast<void*>(new StoredProcedurePayload {t.sp, cxt});
            tp->enqueue_task(payload);
        }
    }
    adding_trigger->unlock();
}
// Re-arm the countdown: the trigger next fires one full interval from now.
void IntervalBasedTrigger::reset() {
time_remaining = interval;
}
// Advance this trigger's countdown by `delta_t` milliseconds.
// Returns true when the trigger is due to fire within this step; at most
// one firing is reported per call. The countdown is re-armed using
// delta_t modulo `interval`, keeping the cadence phase-aligned even when
// a single step spans more than one interval.
bool IntervalBasedTrigger::tick(uint32_t delta_t) {
    const bool due = time_remaining <= delta_t;
    const uint32_t wrapped = delta_t % interval;
    if (time_remaining <= wrapped)
        time_remaining += interval - wrapped;  // crossed the boundary: wrap forward
    else
        time_remaining -= wrapped;
    return due;
}

@ -19,7 +19,7 @@ struct payload_t{
class ThreadPool{
public:
ThreadPool(uint32_t n_threads = 0);
explicit ThreadPool(uint32_t n_threads = 0);
void enqueue_task(const payload_t& payload);
bool busy();
virtual ~ThreadPool();
@ -39,29 +39,44 @@ private:
};
class Trigger{
private:
void* triggers; //min-heap by t-rem
virtual void tick() = 0;
#include <thread>
#include <mutex>
// Fix: forward declaration was misspelled "A_Semphore" (the class is
// declared as A_Semaphore everywhere else in this codebase).
class A_Semaphore;
// Abstract base for trigger hosts. Owns a type-erased trigger list plus the
// worker state shared by all trigger flavors; subclasses implement tick().
// Fix: removed stray leftover declaration `Trigger(ThreadPool* tp);` — a
// constructor of the pre-rename class, ill-formed inside TriggerHost.
class TriggerHost {
protected:
    void* triggers;              // type-erased container; concrete type chosen by subclass
    std::thread* handle;         // worker thread driving tick() — TODO confirm ownership
    ThreadPool *tp;              // pool that executes fired triggers
    Context* cxt;                // engine context handed to trigger payloads
    std::mutex* adding_trigger;  // guards `triggers` against concurrent add/tick
    virtual void tick() = 0;     // advance timers and dispatch due triggers
public:
    TriggerHost() = default;
    virtual ~TriggerHost() = default;  // polymorphic base: virtual dtor required
};
class IntervalBasedTrigger : public Trigger{
public:
struct timer{
struct StoredProcedure;
// A periodic trigger: runs stored procedure `sp` each time its countdown,
// driven by tick(), reaches zero.
struct IntervalBasedTrigger {
uint32_t interval; // in milliseconds
// Milliseconds until the next firing; registered at 0 so the first tick fires.
uint32_t time_remaining;
// Procedure to execute when the trigger fires (owned elsewhere).
StoredProcedure* sp;
// Re-arm the countdown to a full interval.
void reset();
// Advance the countdown by t milliseconds; returns true when due to fire.
bool tick(uint32_t t);
};
void add_trigger();
// Hosts interval-based triggers: each tick() measures the wall-clock time
// elapsed since the previous call and enqueues every due trigger's stored
// procedure on the thread pool.
class IntervalBasedTriggerHost : public TriggerHost {
public:
explicit IntervalBasedTriggerHost(ThreadPool *tp);
// Register `stored_procedure` to run every `interval` milliseconds.
void add_trigger(StoredProcedure* stored_procedure, uint32_t interval);
private:
// high_resolution_clock reading at the last tick (raw ticks; converted to
// milliseconds in tick()).
unsigned long long now;
void tick() override;
};
class CallbackBasedTrigger : public Trigger{
class CallbackBasedTriggerHost : public TriggerHost {
public:
void add_trigger();
private:

@ -16,18 +16,6 @@ struct SharedMemory
void FreeMemoryMap();
};
#ifndef __USE_STD_SEMAPHORE__
class A_Semaphore {
private:
void* native_handle;
public:
A_Semaphore(bool);
void acquire();
void release();
~A_Semaphore();
};
#endif
#endif // WIN32
#endif // WINHELPER

Loading…
Cancel
Save