diff --git a/.gitignore b/.gitignore index 739eae5..72774f3 100644 --- a/.gitignore +++ b/.gitignore @@ -59,6 +59,10 @@ data/benchmark !nyctx100.csv !network.csv !test_complex.csv +data/electricity* +data/covtype* +data/phishing* +data/power* *.out *.asm !mmw.so diff --git a/Makefile b/Makefile index 4240bf6..98987fa 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,7 @@ OS_SUPPORT = MonetDB_LIB = MonetDB_INC = Defines = +CC = $(CXX) -xc CXXFLAGS = --std=c++2a ifeq ($(AQ_DEBUG), 1) OPTFLAGS = -g3 #-fsanitize=address @@ -17,7 +18,7 @@ COMPILER = $(strip $(_COMPILER)) LIBTOOL = ar rcs USELIB_FLAG = -Wl,--whole-archive,libaquery.a -Wl,-no-whole-archive LIBAQ_SRC = server/monetdb_conn.cpp server/libaquery.cpp -LIBAQ_OBJ = monetdb_conn.o libaquery.o +LIBAQ_OBJ = monetdb_conn.o libaquery.o monetdb_ext.o SEMANTIC_INTERPOSITION = -fno-semantic-interposition RANLIB = ranlib _LINKER_BINARY = $(shell `$(CXX) -print-prog-name=ld` -v 2>&1 | grep -q LLVM && echo lld || echo ld) @@ -43,7 +44,7 @@ else LIBTOOL = gcc-ar rcs endif endif -OPTFLAGS += $(SEMANTIC_INTERPOSITION) +LINKFLAGS += $(SEMANTIC_INTERPOSITION) ifeq ($(PCH), 1) PCHFLAGS = -include server/pch.hpp @@ -82,7 +83,7 @@ else MonetDB_INC += $(AQ_MONETDB_INC) MonetDB_INC += -I/usr/local/include/monetdb -I/usr/include/monetdb endif - MonetDB_LIB += -lmonetdbe + MonetDB_LIB += -lmonetdbe -lmonetdbsql -lbat endif ifeq ($(THREADING),1) @@ -128,6 +129,7 @@ pch: $(CXX) -x c++-header server/pch.hpp $(FPIC) $(CXXFLAGS) libaquery: $(CXX) -c $(FPIC) $(PCHFLAGS) $(LIBAQ_SRC) $(OS_SUPPORT) $(CXXFLAGS) &&\ + $(CC) -c server/monetdb_ext.c $(OPTFLAGS) $(MonetDB_INC) &&\ $(LIBTOOL) libaquery.a $(LIBAQ_OBJ) &&\ $(RANLIB) libaquery.a diff --git a/aquery_config.py b/aquery_config.py index df2511a..41f69c0 100644 --- a/aquery_config.py +++ b/aquery_config.py @@ -2,7 +2,7 @@ ## GLOBAL CONFIGURATION FLAGS -version_string = '0.6.0a' +version_string = '0.7.0a' add_path_to_ldpath = True rebuild_backend = False run_backend = True diff --git a/aquery_parser/__init__.py b/aquery_parser/__init__.py index ea4401c..61fd967 100644 --- a/aquery_parser/__init__.py +++ b/aquery_parser/__init__.py @@ -5,20 +5,18 @@ # You can obtain one at http://mozilla.org/MPL/2.0/. # # Contact: Kyle Lahnakoski (kyle@lahnakoski.com) -# +# Bill Sun 2022 - 2023 from __future__ import absolute_import, division, unicode_literals import json from threading import Lock -from aquery_parser.sql_parser import scrub +from aquery_parser.parser import scrub from aquery_parser.utils import ansi_string, simple_op, normal_op - +import aquery_parser.parser parse_locker = Lock() # ENSURE ONLY ONE PARSING AT A TIME common_parser = None -mysql_parser = None -sqlserver_parser = None SQL_NULL = {"null": {}} @@ -33,44 +31,10 @@ def parse(sql, null=SQL_NULL, calls=simple_op): with parse_locker: if not common_parser: - common_parser = sql_parser.common_parser() + common_parser = aquery_parser.parser.common_parser() result = _parse(common_parser, sql, null, calls) return result - -def parse_mysql(sql, null=SQL_NULL, calls=simple_op): - """ - PARSE MySQL ASSUME DOUBLE QUOTED STRINGS ARE LITERALS - :param sql: String of SQL - :param null: What value to use as NULL (default is the null function `{"null":{}}`) - :return: parse tree - """ - global mysql_parser - - with parse_locker: - if not mysql_parser: - mysql_parser = sql_parser.mysql_parser() - return _parse(mysql_parser, sql, null, calls) - - -def parse_sqlserver(sql, null=SQL_NULL, calls=simple_op): - """ - PARSE MySQL ASSUME DOUBLE QUOTED STRINGS ARE LITERALS - :param sql: String of SQL - :param null: What value to use as NULL (default is the null function `{"null":{}}`) - :return: parse tree - """ - global sqlserver_parser - - with parse_locker: - if not sqlserver_parser: - sqlserver_parser = sql_parser.sqlserver_parser() - return _parse(sqlserver_parser, sql, null, calls) - - -parse_bigquery = parse_mysql - - def _parse(parser, sql, null, calls): utils.null_locations = [] utils.scrub_op = calls @@ -85,4 +49,4 @@ def _parse(parser, sql, null, calls): _ = json.dumps -__all__ = ["parse", "format", "parse_mysql", "parse_bigquery", "normal_op", "simple_op"] +__all__ = ["parse", "format", "normal_op", "simple_op"] diff --git a/aquery_parser/keywords.py b/aquery_parser/keywords.py index b9da28a..19748c7 100644 --- a/aquery_parser/keywords.py +++ b/aquery_parser/keywords.py @@ -5,7 +5,7 @@ # You can obtain one at http://mozilla.org/MPL/2.0/. # # Contact: Kyle Lahnakoski (kyle@lahnakoski.com) -# +# Bill Sun 2022 - 2023 # SQL CONSTANTS from mo_parsing import * diff --git a/aquery_parser/sql_parser.py b/aquery_parser/parser.py similarity index 92% rename from aquery_parser/sql_parser.py rename to aquery_parser/parser.py index 9237470..819a18a 100644 --- a/aquery_parser/sql_parser.py +++ b/aquery_parser/parser.py @@ -5,7 +5,7 @@ # You can obtain one at http://mozilla.org/MPL/2.0/. # # Contact: Kyle Lahnakoski (kyle@lahnakoski.com) -# +# Bill Sun 2022 - 2023 from sre_parse import WHITESPACE @@ -28,37 +28,12 @@ simple_ident = Regex(simple_ident.__regex__()[1]) def common_parser(): combined_ident = Combine(delimited_list( - ansi_ident | mysql_backtick_ident | simple_ident, separator=".", combine=True, - )).set_parser_name("identifier") - - return parser(ansi_string | mysql_doublequote_string, combined_ident) - - -def mysql_parser(): - mysql_string = ansi_string | mysql_doublequote_string - mysql_ident = Combine(delimited_list( - mysql_backtick_ident | sqlserver_ident | simple_ident, - separator=".", - combine=True, - )).set_parser_name("mysql identifier") - - return parser(mysql_string, mysql_ident) - - -def sqlserver_parser(): - combined_ident = Combine(delimited_list( - ansi_ident - | mysql_backtick_ident - | sqlserver_ident - | Word(FIRST_IDENT_CHAR, IDENT_CHAR), - separator=".", - combine=True, + ansi_ident | aquery_backtick_ident | simple_ident, separator=".", combine=True, )).set_parser_name("identifier") - return parser(ansi_string, combined_ident, sqlserver=True) + return parser(ansi_string | aquery_doublequote_string, combined_ident) - -def parser(literal_string, ident, sqlserver=False): +def parser(literal_string, ident): with Whitespace() as engine: engine.add_ignore(Literal("--") + restOfLine) engine.add_ignore(Literal("#") + restOfLine) @@ -184,12 +159,10 @@ def parser(literal_string, ident, sqlserver=False): ) ) - if not sqlserver: - # SQL SERVER DOES NOT SUPPORT [] FOR ARRAY CONSTRUCTION (USED FOR IDENTIFIERS) - create_array = ( - Literal("[") + delimited_list(Group(expr))("args") + Literal("]") - | create_array - ) + create_array = ( + Literal("[") + delimited_list(Group(expr))("args") + Literal("]") + | create_array + ) create_array = create_array / to_array diff --git a/aquery_parser/types.py b/aquery_parser/types.py index acdd428..30527f7 100644 --- a/aquery_parser/types.py +++ b/aquery_parser/types.py @@ -5,7 +5,7 @@ # You can obtain one at http://mozilla.org/MPL/2.0/. # # Contact: Kyle Lahnakoski (kyle@lahnakoski.com) -# +# Bill Sun 2022 - 2023 # KNOWN TYPES diff --git a/aquery_parser/utils.py b/aquery_parser/utils.py index 6aeaec5..840671e 100644 --- a/aquery_parser/utils.py +++ b/aquery_parser/utils.py @@ -5,7 +5,7 @@ # You can obtain one at http://mozilla.org/MPL/2.0/. # # Contact: Kyle Lahnakoski (kyle@lahnakoski.com) -# +# Bill Sun 2022 - 2023 import ast @@ -610,9 +610,8 @@ hex_num = ( # STRINGS ansi_string = Regex(r"\'(\'\'|[^'])*\'") / to_string -mysql_doublequote_string = Regex(r'\"(\"\"|[^"])*\"') / to_string +aquery_doublequote_string = Regex(r'\"(\"\"|[^"])*\"') / to_string # BASIC IDENTIFIERS ansi_ident = Regex(r'\"(\"\"|[^"])*\"') / unquote -mysql_backtick_ident = Regex(r"\`(\`\`|[^`])*\`") / unquote -sqlserver_ident = Regex(r"\[(\]\]|[^\]])*\]") / unquote +aquery_backtick_ident = Regex(r"\`(\`\`|[^`])*\`") / unquote diff --git a/maketest.sh b/maketest.sh new file mode 100644 index 0000000..33c208b --- /dev/null +++ b/maketest.sh @@ -0,0 +1,2 @@ +make snippet_uselib +cp ./dll.so procedures/q70.so diff --git a/proctool.py b/proctool.py index 1ff726c..c92723e 100644 --- a/proctool.py +++ b/proctool.py @@ -5,14 +5,14 @@ from typing import List name : str = input('Filename (in path ./procedures/.aqp):') def write(): - s : str = input() + s : str = input('Enter queries: empty line to stop. \n') qs : List[str] = [] while(len(s) and not s.startswith('S')): qs.append(s) s = input() - ms : int = int(input()) + ms : int = int(input('number of modules:')) with open(f'./procedures/{name}.aqp', 'wb') as fp: fp.write(struct.pack("I", len(qs) + (ms > 0))) @@ -27,21 +27,29 @@ def write(): fp.write(b'\x00') -def read(): +def read(cmd : str): + rc = len(cmd) > 1 and cmd[1] == 'c' + clip = '' with open(f'./procedures/{name}.aqp', 'rb') as fp: nq = struct.unpack("I", fp.read(4))[0] ms = struct.unpack("I", fp.read(4))[0] qs = fp.read().split(b'\x00') print(f'Procedure {name}, {nq} queries, {ms} modules:') for q in qs: - print(' ' + q.decode('utf-8')) - - + q = q.decode('utf-8').strip() + if q: + q = f'"{q}",' if rc else f'\t{q}' + print(q) + clip += q + '\n' + if rc and not input('copy to clipboard?').lower().startswith('n'): + import pyperclip + pyperclip.copy(clip) + if __name__ == '__main__': while True: - cmd = input("r for read, w for write: ") + cmd = input("r for read, rc to read c_str, w for write: ") if cmd.lower().startswith('r'): - read() + read(cmd.lower()) break elif cmd.lower().startswith('w'): write() diff --git a/rfdata_preproc.py b/rfdata_preproc.py new file mode 100644 index 0000000..fdd8cb0 --- /dev/null +++ b/rfdata_preproc.py @@ -0,0 +1,43 @@ +import os + +sep = os.sep + +# toggles +dataset = 'power' # [covtype, electricity, mixed, phishing, power] +use_threadpool = False # True + +# environments +input_prefix = f'data{sep}{dataset}_orig' +output_prefix = f'data{sep}{dataset}' +sep_field = ',' +sep_subfield = ';' + +lst_files = os.listdir(input_prefix) +lst_files.sort() + +try: + os.mkdir(output_prefix) +except FileExistsError: + pass + +def process(f : str): + filename = input_prefix + sep + f + ofilename = output_prefix + sep + f[:-3] + 'csv' + with open(filename, 'rb') as ifile: + icontents = ifile.read().decode('utf-8') + with open(ofilename, 'wb') as ofile: + for l in icontents.splitlines(): + fields = l.strip().split(' ') + subfields = fields[:-1] + ol = ( # fields[0] + sep_field + + sep_subfield.join(subfields) + + sep_field + fields[-1] + '\n') + ofile.write(ol.encode('utf-8')) + +if not use_threadpool: + for f in lst_files: + process(f) +elif __name__ == '__main__': + from multiprocessing import Pool + with Pool(8) as tp: + tp.map(process, lst_files) diff --git a/sdk/DecisionTree.h b/sdk/DecisionTree.h index 44ee6c5..2f48558 100644 --- a/sdk/DecisionTree.h +++ b/sdk/DecisionTree.h @@ -9,49 +9,52 @@ struct DR; struct DT; -//enum Evaluation {gini, entropy, logLoss}; - -class DecisionTree{ +class DecisionTree +{ public: + DT *DTree = nullptr; + double minIG; + long maxHeight; + long feature; + long maxFeature; + bool isRF; + long classes; + int *Sparse; + double forgetRate; + double increaseRate; + double initialIR; + Evaluation evalue; + long Rebuild; + long roundNo; + long called; + long retain; + long lastT; + long lastAll; -DT* DTree = nullptr; -int maxHeight; -long feature; -long maxFeature; -long seed; -long classes; -int* Sparse; -double forgetRate; -Evaluation evalue; -long Rebuild; -long roundNo; -long called; -long retain; - -DecisionTree(int hight, long f, int* sparse, double forget, long maxFeature, long noClasses, Evaluation e, long r, long rb); + DecisionTree(long f, int *sparse, double forget, long maxFeature, long noClasses, Evaluation e); -void Stablelize(); + void Stablelize(); -void Free(); + void Free(); -minEval findMinGiniDense(double** data, long* result, long* totalT, long size, long col); + minEval findMinGiniDense(double **data, long *result, long *totalT, long size, long col); -minEval findMinGiniSparse(double** data, long* result, long* totalT, long size, long col, DT* current); + minEval findMinGiniSparse(double **data, long *result, long *totalT, long size, long col, DT *current); -minEval incrementalMinGiniDense(double** data, long* result, long size, long col, long*** count, double** record, long* max, long newCount, long forgetSize, bool isRoot); + minEval incrementalMinGiniDense(double **data, long *result, long size, long col, long ***count, double **record, long *max, long newCount, long forgetSize, double **forgottenData, long *forgottenClass); -minEval incrementalMinGiniSparse(double** dataNew, long* resultNew, long sizeNew, long sizeOld, DT* current, long col, long forgetSize, bool isRoot); + minEval incrementalMinGiniSparse(double **dataNew, long *resultNew, long sizeNew, long sizeOld, DT *current, long col, long forgetSize, double **forgottenData, long *forgottenClass); -long* fitThenPredict(double** trainData, long* trainResult, long trainSize, double** testData, long testSize); + long *fitThenPredict(double **trainData, long *trainResult, long trainSize, double **testData, long testSize); -void fit(double** data, long* result, long size); + void fit(double **data, long *result, long size); -void Update(double** data, long* result, long size, DT* current); + void Update(double **data, long *result, long size, DT *current); -void IncrementalUpdate(double** data, long* result, long size, DT* current); + void IncrementalUpdate(double **data, long *result, long size, DT *current); -long Test(double* data, DT* root); + long Test(double *data, DT *root); -void print(DT* root); + void print(DT *root); }; #endif diff --git a/sdk/Evaluation.cpp b/sdk/Evaluation.cpp index 8e347a7..cf6f5d8 100644 --- a/sdk/Evaluation.cpp +++ b/sdk/Evaluation.cpp @@ -26,7 +26,7 @@ minEval giniSparse(double** data, long* result, long* d, long size, long col, lo double gini1, gini2; double c; long l, r; - for(i=0; ientropy1){ @@ -140,8 +140,8 @@ minEval entropySparseIncremental(long sizeTotal, long classes, double* newSorted for(j=0;je1){ @@ -159,9 +159,9 @@ minEval giniDense(long max, long size, long classes, long** rem, long* d, double double gini1, gini2; long *t, *t2, *r, *r2, i, j; for(i=0;i0){ - t2 = rem[d[i-1]]; + t2 = rem[i-1]; for(j=0;j<=classes;j++){ t[j]+=t2[j]; } @@ -179,7 +179,7 @@ minEval giniDense(long max, long size, long classes, long** rem, long* d, double gini1 = (gini1*t[classes])/size + (gini2*(size-t[classes]))/size; if(gini10){ - t2 = rem[d[i-1]]; + t2 = rem[i-1]; for(j=0;j<=classes;j++){ t[j]+=t2[j]; } @@ -207,71 +207,15 @@ minEval entropyDense(long max, long size, long classes, long** rem, long* d, dou long l, r; l = t[j]; r = totalT[j]-l; - entropy1 -= ((double)l/t[classes])*log((double)l/t[classes]); - entropy2 -= ((double)r/(size-t[classes]))*log((double)r/(size-t[classes])); + if(l!=0)entropy1 -= ((double)l/t[classes])*log((double)l/t[classes]); + if(r!=0)entropy2 -= ((double)r/(size-t[classes]))*log((double)r/(size-t[classes])); } entropy1 = entropy1*t[classes]/size + entropy2*(size-t[classes])/size; if(entropy1 #include #include +#include +#include +#include +#include +long poisson(int Lambda) +{ + int k = 0; + long double p = 1.0; + long double l = exp(-Lambda); + srand((long)clock()); + + while(p>=l) + { + double u = (double)(rand()%10000)/10000; + p *= u; + k++; + } + if (k>11)k=11; + return k-1; +} struct DT{ int height; long* featureId; @@ -30,62 +50,126 @@ struct DT{ long size = 0;// Size of the dataset }; -RandomForest::RandomForest(long mTree, long actTree, long rTime, int h, long feature, int* s, double forg, long maxF, long noC, Evaluation eval, long r, long rb){ +RandomForest::RandomForest(long mTree, long feature, int* s, double forg, long noC, Evaluation eval, bool b, double t){ srand((long)clock()); - Rebuild = rb; - if(actTree<1)actTree=1; - noTree = actTree; - activeTree = actTree; - treePointer = 0; - if(mTreef)minF=f; DTrees = (DecisionTree**)malloc(mTree*sizeof(DecisionTree*)); - for(i=0; iisRF=true; } } void RandomForest::fit(double** data, long* result, long size){ - if(timer==rotateTime and maxTree!=activeTree){ - Rotate(); - timer=0; - } - long i, j, k; + long i, j, k, l; double** newData; long* newResult; - for(i=0; i=0){ + double lastSm = (double)lastT/lastAll; + double localSm = (double)localT/localAll; + double lastSd = sqrt(pow((1.0-lastSm),2)*lastT+pow(lastSm,2)*(lastAll-lastT)/(lastAll-1)); + double localSd = sqrt(pow((1.0-localSm),2)*localT+pow(localSm,2)*(localAll-localT)/(localAll-1)); + double v = lastAll+localAll-2; + double sp = sqrt(((lastAll-1) * lastSd * lastSd + (localAll-1) * localSd * localSd) / v); + double q; + double t = lastSm-localSm; + if(sp==0){q = 1;} + else{ + t = t/(sp*sqrt(1.0/lastAll+1.0/localAll)); + boost::math::students_t dist(v); + double c = cdf(dist, t); + q = cdf(complement(dist, fabs(t))); + } + if(q<=tThresh){ + lastT += localT; + lastAll += localAll; + }else if(t<0){ + lastT = localT; + lastAll = localAll; + }else{ + double newAcc = (double)localT/localAll; + double lastAcc= (double)lastT/lastAll; + stale = floor((newAcc-lastAcc)/(lastAcc)*maxTree); + lastT = localT; + lastAll = localAll; } - newResult[j] = result[j]; + }else{ + lastT = localT; + lastAll = localAll; } - DTrees[(i+treePointer)%maxTree]->fit(newData, newResult, size); } - timer++; + Rotate(stale); + for(i=0; ifit(newData, newResult, size*times); + } + /*for(i=0; iretain = 10*size; + //if(backupTrees[i]==nullptr) continue; + long times; + //times = poisson(posMean); + //if(times==0)continue; + times=1; + newData = (double**)malloc(sizeof(double*)*size*times); + newResult = (long*)malloc(sizeof(long)*size*times); + long c = 0; + for(j = 0; jfit(newData, newResult, size*times); + }*/ } long* RandomForest::fitThenPredict(double** trainData, long* trainResult, long trainSize, double** testData, long testSize){ @@ -97,36 +181,80 @@ long* RandomForest::fitThenPredict(double** trainData, long* trainResult, long t return testResult; } -void RandomForest::Rotate(){ - if(noTree==maxTree){ - DTrees[(treePointer+activeTree)%maxTree]->Free(); - delete DTrees[(treePointer+activeTree)%maxTree]; - }else{ - noTree++; +void RandomForest::Rotate(long stale){ + long i, j, k; + long minIndex = -1; + if(stale>=0)return; + else{ + stale = std::min(stale, maxTree); + stale*=-1; + while(stale>0){ + long currentMin = 2147483647; + for(i = 0; iDTree->size; + newData = (double**)malloc(sizeof(double*)*size); + newResult = (long*)malloc(sizeof(long)*size); + for(j = 0; jDTree->dataRecord[j][k]; + } + newData[j][f] = 0; + newResult[j] = DTrees[minIndex]->DTree->resultRecord[j]; + } + DTrees[minIndex]->Stablelize(); + DTrees[minIndex]->Free(); + delete DTrees[minIndex]; + DTrees[minIndex] = new DecisionTree(f, sparse, forget, minF+rand()%(f+1-minF), noClasses, e); + DTrees[minIndex]->isRF=true; + DTrees[minIndex]->fit(newData, newResult, size); + for(j=0; jTest(newData[j], DTrees[minIndex]->DTree)==newResult[j])lastT2++; + } + DTrees[minIndex]->lastAll=size; + DTrees[minIndex]->lastT=lastT2; + } } - DTrees[(treePointer+activeTree)%maxTree] = new DecisionTree(height, f, sparse, forget, maxFeature, noClasses, e, retain, Rebuild); - long size = DTrees[(treePointer+activeTree-1)%maxTree]->DTree->size; - double** newData = new double*[size]; - long* newResult = new long[size]; - for(long j = 0; jDTree->dataRecord[j][k]; - } - newResult[j] = DTrees[(treePointer+activeTree-1)%maxTree]->DTree->resultRecord[j]; - } - - DTrees[(treePointer+activeTree)%maxTree]->fit(newData, newResult, size); - DTrees[treePointer]->Stablelize(); - if(++treePointer==maxTree)treePointer=0; } +long RandomForest::Test(double* data, long result){ + long i; + long predict[noClasses]; + for(i=0; iTest(data, DTrees[i]->DTree); + predict[tmp]++; + if(tmp==result)allT[i]++; + } + + long ret = 0; + for(i=1; ipredict[ret])ret = i; + } + + return ret; +} + long RandomForest::Test(double* data){ long i; long predict[noClasses]; for(i=0; iTest(data, DTrees[i]->DTree)]++; } diff --git a/sdk/RF.h b/sdk/RF.h index d0eee67..5643d44 100644 --- a/sdk/RF.h +++ b/sdk/RF.h @@ -14,33 +14,34 @@ struct DT; class RandomForest{ public: -long noTree; long maxTree; long activeTree; -long treePointer; -long rotateTime; -long timer; -long retain; -DecisionTree** DTrees = nullptr; +long* allT; +double tThresh; +DecisionTree** DTrees; +DecisionTree** backupTrees; long height; -long Rebuild; +bool bagging; long f; int* sparse; double forget; -long maxFeature; long noClasses; Evaluation e; +long lastT; +long lastAll; +int minF; - -RandomForest(long maxTree, long activeTree, long rotateTime, int height, long f, int* sparse, double forget, long maxFeature=0, long noClasses=2, Evaluation e=Evaluation::gini, long r=-1, long rb=2147483647); +RandomForest(long maxTree, long f, int* sparse, double forget, long noClasses=2, Evaluation e=Evaluation::entropy, bool b=false, double tThresh=0.05); void fit(double** data, long* result, long size); long* fitThenPredict(double** trainData, long* trainResult, long trainSize, double** testData, long testSize); -void Rotate(); +void Rotate(long stale); long Test(double* data); + +long Test(double* data, long result); }; #endif diff --git a/sdk/incrementalDecisionTree.cpp b/sdk/incrementalDecisionTree.cpp index 29431cb..d21d408 100644 --- a/sdk/incrementalDecisionTree.cpp +++ b/sdk/incrementalDecisionTree.cpp @@ -11,6 +11,7 @@ std::random_device rd; std::mt19937 g(rd()); + struct minEval{ double value; int* values; @@ -23,17 +24,18 @@ struct minEval{ }; struct DT{ - int height; + long height; long* featureId; DT* left = nullptr; DT* right = nullptr; + bool created; // split info bool terminate; double dpoint; long feature; long result; - + // Sparse data record double** sortedData; // for each feature, sorted data long** sortedResult; @@ -47,19 +49,22 @@ struct DT{ double** dataRecord = nullptr;// Record the data long* resultRecord = nullptr;// Record the result long size = 0;// Size of the dataset + }; long seed = (long)clock(); long* Rands(long feature, long maxFeature){ - //srand(seed++); + srand(seed); long i; long* ret = (long*) malloc(feature*sizeof(long)); for(i =0; icreated = true; long i; t->count = (long***)malloc(f*sizeof(long**)); for(i=0; icount[i]=nullptr; @@ -77,8 +81,6 @@ void createTree(DT* t, long currentHeight, long height, long f, long maxF, long for(i=0; irecord[i]=nullptr; t->max = (long*)malloc(f*sizeof(long)); t->max[0] = -1; - t->featureId = Rands(f, maxF); - //t->T = (long*)malloc(classes*sizeof(long)); t->sortedData = (double**) malloc(f*sizeof(double*)); for(i=0; isortedData[i]=nullptr; t->sortedResult = (long**) malloc(f*sizeof(long*)); @@ -88,20 +90,18 @@ void createTree(DT* t, long currentHeight, long height, long f, long maxF, long t->height = currentHeight; t->feature = -1; t->size = 0; - if(currentHeight>height){ - t->right = nullptr; - t->left = nullptr; - return; - } t->left = (DT*)malloc(sizeof(DT)); t->right = (DT*)malloc(sizeof(DT)); - createTree(t->left, currentHeight+1, height, f, maxF, classes); - createTree(t->right, currentHeight+1, height, f, maxF, classes); + t->left->created = false; + t->right->created = false; + t->left->height = currentHeight+1; + t->right->height = currentHeight+1; } void stableTree(DT* t, long f){ long i, j; + if(not t->created)return; for(i=0; icount[i]==nullptr)continue; for(j=0; jmax[i]; j++){ @@ -116,7 +116,6 @@ void stableTree(DT* t, long f){ } free(t->record); free(t->max); - free(t->featureId); for(i=0; isortedData[i]==nullptr)continue; free(t->sortedData[i]); @@ -126,25 +125,28 @@ void stableTree(DT* t, long f){ if(t->sortedResult[i]==nullptr)continue; free(t->sortedResult[i]); } - free(t->sortedResult); free(t->dataRecord); free(t->resultRecord); + free(t->sortedResult); if(t->right!=nullptr)stableTree(t->right, f); if(t->left!=nullptr)stableTree(t->left, f); } void freeTree(DT* t){ - if(t->left != nullptr)freeTree(t->left); - if(t->right != nullptr)freeTree(t->right); + if(t->created){ + freeTree(t->left); + freeTree(t->right); + } free(t); } -DecisionTree::DecisionTree(int height, long f, int* sparse, double forget=0.1, long maxF=0, long noClasses=2, Evaluation e=Evaluation::gini, long r=-1, long rb=1){ +DecisionTree::DecisionTree(long f, int* sparse, double rate, long maxF, long noClasses, Evaluation e){ evalue = e; - called = 0; long i; // Max tree height - maxHeight = height; + initialIR = rate; + increaseRate = rate; + isRF = false; // Number of features feature = f; // If each feature is sparse or dense, 0 for dense, 1 for sparse, >2 for number of category @@ -157,40 +159,69 @@ DecisionTree::DecisionTree(int height, long f, int* sparse, double forget=0.1, l DTree->feature = -1; // The number of feature that is considered in each node if(maxF>=f){ - maxFeature = f; + maxF = f; }else if(maxF<=0){ - maxFeature = (long)round(sqrt(f)); - }else{ - maxFeature = maxF; - } - forgetRate = std::min(1.0, forget); - retain = r; - createTree(DTree, 0, maxHeight, f, maxFeature, noClasses); - // Randomly generate the features - //DTree->featureId = Rands(); - //DTree->sorted = (long**) malloc(f*sizeof(long*)); + maxF = (long)round(sqrt(f)); + } + maxFeature = maxF; + forgetRate = -10.0; + retain = 0; + DTree->featureId = Rands(f, maxF); + DTree->terminate = true; + DTree->result = 0; + DTree->size = 0; + createNode(DTree, 0, f, noClasses); // Number of classes of this dataset - Rebuild = rb; - roundNo = 0; + Rebuild = 2147483647; + roundNo = 64; classes = std::max(noClasses, (long)2); - //DTree->T = (long*) malloc(noClasses*sizeof(long)); - /*for(long i = 0; iT[i]=0; - }*/ + // last Acc + lastAll = classes; + lastT = 1; } void DecisionTree::Stablelize(){ free(Sparse); - stableTree(DTree, feature); + long i, j; + DT* t = DTree; + long f = feature; + for(i=0; icount[i]==nullptr)continue; + for(j=0; jmax[i]; j++){ + free(t->count[i][j]); + } + free(t->count[i]); + } + free(t->count); + for(i=0; irecord[i]==nullptr)continue; + free(t->record[i]); + } + free(t->record); + free(t->max); + free(t->featureId); + for(i=0; isortedData[i]==nullptr)continue; + free(t->sortedData[i]); + } + free(t->sortedData); + for(i=0; isortedResult[i]==nullptr)continue; + free(t->sortedResult[i]); + } + free(t->sortedResult); + if(DTree->right!=nullptr)stableTree(t->right, feature); + if(DTree->left!=nullptr)stableTree(t->left, feature); } void DecisionTree::Free(){ + free(DTree->dataRecord); + free(DTree->resultRecord); freeTree(DTree); } -minEval DecisionTree::incrementalMinGiniSparse(double** dataTotal, long* resultTotal, long sizeTotal, long sizeNew, DT* current, long col, long forgetSize, bool isRoot){ +minEval DecisionTree::incrementalMinGiniSparse(double** dataTotal, long* resultTotal, long sizeTotal, long sizeNew, DT* current, long col, long forgetSize, double** forgottenData, long* forgottenClass){ long i, j; - if(isRoot){sizeNew=sizeTotal-forgetSize;} long newD[sizeNew]; for(i=0; isortedData[col]; long* oldResult = current->sortedResult[col]; + long tmp2 = forgetSize; + long* allForget = (long*)malloc(sizeof(long)*classes); + for(i=0; isortedData[col] = newSortedData; current->sortedResult[col] = newSortedResult; free(oldData); free(oldResult); - minEval ret; if(evalue == Evaluation::gini){ ret = giniSparseIncremental(sizeTotal, classes, newSortedData, newSortedResult, T); @@ -240,28 +295,43 @@ minEval DecisionTree::incrementalMinGiniSparse(double** dataTotal, long* resultT ret.values = nullptr; return ret; } -minEval DecisionTree::incrementalMinGiniDense(double** data, long* result, long size, long col, long*** count, double** record, long* max, long newSize, long forgetSize, bool isRoot){ +minEval DecisionTree::incrementalMinGiniDense(double** data, long* result, long size, long col, long*** count, double** record, long* max, long newSize, long forgetSize, double** forgottenData, long* forgottenClass){ // newSize is before forget long low = 0; - if(isRoot)size=newSize-forgetSize; - long i, j, k; + //if(isRoot) + long i, j, k, tmp; long newMax = 0; long maxLocal = max[col]; long **newCount=(long**)malloc(size*sizeof(long*)); - for(i=0;icurrentMinMax){ - currentMinMax = record[col][j]; - for(k=0;k<=classes;k++)newCount[newMax][k]=count[col][j][k]; - } - } - for(j=0;j0){ + d = (long*)malloc(sizeof(long)*newMax); + for(i=0;i=newMax){ - updateCount[i] = count[col][i-newMax]; - updateRecord[i] = record[col][i-newMax]; + if(k==max[col]-newMax){ + updateCount[i] = newCount[j]; + updateRecord[i] = newRecord[j]; + j++; + } + else if(j==newMax){ + updateCount[i] = count[col][k]; + updateRecord[i] = record[col][k]; + k++; + } + else if(newRecord[j]>record[col][k]){ + updateCount[i] = count[col][k]; + updateRecord[i] = record[col][k]; + k++; } else{ - updateCount[i] = newCount[i]; - updateRecord[i] = newRecord[i]; + updateCount[i] = newCount[j]; + updateRecord[i] = newRecord[j]; + j++; } } free(count[col]); free(record[col]); count[col]=updateCount; record[col]=updateRecord; - } - for(i=newMax; isize==0){ + retain = size; + maxHeight = (long)log2((double)retain); + maxHeight = std::max(maxHeight, (long)1); Update(data, result, size, DTree); }else{ + if(forgetRate<=0){ + for(long j=0; jsize; i++){ + guesses[DTree->resultRecord[i]]++; + } + for(i=0; isize/size; + }*/ + if(localSm <= guessAcc){ + //if(localSm <= 1.0/classes){ + lastT = localT; + lastAll = localAll; + retain = size; + //increaseRate = 1.0-localSm; + } + else if(lastSm <= guessAcc){ + //else if(lastSm <= 1.0/classes){ + lastT = localT; + lastAll = localAll; + //forgetRate=-5.0; + retain += size; + //increaseRate -= localSm; + //increaseRate = initialIR; + //increaseRate -= localSm; + //increaseRate /= (double)localSm-1.0/classes; + } + else if(lastSm == localSm){ + lastT += localT; + lastAll += localAll; + retain+=(long)round(increaseRate*size); + //increaseRate*=increaseRate; + //retain = (long)((double)retain*isUp+0.25*size); + } + else{ + /*double lastSd = sqrt(pow((1.0-lastSm),2)*lastT+pow(lastSm,2)*(lastAll-lastT)/(lastAll-1)); + double localSd = sqrt(pow((1.0-localSm),2)*localT+pow(localSm,2)*(localAll-localT)/(localAll-1)); + double v = lastAll+localAll-2; + double sp = sqrt(((lastAll-1) * lastSd * lastSd + (localAll-1) * localSd * localSd) / v); + double q; + //double t=lastSm-localSm; + if(sp==0)q=1.0; + else if(lastAll+lastAll<2000){ + q = abs(lastSm-localSm); + } + else{ + double t = t/(sp*sqrt(1.0/lastAll+1.0/localAll)); + boost::math::students_t dist(v); + double c = cdf(dist, t); + q = cdf(complement(dist, fabs(t))); + }*/ + isUp = ((double)localSm-guessAcc)/((double)lastSm-guessAcc); + //isUp = ((double)localSm-1.0/classes)/((double)lastSm-1.0/classes); + increaseRate = increaseRate/isUp; + //increaseRate += increaseRate*factor; + if(isUp>=1.0)isUp=pow(isUp, 2); + else{ + isUp=pow(isUp, 3-isUp); + } + retain = std::min((long)round(retain*isUp+increaseRate*size), retain+size); + //double factor = ((lastSm-localSm)/localSm)*abs((lastSm-localSm)/localSm)*increaseRate; + //retain += std::min((long)round(factor*retain+increaseRate*size), size); + lastT = localT; + lastAll = localAll; + } + //printf(" %f, %f, %f\n", increaseRate, localSm, lastSm); + }else{ + long i; + retain = DTree->size+size; + /*double guessAcc=0.0; + long guesses[classes]; + for(i=0; isize; i++){ + guesses[DTree->resultRecord[i]]++; + } + for(i=0; isize/size; + }*/ + while(retain>=roundNo){ + if((double)localT/localAll>guessAcc){ + forgetRate+=5.0; + } + roundNo*=2; + } + if((double)localT/localAll<=guessAcc){ + forgetRate=-10.0; + } + if(forgetRate>=0){ + forgetRate=0.0; + } + lastT = localT; + lastAll = localAll; + } + } + //if(increaseRate>initialIR)increaseRate=initialIR; + //printf("%f\n", increaseRate); + if(retain0 and current->size+size>retain) forgetSize = std::min(current->size+size - retain, current->size); - else if(retain<0) forgetSize = (long)current->size*forgetRate; - long* index = new long[current->size]; + long* index; + bool forgetOld = false; + index = (long*)malloc(sizeof(long)*current->size); + if(current->size+size>retain and current->height==0) { + forgetSize = std::min(current->size+size - retain, current->size); + } + if(forgetSize==current->size){ + Update(data, result, size, current); + return; + } double** dataNew; long* resultNew; + double*** forgottenData = (double***)malloc(feature*sizeof(double**)); + long* forgottenClass = (long*)malloc(classes*sizeof(long)); + for(i=0;iheight == 0){ + for(i=0; isize-forgetSize)*sizeof(double*)); resultNew = (long*)malloc((size+current->size-forgetSize)*sizeof(long)); for(i=0;isize; i++){ index[i] = i; } - std::shuffle(index, index+current->size, g); + if(isRF)std::shuffle(index, index+current->size, g); long x = 0; for(i=0;isize;i++){ if(i>=current->size-forgetSize){ - current->dataRecord[index[i]][feature-1] = DBL_MAX; - + for(j=0; jresultRecord[index[i]]][forgottenClass[current->resultRecord[index[i]]]]=current->dataRecord[index[i]][j]; + } + forgottenClass[current->resultRecord[index[i]]]++; + current->dataRecord[index[i]][feature] = DBL_MAX; }else{ dataNew[i+size] = current->dataRecord[index[i]]; resultNew[i+size] = current->resultRecord[index[i]]; } } + for(i=0; isize)*sizeof(double*)); resultNew = (long*)malloc((size+current->size)*sizeof(long)); + long xxx[current->size]; for(i=0;isize;i++){ - if(current->dataRecord[i][feature-1] == DBL_MAX){ - forgetSize++; - continue; + if(current->dataRecord[i][feature] == DBL_MAX){ + xxx[forgetSize]=i; + forgetSize++; + forgottenClass[current->resultRecord[i]]++; }else{ dataNew[i+size-forgetSize] = current->dataRecord[i]; resultNew[i+size-forgetSize] = current->resultRecord[i]; } } + if(forgetSize==current->size){ + free(forgottenData); + free(forgottenClass); + if(size!=0){ + free(dataNew); + free(resultNew); + Update(data, result, size, current); + }else{ + // if a node have no new data and forget all old data, just keep old data + return; + } + return; + } + for(i=0; iresultRecord[tmp]][k[current->resultRecord[tmp]]]=current->dataRecord[tmp][j]; + } + k[current->resultRecord[tmp]]++; + } + free(k); + for(i=0; isize -= forgetSize; current->size += size; // end condition - if(current->terminate or roundNo%Rebuild==0){ + if(current->terminate or current->height==maxHeight or current->size==1){ + for(i=0;iheight == 0){ for(i=0; idataRecord[index[current->size-size+i]]); } } - delete(index); + free(index); + Update(dataNew, resultNew, current->size, current); + return; + }else if(size==0){ + for(i=0;isize, current); return; } @@ -525,24 +775,47 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT* long cFeature; cMin.eval = DBL_MAX; cMin.values = nullptr; - // TODO + long T[classes]; + double HY=0; + for(i=0;ifeatureId[i]]==1){ - c = incrementalMinGiniSparse(dataNew, resultNew, current->size+forgetSize, size, current, current->featureId[i], forgetSize, false); + long col = DTree->featureId[i]; + if(Sparse[col]==1){ + c = incrementalMinGiniSparse(dataNew, resultNew, current->size, size, current, col, forgetSize, forgottenData[col], forgottenClass); } - else if(Sparse[current->featureId[i]]==0){ - c = incrementalMinGiniDense(dataNew, resultNew, size, current->featureId[i], current->count, current->record, current->max, current->size+forgetSize, forgetSize, false); + else if(Sparse[col]==0){ + c = incrementalMinGiniDense(dataNew, resultNew, size, col, current->count, current->record, current->max, current->size, forgetSize, forgottenData[col], forgottenClass); }else{ //c = incrementalMinGiniCategorical(); } if(c.evalfeatureId[i]; - }else if(c.values!=nullptr)free(c.values); + cFeature = col; + } + } + for(i=0;iterminate = true; long t[classes]; @@ -550,27 +823,23 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT* t[i]=0; } for(i=low;iresult = std::distance(t, std::max_element(t, t+classes)); + free(index); + free(current->dataRecord); + free(current->resultRecord); + current->dataRecord = dataNew; + current->resultRecord = resultNew; return; } //diverse data long ptL=0, ptR=0; double* t; long currentSize = current->size; - //TODO:Discrete // Same diverse point as last time if(current->dpoint==cMin.value and current->feature==cFeature){ - long xxx = current->left->size; - /*for(i=0; ifeature]<=current->dpoint){ - ptL++; - }else{ - ptR++; - } - }*/ ptL = size; ptR = size; long* resultL = (long*)malloc((ptL)*sizeof(long)); @@ -598,7 +867,7 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT* free(current->dataRecord[index[current->size-size+i]]); } } - delete(index); + free(index); free(current->dataRecord); free(current->resultRecord); current->dataRecord = dataNew; @@ -636,21 +905,23 @@ void DecisionTree::IncrementalUpdate(double** data, long* result, long size, DT* Update(dataL, resultL, ptL, current->left); Update(dataR, resultR, ptR, current->right); + // TODO: free memeory if(current->height == 0){ for(i=0; idataRecord[index[current->size-size+i]]); } } - - delete(index); + free(index); free(current->dataRecord); free(current->resultRecord); current->dataRecord = dataNew; current->resultRecord = resultNew; } void DecisionTree::Update(double** data, long* result, long size, DT* current){ + if(not current->created)createNode(current, current->height, feature, classes); long low = 0; long i, j; + double HY = 0; // end condition if(current->dataRecord!=nullptr)free(current->dataRecord); current->dataRecord = data; @@ -663,7 +934,7 @@ void DecisionTree::Update(double** data, long* result, long size, DT* current){ for(i=0;iresult = std::distance(t, std::max_element(t, t+classes)); @@ -683,19 +954,25 @@ void DecisionTree::Update(double** data, long* result, long size, DT* current){ current->result = i; return; } + if(evalue == Evaluation::entropy){ + if(T[i]!=0)HY -= ((double)T[i]/size)*log2((double)T[i]/size); + }else{ + HY += pow(((double)T[i]/size), 2); + } } // find min Evaluation minEval c, cMin; long cFeature, oldMax, col, left=0; cMin.eval = DBL_MAX; cMin.values = nullptr; - //TODO + cFeature = -1; + //TODO: categorical for(i=0;ifeatureId[i]; - if(Sparse[current->featureId[i]]==1){ + col = DTree->featureId[i]; + if(Sparse[col]==1){ c = findMinGiniSparse(data, result, T, size, col, current); } - else if(Sparse[current->featureId[i]]==0){ + else if(Sparse[col]==0){ c = findMinGiniDense(data, result, T, size, col); if(current->count[col]!=nullptr){ for(j=0; jmax[col]; j++){ @@ -715,32 +992,37 @@ void DecisionTree::Update(double** data, long* result, long size, DT* current){ if(cMin.values!=nullptr)free(cMin.values); cMin.values = c.values; cMin.value = c.value; - cFeature = current->featureId[i]; + cFeature = col; left = c.left; }else if(c.values!=nullptr){ free(c.values); } } + if(cMin.eval == DBL_MAX){ current->terminate = true; long max = 0; + long maxs[classes]; + long count = 0; for(i=1;iresult = max; return; } + //printf(" %f\n", HY-cMin.eval); //diverse data current->terminate = false; current->feature = cFeature; current->dpoint = cMin.value; long ptL=0, ptR=0; - //TODO:Discrete - long* resultL = new long[left]; - long* resultR = new long[size-left]; - double** dataL = new double*[left]; - double** dataR = new double*[size-left]; + //TODO: categorical + long* resultL = new long[size]; + long* resultR = new long[size]; + double** dataL = new double*[size]; + double** dataR = new double*[size]; for(i=low; ifeature]<=current->dpoint){ dataL[ptL] = data[i]; @@ -757,14 +1039,12 @@ void DecisionTree::Update(double** data, long* result, long size, DT* current){ } long DecisionTree::Test(double* data, DT* root){ - if(root->terminate)return root->result; + if(root->terminate or root->height == maxHeight)return root->result; if(data[root->feature]<=root->dpoint)return Test(data, root->left); return Test(data, root->right); } void DecisionTree::print(DT* root){ - int x; - //std::cin>>x; if(root->terminate){ printf("%ld", root->result); return; diff --git a/sdk/irf.cpp b/sdk/irf.cpp index 0f9aac1..f1820d2 100644 --- a/sdk/irf.cpp +++ b/sdk/irf.cpp @@ -1,63 +1,65 @@ #include "DecisionTree.h" -#include "aquery.h" // __AQ_NO_SESSION__ #include "../server/table.h" +#include "aquery.h" -DecisionTree* dt = nullptr; +DecisionTree *dt = nullptr; -__AQEXPORT__(bool) newtree(int height, long f, ColRef sparse, double forget, long maxf, long noclasses, Evaluation e, long r, long rb){ - if(sparse.size!=f)return 0; - int* issparse = (int*)malloc(f*sizeof(int)); - for(long i=0; i sparse, double forget, long maxf, long noclasses, Evaluation e, long r, long rb) +{ + if (sparse.size != f) + return false; + int *issparse = (int *)malloc(f * sizeof(int)); + for (long i = 0; i < f; i++) issparse[i] = sparse.container[i]; - } - if(maxf<0)maxf=f; - dt = new DecisionTree(height, f, issparse, forget, maxf, noclasses, e, r, rb); - return 1; -} -__AQEXPORT__(bool) fit(ColRef> X, ColRef y){ - if(X.size != y.size)return 0; - double** data = (double**)malloc(X.size*sizeof(double*)); - long* result = (long*)malloc(y.size*sizeof(long)); - for(long i=0; i> v, vector_type res){ - double** data = (double**)malloc(v.size*sizeof(double*)); - for(int i = 0; i < v.size; ++i) + +// size_t pt = 0; +// __AQEXPORT__(bool) fit(ColRef> X, ColRef y){ +// if(X.size != y.size)return 0; +// double** data = (double**)malloc(X.size*sizeof(double*)); +// long* result = (long*)malloc(y.size*sizeof(long)); +// for(long i=0; i> v, vector_type res) +{ + double **data = (double **)malloc(v.size * sizeof(double *)); + for (int i = 0; i < v.size; ++i) data[i] = v.container[i].container; dt->fit(data, res.container, v.size); return true; } -__AQEXPORT__(vectortype_cstorage) predict(vector_type> v){ - int* result = (int*)malloc(v.size*sizeof(int)); - - for(long i=0; iTest(v.container[i].container, dt->DTree); - //printf("%d ", result[i]); - } - auto container = (vector_type*)malloc(sizeof(vector_type)); +__AQEXPORT__(vectortype_cstorage) +predict(vector_type> v) +{ + int *result = (int *)malloc(v.size * sizeof(int)); + + for (long i = 0; i < v.size; i++) + result[i] = dt->Test(v.container[i].container, dt->DTree); + + auto container = (vector_type *)malloc(sizeof(vector_type)); container->size = v.size; container->capacity = 0; container->container = result; - // container->out(10); - // ColRef>* col = (ColRef>*)malloc(sizeof(ColRef>)); auto ret = vectortype_cstorage{.container = container, .size = 1, .capacity = 0}; - // col->initfrom(ret, "sibal"); - // print(*col); return ret; - //return true; } - - diff --git a/server/libaquery.cpp b/server/libaquery.cpp index 500640d..7217dc7 100644 --- a/server/libaquery.cpp +++ b/server/libaquery.cpp @@ -31,12 +31,14 @@ void print<__int128_t>(const __int128_t& v, const char* delimiter){ s[40] = 0; std::cout<< get_int128str(v, s+40)<< delimiter; } + template <> void print<__uint128_t>(const __uint128_t&v, const char* delimiter){ char s[41]; s[40] = 0; std::cout<< get_uint128str(v, s+40) << delimiter; } + std::ostream& operator<<(std::ostream& os, __int128 & v) { print(v); @@ -76,6 +78,7 @@ char* intToString(T val, char* buf){ return buf; } + void skip(const char*& buf){ while(*buf && (*buf >'9' || *buf < '0')) buf++; } @@ -264,8 +267,8 @@ std::string base62uuid(int l) { static uniform_int_distribution u(0x10000, 0xfffff); uint64_t uuid = (u(engine) << 32ull) + (std::chrono::system_clock::now().time_since_epoch().count() & 0xffffffff); - //printf("%llu\n", uuid); - string ret; + + string ret; while (uuid && l-- >= 0) { ret = string("") + base62alp[uuid % 62] + ret; uuid /= 62; @@ -525,8 +528,8 @@ void ScratchSpace::cleanup(){ static_cast*>(temp_memory_fractions); if (vec_tmpmem_fractions->size) { for(auto& mem : *vec_tmpmem_fractions){ - free(mem); - //GC::gc_handle->reg(mem); + //free(mem); + GC::gc_handle->reg(mem); } vec_tmpmem_fractions->clear(); } diff --git a/server/libaquery.h b/server/libaquery.h index 7635310..39b25c3 100644 --- a/server/libaquery.h +++ b/server/libaquery.h @@ -66,23 +66,24 @@ struct Session{ void* memory_map; }; -struct StoredProcedure{ +struct StoredProcedure { uint32_t cnt, postproc_modules; char **queries; const char* name; void **__rt_loaded_modules; }; -struct Context{ + +struct Context { typedef int (*printf_type) (const char *format, ...); - void* module_function_maps = 0; + void* module_function_maps = nullptr; Config* cfg; int n_buffers, *sz_bufs; void **buffers; - void* alt_server = 0; + void* alt_server = nullptr; Log_level log_level = LOG_INFO; Session current; @@ -115,6 +116,12 @@ struct Context{ }; +struct StoredProcedurePayload { + StoredProcedure *p; + Context* cxt; +}; + +int execTriggerPayload(void*); #ifdef _WIN32 #define __DLLEXPORT__ __declspec(dllexport) __stdcall @@ -169,4 +176,79 @@ inline void AQ_ZeroMemory(_This_Struct& __val) { memset(&__val, 0, sizeof(_This_Struct)); } +#ifdef __USE_STD_SEMAPHORE__ + #include + class A_Semaphore { + private: + std::binary_semaphore native_handle; + public: + A_Semaphore(bool v = false) { + native_handle = std::binary_semaphore(v); + } + void acquire() { + native_handle.acquire(); + } + void release() { + native_handle.release(); + } + ~A_Semaphore() { } + }; +#else + #ifdef _WIN32 + class A_Semaphore { + private: + void* native_handle; + public: + A_Semaphore(bool); + void acquire(); + void release(); + ~A_Semaphore(); + }; + #else + #ifdef __APPLE__ + #include + class A_Semaphore { + private: + dispatch_semaphore_t native_handle; + public: + explicit A_Semaphore(bool v = false) { + native_handle = dispatch_semaphore_create(v); + } + void acquire() { + // puts("acquire"); + dispatch_semaphore_wait(native_handle, DISPATCH_TIME_FOREVER); + } + void release() { + // puts("release"); + dispatch_semaphore_signal(native_handle); + } + ~A_Semaphore() { + } + }; + #else + #include + class A_Semaphore { + private: + sem_t native_handle; + public: + A_Semaphore(bool v = false) { + sem_init(&native_handle, v, 1); + } + void acquire() { + sem_wait(&native_handle); + } + void release() { + sem_post(&native_handle); + } + ~A_Semaphore() { + sem_destroy(&native_handle); + } + }; + #endif // __APPLE__ + + #endif // _WIN32 +#endif //__USE_STD_SEMAPHORE__ + +void print_monetdb_results(void* _srv, const char* sep, const char* end, uint32_t limit); + #endif diff --git a/server/monetdb_conn.cpp b/server/monetdb_conn.cpp index 533a6f0..21380ea 100644 --- a/server/monetdb_conn.cpp +++ b/server/monetdb_conn.cpp @@ -5,9 +5,19 @@ #include #include "monetdb_conn.h" #include "monetdbe.h" + #include "table.h" #include +#ifdef _WIN32 + #include "winhelper.h" +#else + #include + #include + #include + #include +#endif // _WIN32 + #undef ERROR #undef static_assert @@ -215,3 +225,46 @@ bool Server::havehge() { return false; #endif } + + +void ExecuteStoredProcedureEx(const StoredProcedure *p, Context* cxt){ + auto server = static_cast(cxt->alt_server); + void* handle = nullptr; + uint32_t procedure_module_cursor = 0; + for(uint32_t i = 0; i < p->cnt; ++i) { + switch(p->queries[i][0]){ + case 'Q': { + server->exec(p->queries[i]); + } + break; + case 'P': { + auto c = code_snippet(dlsym(handle, p->queries[i]+1)); + c(cxt); + } + break; + case 'N': { + if(procedure_module_cursor < p->postproc_modules) + handle = p->__rt_loaded_modules[procedure_module_cursor++]; + } + break; + case 'O': { + uint32_t limit; + memcpy(&limit, p->queries[i] + 1, sizeof(uint32_t)); + if (limit == 0) + continue; + print_monetdb_results(server, " ", "\n", limit); + } + break; + default: + printf("Warning Q%u: unrecognized command %c.\n", + i, p->queries[i][0]); + } + } +} + +int execTriggerPayload(void* args) { + auto spp = (StoredProcedurePayload*)(args); + ExecuteStoredProcedureEx(spp->p, spp->cxt); + delete spp; + return 0; +} diff --git a/server/monetdb_conn.h b/server/monetdb_conn.h index 9894218..389ca63 100644 --- a/server/monetdb_conn.h +++ b/server/monetdb_conn.h @@ -24,7 +24,7 @@ struct Server{ static bool havehge(); void test(const char*); void print_results(const char* sep = " ", const char* end = "\n"); - friend void print_monetdb_results(Server* srv, const char* sep, const char* end, int limit); + friend void print_monetdb_results(void* _srv, const char* sep, const char* end, int limit); ~Server(); }; diff --git a/server/monetdb_ext.c b/server/monetdb_ext.c new file mode 100644 index 0000000..9a3b7d8 --- /dev/null +++ b/server/monetdb_ext.c @@ -0,0 +1,93 @@ +#include "monetdbe.h" +#include +#include "mal_client.h" +#include "sql_mvc.h" +#include "sql_semantic.h" +#include "mal_exception.h" + +typedef struct column_storage { + int refcnt; + int bid; + int ebid; /* extra bid */ + int uibid; /* bat with positions of updates */ + int uvbid; /* bat with values of updates */ + storage_type st; /* ST_DEFAULT, ST_DICT, ST_FOR */ + bool cleared; + bool merged; /* only merge changes once */ + size_t ucnt; /* number of updates */ + ulng ts; /* version timestamp */ +} column_storage; + +typedef struct segment { + BUN start; + BUN end; + bool deleted; /* we need to keep a dense segment set, 0 - end of last segemnt, + some segments maybe deleted */ + ulng ts; /* timestamp on this segment, ie tid of some active transaction or commit time of append/delete or + rollback time, ie ready for reuse */ + ulng oldts; /* keep previous ts, for rollbacks */ + struct segment *next; /* usualy one should be enough */ + struct segment *prev; /* used in destruction list */ +} segment; + +/* container structure to allow sharing this structure */ +typedef struct segments { + sql_ref r; + struct segment *h; + struct segment *t; +} segments; + +typedef struct storage { + column_storage cs; /* storage on disk */ + segments *segs; /* local used segements */ + struct storage *next; +} storage; + +typedef struct { + char language; /* 'S' or 's' or 'X' */ + char depth; /* depth >= 1 means no output for trans/schema statements */ + int remote; /* counter to make remote function names unique */ + mvc *mvc; + char others[]; +} backend; + +typedef struct { + Client c; + char *msg; + monetdbe_data_blob blob_null; + monetdbe_data_date date_null; + monetdbe_data_time time_null; + monetdbe_data_timestamp timestamp_null; + str mid; +} monetdbe_database_internal; + + +size_t +monetdbe_get_size(monetdbe_database dbhdl, const char* schema_name, const char *table_name) +{ + monetdbe_database_internal* hdl = (monetdbe_database_internal*)dbhdl; + backend* be = ((backend *)(((monetdbe_database_internal*)dbhdl)->c->sqlcontext)); + mvc *m = be->mvc; + mvc_trans(m); + sql_table *t = find_table_or_view_on_scope(m, NULL, schema_name, table_name, "CATALOG", false); + sql_column *col = ol_first_node(t->columns)->data; + sqlstore* store = m->store; + size_t sz = store->storage_api.count_col(m->session->tr, col, QUICK); + mvc_cancel_session(m); + return sz; +} + +void* +monetdbe_get_col(monetdbe_database dbhdl, const char* schema_name, const char *table_name, uint32_t col_id) { + monetdbe_database_internal* hdl = (monetdbe_database_internal*)dbhdl; + backend* be = ((backend *)(((monetdbe_database_internal*)dbhdl)->c->sqlcontext)); + mvc *m = be->mvc; + mvc_trans(m); + sql_table *t = find_table_or_view_on_scope(m, NULL, schema_name, table_name, "CATALOG", false); + sql_column *col = ol_fetch(t->columns, col_id); + sqlstore* store = m->store; + BAT *b = store->storage_api.bind_col(m->session->tr, col, RDONLY); + BATiter iter = bat_iterator(b); + mvc_cancel_session(m); + return iter.base; +} diff --git a/server/server.cpp b/server/server.cpp index 3fcbe9b..4438930 100644 --- a/server/server.cpp +++ b/server/server.cpp @@ -20,10 +20,6 @@ #include #include -// fast numeric to string conversion -#include "jeaiii_to_text.h" -#include "dragonbox/dragonbox_to_chars.h" - struct SharedMemory { std::atomic a; @@ -41,69 +37,7 @@ struct SharedMemory } }; -#ifndef __USE_STD_SEMAPHORE__ -#ifdef __APPLE__ -#include -class A_Semaphore { -private: - dispatch_semaphore_t native_handle; -public: - A_Semaphore(bool v = false) { - native_handle = dispatch_semaphore_create(v); - } - void acquire() { - // puts("acquire"); - dispatch_semaphore_wait(native_handle, DISPATCH_TIME_FOREVER); - } - void release() { - // puts("release"); - dispatch_semaphore_signal(native_handle); - } - ~A_Semaphore() { - } -}; -#else -#include -class A_Semaphore { -private: - sem_t native_handle; -public: - A_Semaphore(bool v = false) { - sem_init(&native_handle, v, 1); - } - void acquire() { - sem_wait(&native_handle); - } - void release() { - sem_post(&native_handle); - } - ~A_Semaphore() { - sem_destroy(&native_handle); - } -}; -#endif -#endif -#endif - -#ifdef __USE_STD_SEMAPHORE__ -#define __AQUERY_ITC_USE_SEMPH__ -#include -class A_Semaphore { -private: - std::binary_semaphore native_handle; -public: - A_Semaphore(bool v = false) { - native_handle = std::binary_semaphore(v); - } - void acquire() { - native_handle.acquire(); - } - void release() { - native_handle.release(); - } - ~A_Semaphore() { } -}; -#endif +#endif // _WIN32 #ifdef __AQUERY_ITC_USE_SEMPH__ A_Semaphore prompt{ true }, engine{ false }; @@ -225,9 +159,10 @@ inline constexpr static unsigned char monetdbe_type_szs[] = { 1 }; constexpr uint32_t output_buffer_size = 65536; -void print_monetdb_results(Server* srv, const char* sep = " ", const char* end = "\n", +void print_monetdb_results(void* _srv, const char* sep = " ", const char* end = "\n", uint32_t limit = std::numeric_limits::max()) { - if (!srv->haserror() && srv->cnt && limit){ + auto srv = static_cast(_srv); + if (!srv->haserror() && srv->cnt && limit) { char buffer[output_buffer_size]; auto _res = static_cast (srv->res); const auto ncols = _res->ncols; @@ -255,7 +190,7 @@ void print_monetdb_results(Server* srv, const char* sep = " ", const char* end = puts("Error: separator or end string too long"); goto cleanup; } - if (header_string.size() - l_sep - 1>= 0) + if (header_string.size() >= l_sep + 1) header_string.resize(header_string.size() - l_sep - 1); header_string += end + std::string(header_string.size(), '=') + end; fputs(header_string.c_str(), stdout); @@ -324,7 +259,7 @@ int dll_main(int argc, char** argv, Context* cxt){ catch (std::filesystem::filesystem_error& e) { printf("Failed to create directory %s: %s\n", procedure_root.c_str(), e.what()); } - + if (cxt->module_function_maps == nullptr) cxt->module_function_maps = new std::unordered_map(); auto module_fn_map = @@ -590,9 +525,9 @@ start: break; case 'D': // delete procedure break; - case 'S': //save procedure + case 'S': // save procedure break; - case 'L': //load procedure + case 'L': // load procedure if (!load_proc_fromfile(current_procedure)) { cxt->stored_proc.insert_or_assign(proc_name, current_procedure); } diff --git a/server/tests/thread_pool.hpp b/server/tests/thread_pool.hpp index dfed503..961268e 100644 --- a/server/tests/thread_pool.hpp +++ b/server/tests/thread_pool.hpp @@ -4,21 +4,26 @@ #include #include #include -using namespace std; FILE *fp; long long testing_throughput(uint32_t n_jobs, bool prompt = true){ + using namespace std::chrono_literals; printf("Threadpool througput test with %u jobs. Press any key to start.\n", n_jobs); - auto tp = ThreadPool(thread::hardware_concurrency()); + auto tp = ThreadPool(std::thread::hardware_concurrency()); getchar(); auto i = 0u; fp = fopen("tmp.tmp", "wb"); - auto time = chrono::high_resolution_clock::now(); - while(i++ < n_jobs) tp.enqueue_task({ [](void* f) {fprintf(fp, "%d ", *(int*)f); free(f); }, new int(i) }); + auto time = std::chrono::high_resolution_clock::now(); + while(i++ < n_jobs) { + payload_t payload; + payload.f = [](void* f) {fprintf(fp, "%d ", *(int*)f); free(f); return 0; }; + payload.args = new int(i); + tp.enqueue_task(payload); + } puts("done dispatching."); - while (tp.busy()) this_thread::sleep_for(1s); - auto t = (chrono::high_resolution_clock::now() - time).count(); + while (tp.busy()) std::this_thread::sleep_for(1s); + auto t = (std::chrono::high_resolution_clock::now() - time).count(); printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n", i, t, i/(double)(t)); //this_thread::sleep_for(2s); fclose(fp); @@ -27,26 +32,31 @@ long long testing_throughput(uint32_t n_jobs, bool prompt = true){ long long testing_transaction(uint32_t n_burst, uint32_t n_batch, uint32_t base_time, uint32_t var_time, bool prompt = true, FILE* _fp = stdout){ + using namespace std::chrono_literals; printf("Threadpool transaction test: burst: %u, batch: %u, time: [%u, %u].\n" , n_burst, n_batch, base_time, var_time + base_time); if (prompt) { puts("Press any key to start."); getchar(); } - auto tp = ThreadPool(thread::hardware_concurrency()); + auto tp = ThreadPool(std::thread::hardware_concurrency()); fp = _fp; auto i = 0u, j = 0u; - auto time = chrono::high_resolution_clock::now(); + auto time = std::chrono::high_resolution_clock::now(); while(j++ < n_batch){ i = 0u; - while(i++ < n_burst) - tp.enqueue_task({ [](void* f) { fprintf(fp, "%d ", *(int*)f); free(f); }, new int(j) }); + while(i++ < n_burst) { + payload_t payload; + payload.f = [](void* f) { fprintf(fp, "%d ", *(int*)f); free(f); return 0; }; + payload.args = new int(j); + tp.enqueue_task(payload); + } fflush(stdout); - this_thread::sleep_for(chrono::microseconds(rand()%var_time + base_time)); + std::this_thread::sleep_for(std::chrono::microseconds(rand()%var_time + base_time)); } puts("done dispatching."); - while (tp.busy()) this_thread::sleep_for(1s); - auto t = (chrono::high_resolution_clock::now() - time).count(); + while (tp.busy()) std::this_thread::sleep_for(1s); + auto t = (std::chrono::high_resolution_clock::now() - time).count(); printf("\nTr: %u, Ti: %lld \nThroughput: %lf transactions/ns\n", j*i, t, j*i/(double)(t)); return t; @@ -58,17 +68,17 @@ long long testing_destruction(bool prompt = true){ puts("Press any key to start."); getchar(); } - auto time = chrono::high_resolution_clock::now(); + auto time = std::chrono::high_resolution_clock::now(); for(int i = 0; i < 8; ++i) testing_transaction(0xfff, 0xff, 400, 100, false, fp); for(int i = 0; i < 64; ++i) testing_transaction(0xff, 0xf, 60, 20, false, fp); for(int i = 0; i < 1024; ++i) { - auto tp = new ThreadPool(256); + auto tp = new ThreadPool(255); delete tp; } return 0; - auto t = (chrono::high_resolution_clock::now() - time).count(); + auto t = (std::chrono::high_resolution_clock::now() - time).count(); fclose(fp); return t; } diff --git a/server/threading.cpp b/server/threading.cpp index 3f641fc..cf61be9 100644 --- a/server/threading.cpp +++ b/server/threading.cpp @@ -1,4 +1,5 @@ #include "threading.h" +#include "libaquery.h" #include #include #include @@ -105,7 +106,8 @@ ThreadPool::ThreadPool(uint32_t n_threads) current_payload = new payload_t[n_threads]; for (uint32_t i = 0; i < n_threads; ++i){ - atomic_init(tf + i, static_cast(0b10)); + // atomic_init(tf + i, static_cast(0b10)); + tf[i] = static_cast(0b10); th[i] = thread(&ThreadPool::daemon_proc, this, i); } @@ -152,21 +154,50 @@ bool ThreadPool::busy(){ return true; } -Trigger::Trigger(ThreadPool* tp){ - +IntervalBasedTriggerHost::IntervalBasedTriggerHost(ThreadPool* tp){ + this->tp = tp; + this->triggers = new vector_type; + adding_trigger = new mutex(); + this->now = std::chrono::high_resolution_clock::now().time_since_epoch().count(); } -void IntervalBasedTrigger::timer::reset(){ - time_remaining = interval; +void IntervalBasedTriggerHost::add_trigger(StoredProcedure *p, uint32_t interval) { + auto tr = IntervalBasedTrigger{.interval = interval, .time_remaining = 0, .sp = p}; + auto vt_triggers = static_cast *>(this->triggers); + adding_trigger->lock(); + vt_triggers->emplace_back(tr); + adding_trigger->unlock(); } -bool IntervalBasedTrigger::timer::tick(uint32_t t){ - if (time_remaining > t) { - time_remaining -= t; - return false; - } - else{ - time_remaining = interval - t%interval; - return true; +void IntervalBasedTriggerHost::tick() { + const auto current_time = std::chrono::high_resolution_clock::now().time_since_epoch().count(); + const auto delta_t = static_cast((current_time - now) / 1000000); // miliseconds precision + now = current_time; + auto vt_triggers = static_cast *>(this->triggers); + adding_trigger->lock(); + for(auto& t : *vt_triggers) { + if(t.tick(delta_t)) { + payload_t payload; + payload.f = execTriggerPayload; + payload.args = static_cast(new StoredProcedurePayload {t.sp, cxt}); + + tp->enqueue_task(payload); + } } -} \ No newline at end of file + adding_trigger->unlock(); +} + +void IntervalBasedTrigger::reset() { + time_remaining = interval; +} + +bool IntervalBasedTrigger::tick(uint32_t delta_t) { + bool ret = false; + if (time_remaining <= delta_t) + ret = true; + if (auto curr_dt = delta_t % interval; time_remaining <= curr_dt) + time_remaining = interval + time_remaining - curr_dt; + else + time_remaining = time_remaining - curr_dt; + return ret; +} diff --git a/server/threading.h b/server/threading.h index 1144de7..94277a7 100644 --- a/server/threading.h +++ b/server/threading.h @@ -7,7 +7,7 @@ typedef int(*payload_fn_t)(void*); struct payload_t{ payload_fn_t f; void* args; - constexpr payload_t(payload_fn_t f, void* args) noexcept + constexpr payload_t(payload_fn_t f, void* args) noexcept : f(f), args(args) {} constexpr payload_t() noexcept : f(nullptr), args(nullptr) {}; @@ -19,7 +19,7 @@ struct payload_t{ class ThreadPool{ public: - ThreadPool(uint32_t n_threads = 0); + explicit ThreadPool(uint32_t n_threads = 0); void enqueue_task(const payload_t& payload); bool busy(); virtual ~ThreadPool(); @@ -39,29 +39,44 @@ private: }; -class Trigger{ -private: - void* triggers; //min-heap by t-rem - virtual void tick() = 0; +#include +#include +class A_Semphore; +class TriggerHost { +protected: + void* triggers; + std::thread* handle; + ThreadPool *tp; + Context* cxt; + std::mutex* adding_trigger; + + virtual void tick() = 0; public: - Trigger(ThreadPool* tp); + TriggerHost() = default; + virtual ~TriggerHost() = default; }; -class IntervalBasedTrigger : public Trigger{ +struct StoredProcedure; + +struct IntervalBasedTrigger { + uint32_t interval; // in milliseconds + uint32_t time_remaining; + StoredProcedure* sp; + void reset(); + bool tick(uint32_t t); +}; + +class IntervalBasedTriggerHost : public TriggerHost { public: - struct timer{ - uint32_t interval; // in milliseconds - uint32_t time_remaining; - void reset(); - bool tick(uint32_t t); - }; - void add_trigger(); + explicit IntervalBasedTriggerHost(ThreadPool *tp); + void add_trigger(StoredProcedure* stored_procedure, uint32_t interval); private: + unsigned long long now; void tick() override; }; -class CallbackBasedTrigger : public Trigger{ +class CallbackBasedTriggerHost : public TriggerHost { public: void add_trigger(); private: diff --git a/server/winhelper.h b/server/winhelper.h index 463c8bc..7392b47 100644 --- a/server/winhelper.h +++ b/server/winhelper.h @@ -16,18 +16,6 @@ struct SharedMemory void FreeMemoryMap(); }; -#ifndef __USE_STD_SEMAPHORE__ -class A_Semaphore { -private: - void* native_handle; -public: - A_Semaphore(bool); - void acquire(); - void release(); - ~A_Semaphore(); -}; -#endif - #endif // WIN32 #endif // WINHELPER diff --git a/tests/dt2.a b/tests/dt2.a index bd2bde7..5f7fd2a 100644 --- a/tests/dt2.a +++ b/tests/dt2.a @@ -5,18 +5,18 @@ LOAD MODULE FROM "./libirf.so" predict(X:vecvecdouble) -> vecint ); - create table source(x1 double, x2 double, x3 double, x4 double, x5 int64); - load data infile "data/benchmark" into table source fields terminated by ","; +create table source(x1 double, x2 double, x3 double, x4 double, x5 int64); +load data infile "data/benchmark" into table source fields terminated by ","; - create table sparse(x int); - insert into sparse values (1); - insert into sparse values (1); - insert into sparse values (1); - insert into sparse values (1); +create table sparse(x int); +insert into sparse values (1); +insert into sparse values (1); +insert into sparse values (1); +insert into sparse values (1); - select newtree(6, 4, sparse.x, 0, 4, 2, 0, 400, 2147483647) from sparse +select newtree(6, 4, sparse.x, 0, 4, 2, 0, 400, 2147483647) from sparse - select fit(pack(x1, x2, x3, x4), x5) from source +select fit(pack(x1, x2, x3, x4), x5) from source -- select pack(x1, x2, x3, x4) from source - select predict(pack(x1, x2, x3, x4)) from source \ No newline at end of file +select predict(pack(x1, x2, x3, x4)) from source \ No newline at end of file