master
Bill 2 years ago
parent 906daf577b
commit b60bfc478d

2
.gitignore vendored

@ -87,3 +87,5 @@ udf*.hpp
*.ipynb
saved_procedures/**
procedures/**
.mypy_cache
__pycache__

@ -10,7 +10,7 @@ RUN export OS_VER=`cat /etc/os-release | grep VERSION_CODENAME` &&\
RUN wget --output-document=/etc/apt/trusted.gpg.d/monetdb.gpg https://dev.monetdb.org/downloads/MonetDB-GPG-KEY.gpg
RUN apt update && apt install -y python3 python3-pip clang-14 libmonetdbe-dev git
RUN apt update && apt install -y python3 python3-pip clang-14 libmonetdbe-dev libmonetdb-client-dev monetdb5-sql-dev git
RUN git clone https://github.com/sunyinqi0508/AQuery2

@ -9,11 +9,11 @@ use_threadpool = False # True
# environments
input_prefix = f'data{sep}{dataset}_orig'
output_prefix = f'data{sep}{dataset}'
sep_field = ','
sep_subfield = ';'
sep_field = b','
sep_subfield = b';'
lst_files = os.listdir(input_prefix)
lst_files.sort()
# lst_files.sort()
try:
os.mkdir(output_prefix)
@ -24,15 +24,15 @@ def process(f : str):
filename = input_prefix + sep + f
ofilename = output_prefix + sep + f[:-3] + 'csv'
with open(filename, 'rb') as ifile:
icontents = ifile.read().decode('utf-8')
icontents = ifile.read()
with open(ofilename, 'wb') as ofile:
for l in icontents.splitlines():
fields = l.strip().split(' ')
fields = l.strip().split(b' ')
subfields = fields[:-1]
ol = ( # fields[0] + sep_field +
sep_subfield.join(subfields) +
sep_field + fields[-1] + '\n')
ofile.write(ol.encode('utf-8'))
sep_field + fields[-1] + b'\n')
ofile.write(ol)
if not use_threadpool:
for f in lst_files:

@ -34,4 +34,9 @@ struct monetdbe_table_data{
void* cols;
};
size_t
monetdbe_get_size(void* dbhdl, const char *table_name);
void*
monetdbe_get_col(void* dbhdl, const char *table_name, uint32_t col_id);
#endif

@ -1,3 +1,5 @@
// Non-standard Extensions for MonetDBe, may break concurrency control!
#include "monetdbe.h"
#include <stdint.h>
#include "mal_client.h"
@ -61,33 +63,28 @@ typedef struct {
str mid;
} monetdbe_database_internal;
size_t
monetdbe_get_size(monetdbe_database dbhdl, const char* schema_name, const char *table_name)
monetdbe_get_size(monetdbe_database dbhdl, const char *table_name)
{
monetdbe_database_internal* hdl = (monetdbe_database_internal*)dbhdl;
backend* be = ((backend *)(((monetdbe_database_internal*)dbhdl)->c->sqlcontext));
mvc *m = be->mvc;
mvc_trans(m);
sql_table *t = find_table_or_view_on_scope(m, NULL, schema_name, table_name, "CATALOG", false);
sql_table *t = find_table_or_view_on_scope(m, NULL, "sys", table_name, "CATALOG", false);
sql_column *col = ol_first_node(t->columns)->data;
sqlstore* store = m->store;
size_t sz = store->storage_api.count_col(m->session->tr, col, QUICK);
mvc_cancel_session(m);
return sz;
}
void*
monetdbe_get_col(monetdbe_database dbhdl, const char* schema_name, const char *table_name, uint32_t col_id) {
monetdbe_get_col(monetdbe_database dbhdl, const char *table_name, uint32_t col_id) {
monetdbe_database_internal* hdl = (monetdbe_database_internal*)dbhdl;
backend* be = ((backend *)(((monetdbe_database_internal*)dbhdl)->c->sqlcontext));
mvc *m = be->mvc;
mvc_trans(m);
sql_table *t = find_table_or_view_on_scope(m, NULL, schema_name, table_name, "CATALOG", false);
sql_table *t = find_table_or_view_on_scope(m, NULL, "sys", table_name, "CATALOG", false);
sql_column *col = ol_fetch(t->columns, col_id);
sqlstore* store = m->store;
BAT *b = store->storage_api.bind_col(m->session->tr, col, RDONLY);
BAT *b = store->storage_api.bind_col(m->session->tr, col, QUICK);
BATiter iter = bat_iterator(b);
mvc_cancel_session(m);
return iter.base;
}

@ -157,16 +157,16 @@ bool ThreadPool::busy(){
IntervalBasedTriggerHost::IntervalBasedTriggerHost(ThreadPool* tp){
this->tp = tp;
this->triggers = new vector_type<IntervalBasedTrigger>;
adding_trigger = new mutex();
trigger_queue_lock = new mutex();
this->now = std::chrono::high_resolution_clock::now().time_since_epoch().count();
}
void IntervalBasedTriggerHost::add_trigger(StoredProcedure *p, uint32_t interval) {
auto tr = IntervalBasedTrigger{.interval = interval, .time_remaining = 0, .sp = p};
auto vt_triggers = static_cast<vector_type<IntervalBasedTrigger> *>(this->triggers);
adding_trigger->lock();
trigger_queue_lock->lock();
vt_triggers->emplace_back(tr);
adding_trigger->unlock();
trigger_queue_lock->unlock();
}
void IntervalBasedTriggerHost::tick() {
@ -174,7 +174,7 @@ void IntervalBasedTriggerHost::tick() {
const auto delta_t = static_cast<uint32_t>((current_time - now) / 1000000); // miliseconds precision
now = current_time;
auto vt_triggers = static_cast<vector_type<IntervalBasedTrigger> *>(this->triggers);
adding_trigger->lock();
trigger_queue_lock->lock();
for(auto& t : *vt_triggers) {
if(t.tick(delta_t)) {
payload_t payload;
@ -184,7 +184,7 @@ void IntervalBasedTriggerHost::tick() {
tp->enqueue_task(payload);
}
}
adding_trigger->unlock();
trigger_queue_lock->unlock();
}
void IntervalBasedTrigger::reset() {

@ -49,7 +49,7 @@ protected:
std::thread* handle;
ThreadPool *tp;
Context* cxt;
std::mutex* adding_trigger;
std::mutex* trigger_queue_lock;
virtual void tick() = 0;
public:
@ -71,6 +71,7 @@ class IntervalBasedTriggerHost : public TriggerHost {
public:
explicit IntervalBasedTriggerHost(ThreadPool *tp);
void add_trigger(StoredProcedure* stored_procedure, uint32_t interval);
void remove_trigger(uint32_t tid);
private:
unsigned long long now;
void tick() override;

Loading…
Cancel
Save