dev
Bill Sun 3 years ago
parent 634ed382f5
commit 824db2a43b

@ -7,6 +7,8 @@ $(info $(OS_SUPPORT))
server.bin:
g++ server/server.cpp $(OS_SUPPORT) --std=c++1z -O3 -march=native -o server.bin
server.so:
g++ server/server.cpp -shared $(OS_SUPPORT) --std=c++1z -O3 -march=native -o server.so
snippet:
g++ -shared -fPIC --std=c++1z out.cpp -O3 -march=native -o dll.so
clean:

@ -1,6 +1,6 @@
#include "./server/libaquery.h"
#include "./server/aggregations.h"
#include "csv.h"
#include "./server/libaquery.h"
#include "./server/hasher.h"
#include <unordered_map>
@ -18,40 +18,40 @@ test_a.init("a");
test_b.init("b");
test_c.init("c");
test_d.init("d");
io::CSVReader<4> csv_reader_307VD4("test.csv");
csv_reader_307VD4.read_header(io::ignore_extra_column, "a","b","c","d");
int tmp_3LXIYQmp;
int tmp_1m5NCKR4;
int tmp_10LZcLgy;
int tmp_39pPZL8W;
while(csv_reader_307VD4.read_row(tmp_3LXIYQmp,tmp_1m5NCKR4,tmp_10LZcLgy,tmp_39pPZL8W)) {
io::CSVReader<4> csv_reader_1qh80y("test.csv");
csv_reader_1qh80y.read_header(io::ignore_extra_column, "a","b","c","d");
int tmp_6JfuovoZ;
int tmp_4B4ADRgW;
int tmp_3JHd3elW;
int tmp_1heR8kZw;
while(csv_reader_1qh80y.read_row(tmp_6JfuovoZ,tmp_4B4ADRgW,tmp_3JHd3elW,tmp_1heR8kZw)) {
test_a.emplace_back(tmp_3LXIYQmp);
test_b.emplace_back(tmp_1m5NCKR4);
test_c.emplace_back(tmp_10LZcLgy);
test_d.emplace_back(tmp_39pPZL8W);
test_a.emplace_back(tmp_6JfuovoZ);
test_b.emplace_back(tmp_4B4ADRgW);
test_c.emplace_back(tmp_3JHd3elW);
test_d.emplace_back(tmp_1heR8kZw);
}
typedef record<decltype(test_a[0]),decltype(test_b[0]),decltype(test_d[0])> record_type3OMslKw;
unordered_map<record_type3OMslKw, vector_type<uint32_t>, transTypes<record_type3OMslKw, hasher>> g7LNVAss;
for (uint32_t i1T = 0; i1T < test_a.size; ++i1T){
g7LNVAss[forward_as_tuple(test_a[i1T],test_b[i1T],test_d[i1T])].emplace_back(i1T);
typedef record<decltype(test_a[0]),decltype(test_b[0]),decltype(test_d[0])> record_type3he5qd1;
unordered_map<record_type3he5qd1, vector_type<uint32_t>, transTypes<record_type3he5qd1, hasher>> g59vWI2v;
for (uint32_t i3P = 0; i3P < test_a.size; ++i3P){
g59vWI2v[forward_as_tuple(test_a[i3P],test_b[i3P],test_d[i3P])].emplace_back(i3P);
}
auto out_HSfK = new TableInfo<decays<decltype(sum(test_c))>,value_type<decays<decltype(test_b)>>,value_type<decays<decltype(test_d)>>>("out_HSfK", 3);
cxt->tables.insert({"out_HSfK", out_HSfK});
auto& out_HSfK_sumtestc = *(ColRef<decays<decltype(sum(test_c))>> *)(&out_HSfK->colrefs[0]);
auto& out_HSfK_b = *(ColRef<value_type<decays<decltype(test_b)>>> *)(&out_HSfK->colrefs[1]);
auto& out_HSfK_d = *(ColRef<value_type<decays<decltype(test_d)>>> *)(&out_HSfK->colrefs[2]);
out_HSfK_sumtestc.init("sumtestc");
out_HSfK_b.init("b");
out_HSfK_d.init("d");
for(auto& i18 : g7LNVAss) {
auto &key_3s5slnK = i18.first;
auto &val_2nNLv0D = i18.second;
out_HSfK_sumtestc.emplace_back(sum(test_c[val_2nNLv0D]));
out_HSfK_b.emplace_back(get<1>(key_3s5slnK));
out_HSfK_d.emplace_back(get<2>(key_3s5slnK));
auto out_6JAp = new TableInfo<decays<decltype(sum(test_c))>,value_type<decays<decltype(test_b)>>,value_type<decays<decltype(test_d)>>>("out_6JAp", 3);
cxt->tables.insert({"out_6JAp", out_6JAp});
auto& out_6JAp_sumtestc = *(ColRef<decays<decltype(sum(test_c))>> *)(&out_6JAp->colrefs[0]);
auto& out_6JAp_b = *(ColRef<value_type<decays<decltype(test_b)>>> *)(&out_6JAp->colrefs[1]);
auto& out_6JAp_d = *(ColRef<value_type<decays<decltype(test_d)>>> *)(&out_6JAp->colrefs[2]);
out_6JAp_sumtestc.init("sumtestc");
out_6JAp_b.init("b");
out_6JAp_d.init("d");
for(auto& i2Y : g59vWI2v) {
auto &key_1yBYhdd = i2Y.first;
auto &val_61QXy6G = i2Y.second;
out_6JAp_sumtestc.emplace_back(sum(test_c[val_61QXy6G]));
out_6JAp_b.emplace_back(get<1>(key_1yBYhdd));
out_6JAp_d.emplace_back(get<2>(key_1yBYhdd));
}
auto d5b7C95U = out_HSfK->order_by_view<-3,1>();
print(d5b7C95U);
auto d1dyPtv0 = out_6JAp->order_by_view<-3,1>();
print(d1dyPtv0);
return 0;
}

@ -1,3 +1,4 @@
import enum
import re
import time
import dbconn
@ -12,14 +13,26 @@ import sys
import os
from engine.utils import base62uuid
import atexit
import threading
import ctypes
class RunType(enum.Enum):
Threaded = 0
IPC = 1
server_mode = RunType.Threaded
server_bin = 'server.bin' if server_mode == RunType.IPC else 'server.so'
try:
os.remove('server.bin')
os.remove(server_bin)
except Exception as e:
print(type(e), e)
nullstream = open(os.devnull, 'w')
subprocess.call(['make', 'server.bin'], stdout=nullstream)
subprocess.call(['make', server_bin], stdout=nullstream)
cleanup = True
def rm():
@ -45,9 +58,7 @@ def rm():
cleanup = False
nullstream.close()
atexit.register(rm)
def init():
def init_ipc():
global shm, server, basecmd, mm
shm = base62uuid()
if sys.platform != 'win32':
@ -70,6 +81,100 @@ def init():
mm.flush()
server = subprocess.Popen(["./server.bin", shm])
import numpy as np
c = lambda _ba: ctypes.cast((ctypes.c_char * len(_ba)).from_buffer(_ba), ctypes.c_char_p)
class Config:
def __init__(self, nq = 0, mode = server_mode, n_bufs = 0, bf_szs = []) -> None:
self.int_size = 4
self.n_attrib = 4
self.buf = bytearray((self.n_attrib + n_bufs) * self.int_size)
self.np_buf = np.ndarray(shape=(self.n_attrib), buffer=self.buf, dtype=np.int32)
self.new_query = nq
self.server_mode = mode.value
self.running = 1
self.n_buffers = n_bufs
@property
def running(self):
return self.np_buf[0]
@running.setter
def running(self, rn):
self.np_buf[0] = rn
@property
def new_query(self):
return self.np_buf[1]
@new_query.setter
def new_query(self, nq):
self.np_buf[1] = nq
@property
def server_mode(self):
return self.np_buf[2]
@server_mode.setter
def server_mode(self, mode):
self.np_buf[2] = mode
@property
def n_buffers(self):
return self.np_buf[3]
@n_buffers.setter
def n_buffers(self, n_bufs):
self.np_buf[3] = n_bufs
def set_bufszs(self, buf_szs):
for i in range(min(len(buf_szs), self.n_buffers)):
self.np_buf[i+self.n_attrib] = buf_szs[i]
@property
def c(self):
return c(self.buf)
cfg = Config()
th = None
def init_threaded():
if os.name == 'nt':
t = os.environ['PATH'].lower().split(';')
vars = re.compile('%.*%')
for e in t:
if(len(e) != 0):
if '%' in e:
try:
m_e = vars.findall(e)
for m in m_e:
e = e.replace(m, os.environ[m[1:-1]])
# print(m, e)
except Exception:
continue
os.add_dll_directory(e)
server_so = ctypes.CDLL('./'+server_bin)
global cfg, th
th = threading.Thread(target=server_so['main'], args=(-1, ctypes.POINTER(ctypes.c_char_p)(cfg.c)), daemon=True)
th.start()
if server_mode == RunType.IPC:
atexit.register(rm)
init = init_ipc
set_ready = lambda : mm.seek(0,os.SEEK_SET) or mm.write(b'\x01\x01')
def __get_ready():
mm.seek(0,os.SEEK_SET)
return mm.read(2)[1]
get_ready = __get_ready
server_status = lambda : server.poll() is not None
else:
init = init_threaded
rm = lambda: None
def __set_ready():
global cfg
cfg.new_query = 1
set_ready = __set_ready
get_ready = lambda:cfg.new_query
server_status = lambda : not th.is_alive()
init()
test_parser = True
@ -82,31 +187,16 @@ q = 'SELECT p.Name, v.Name FROM Production.Product p JOIN Purchasing.ProductVend
res = parser.parse(q)
# else:f
# if subprocess.call(['make', 'snippet']) == 0:
# mm.seek(0)
# mm.write(b'\x01\x01')
# time.sleep(.1)
# mm.seek(0)
# print(mm.read(2))
# mm.close()
# handle.close()
# os.remove(shm)
# exit()
keep = True
cxt = engine.initialize()
cxt.Info(res)
while test_parser:
try:
if server.poll() is not None:
if server_status():
init()
print("> ", end="")
ready = 1
while ready == 1:
mm.seek(0,os.SEEK_SET)
ready = mm.read(2)[1]
while get_ready():
time.sleep(.00001)
print("> ", end="")
q = input().lower()
if q == 'exec':
if not keep or cxt is None:
@ -123,8 +213,7 @@ while test_parser:
with open('out.cpp', 'wb') as outfile:
outfile.write((cxt.finalize()).encode('utf-8'))
if subprocess.call(['make', 'snippet'], stdout = nullstream) == 0:
mm.seek(0,os.SEEK_SET)
mm.write(b'\x01\x01')
set_ready()
continue
if q == 'xexec':
cxt = xengine.initialize()
@ -159,8 +248,10 @@ while test_parser:
break
elif q == 'r':
if subprocess.call(['make', 'snippet']) == 0:
mm.seek(0,os.SEEK_SET)
mm.write(b'\x01\x01')
set_ready()
continue
elif q == 'rr':
set_ready()
continue
elif q.startswith('save'):
filename = re.split(' |\t', q)

Binary file not shown.

@ -1,27 +0,0 @@
{
"BackgroundImageAbsolutePath": "c:\\users\\bill\\appdata\\local\\microsoft\\visualstudio\\17.0_03c65567\\extensions\\atkxhose.05t\\Images\\background.png",
"BackgroundImagesDirectoryAbsolutePath": "c:\\users\\bill\\appdata\\local\\microsoft\\visualstudio\\17.0_03c65567\\extensions\\atkxhose.05t\\Images",
"ExpandToIDE": false,
"Extensions": ".png, .jpg, .gif, .bmp",
"ImageBackgroundType": 0,
"ImageFadeAnimationInterval": "PT5S",
"ImageStretch": 0,
"IsLimitToMainlyEditorWindow": false,
"LoopSlideshow": true,
"MaxHeight": 0,
"MaxWidth": 0,
"Opacity": 0.35,
"PositionHorizon": 1,
"PositionVertical": 1,
"ShuffleSlideshow": false,
"SoftEdgeX": 0,
"SoftEdgeY": 0,
"TileMode": 0,
"UpdateImageInterval": "PT1M",
"ViewBoxPointX": 0,
"ViewBoxPointY": 0,
"ViewPortHeight": 1,
"ViewPortPointX": 0,
"ViewPortPointY": 0,
"ViewPortWidth": 1
}

@ -10,10 +10,21 @@ enum Log_level {
LOG_SILENT
};
struct Config{
int running, new_query, server_mode, n_buffers;
int buffer_sizes[];
};
struct Context{
typedef int (*printf_type) (const char *format, ...);
std::unordered_map<const char*, void*> tables;
std::unordered_map<const char*, uColRef *> cols;
Config* cfg;
int n_buffers, *sz_bufs;
void **buffers;
Log_level log_level = LOG_SILENT;
printf_type print = printf;
template <class ...Types>
@ -28,7 +39,7 @@ struct Context{
}
};
#ifdef _MSC_VER
#ifdef _WIN32
#define __DLLEXPORT__ __declspec(dllexport) __stdcall
#else
#define __DLLEXPORT__

@ -36,13 +36,43 @@ void daemon(thread_context* c) {
#include "aggregations.h"
typedef int (*code_snippet)(void*);
int _main();
int main(int argc, char** argv) {
int dll_main(int argc, char** argv, Context* cxt){
Config *cfg = reinterpret_cast<Config *>(argv[0]);
auto buf_szs = cfg->buffer_sizes;
void** buffers = (void**)malloc(sizeof(void*) * cfg->n_buffers);
for (int i = 0; i < cfg->n_buffers; i++)
buffers[i] = static_cast<void *>(argv[i + 1]);
cxt->buffers = buffers;
cxt->cfg = cfg;
cxt->n_buffers = cfg->n_buffers;
cxt->sz_bufs = buf_szs;
while(cfg->running){
if (cfg->new_query) {
void* handle = dlopen("d:/gg/Aquery++/dll.so", RTLD_LAZY);
code_snippet c = reinterpret_cast<code_snippet>(dlsym(handle, "dllmain"));
c(cxt);
dlclose(handle);
cfg->new_query = 0;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
return 0;
}
extern "C" int __DLLEXPORT__ main(int argc, char** argv) {
Context* cxt = new Context();
cxt->log("%d %s\n", argc, argv[1]);
const char* shmname;
if (argc <= 1)
return _main();
if (argc < 0)
return dll_main(argc, argv, cxt);
else if (argc <= 1)
return test_main();
else
shmname = argv[1];
SharedMemory shm = SharedMemory(shmname);
@ -68,7 +98,6 @@ int main(int argc, char** argv) {
cxt->log("inner\n");
cxt->err("return: %d\n", c(cxt));
}
dlclose(handle);
}
ready = false;
}
@ -77,7 +106,7 @@ int main(int argc, char** argv) {
return 0;
}
#include "utils.h"
int _main()
int test_main()
{
//vector_type<int> t;
//t = 1;

Loading…
Cancel
Save