From b60bfc478d2229875a79a16a4e350e8ef3209d79 Mon Sep 17 00:00:00 2001 From: Bill Date: Fri, 10 Feb 2023 16:27:59 +0800 Subject: [PATCH] fix docker --- .gitignore | 2 ++ Dockerfile | 2 +- rfdata_preproc.py | 14 +++++++------- server/monetdb_conn.h | 5 +++++ server/monetdb_ext.c | 17 +++++++---------- server/threading.cpp | 10 +++++----- server/threading.h | 3 ++- 7 files changed, 29 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index 72774f3..022a020 100644 --- a/.gitignore +++ b/.gitignore @@ -87,3 +87,5 @@ udf*.hpp *.ipynb saved_procedures/** procedures/** +.mypy_cache +__pycache__ diff --git a/Dockerfile b/Dockerfile index 953d89f..b737ce9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,7 @@ RUN export OS_VER=`cat /etc/os-release | grep VERSION_CODENAME` &&\ RUN wget --output-document=/etc/apt/trusted.gpg.d/monetdb.gpg https://dev.monetdb.org/downloads/MonetDB-GPG-KEY.gpg -RUN apt update && apt install -y python3 python3-pip clang-14 libmonetdbe-dev git +RUN apt update && apt install -y python3 python3-pip clang-14 libmonetdbe-dev libmonetdb-client-dev monetdb5-sql-dev git RUN git clone https://github.com/sunyinqi0508/AQuery2 diff --git a/rfdata_preproc.py b/rfdata_preproc.py index fdd8cb0..e6720a2 100644 --- a/rfdata_preproc.py +++ b/rfdata_preproc.py @@ -9,11 +9,11 @@ use_threadpool = False # True # environments input_prefix = f'data{sep}{dataset}_orig' output_prefix = f'data{sep}{dataset}' -sep_field = ',' -sep_subfield = ';' +sep_field = b',' +sep_subfield = b';' lst_files = os.listdir(input_prefix) -lst_files.sort() +# lst_files.sort() try: os.mkdir(output_prefix) @@ -24,15 +24,15 @@ def process(f : str): filename = input_prefix + sep + f ofilename = output_prefix + sep + f[:-3] + 'csv' with open(filename, 'rb') as ifile: - icontents = ifile.read().decode('utf-8') + icontents = ifile.read() with open(ofilename, 'wb') as ofile: for l in icontents.splitlines(): - fields = l.strip().split(' ') + fields = l.strip().split(b' ') subfields = fields[:-1] ol = ( # fields[0] + sep_field + sep_subfield.join(subfields) + - sep_field + fields[-1] + '\n') - ofile.write(ol.encode('utf-8')) + sep_field + fields[-1] + b'\n') + ofile.write(ol) if not use_threadpool: for f in lst_files: diff --git a/server/monetdb_conn.h b/server/monetdb_conn.h index 389ca63..891c9a6 100644 --- a/server/monetdb_conn.h +++ b/server/monetdb_conn.h @@ -34,4 +34,9 @@ struct monetdbe_table_data{ void* cols; }; +size_t +monetdbe_get_size(void* dbhdl, const char *table_name); + +void* +monetdbe_get_col(void* dbhdl, const char *table_name, uint32_t col_id); #endif diff --git a/server/monetdb_ext.c b/server/monetdb_ext.c index 9a3b7d8..8111108 100644 --- a/server/monetdb_ext.c +++ b/server/monetdb_ext.c @@ -1,3 +1,5 @@ +// Non-standard Extensions for MonetDBe, may break concurrency control! + #include "monetdbe.h" #include #include "mal_client.h" @@ -61,33 +63,28 @@ typedef struct { str mid; } monetdbe_database_internal; - size_t -monetdbe_get_size(monetdbe_database dbhdl, const char* schema_name, const char *table_name) +monetdbe_get_size(monetdbe_database dbhdl, const char *table_name) { monetdbe_database_internal* hdl = (monetdbe_database_internal*)dbhdl; backend* be = ((backend *)(((monetdbe_database_internal*)dbhdl)->c->sqlcontext)); mvc *m = be->mvc; - mvc_trans(m); - sql_table *t = find_table_or_view_on_scope(m, NULL, schema_name, table_name, "CATALOG", false); + sql_table *t = find_table_or_view_on_scope(m, NULL, "sys", table_name, "CATALOG", false); sql_column *col = ol_first_node(t->columns)->data; sqlstore* store = m->store; size_t sz = store->storage_api.count_col(m->session->tr, col, QUICK); - mvc_cancel_session(m); return sz; } void* -monetdbe_get_col(monetdbe_database dbhdl, const char* schema_name, const char *table_name, uint32_t col_id) { +monetdbe_get_col(monetdbe_database dbhdl, const char *table_name, uint32_t col_id) { monetdbe_database_internal* hdl = (monetdbe_database_internal*)dbhdl; backend* be = ((backend *)(((monetdbe_database_internal*)dbhdl)->c->sqlcontext)); mvc *m = be->mvc; - mvc_trans(m); - sql_table *t = find_table_or_view_on_scope(m, NULL, schema_name, table_name, "CATALOG", false); + sql_table *t = find_table_or_view_on_scope(m, NULL, "sys", table_name, "CATALOG", false); sql_column *col = ol_fetch(t->columns, col_id); sqlstore* store = m->store; - BAT *b = store->storage_api.bind_col(m->session->tr, col, RDONLY); + BAT *b = store->storage_api.bind_col(m->session->tr, col, QUICK); BATiter iter = bat_iterator(b); - mvc_cancel_session(m); return iter.base; } diff --git a/server/threading.cpp b/server/threading.cpp index cf61be9..092ee43 100644 --- a/server/threading.cpp +++ b/server/threading.cpp @@ -157,16 +157,16 @@ bool ThreadPool::busy(){ IntervalBasedTriggerHost::IntervalBasedTriggerHost(ThreadPool* tp){ this->tp = tp; this->triggers = new vector_type; - adding_trigger = new mutex(); + trigger_queue_lock = new mutex(); this->now = std::chrono::high_resolution_clock::now().time_since_epoch().count(); } void IntervalBasedTriggerHost::add_trigger(StoredProcedure *p, uint32_t interval) { auto tr = IntervalBasedTrigger{.interval = interval, .time_remaining = 0, .sp = p}; auto vt_triggers = static_cast *>(this->triggers); - adding_trigger->lock(); + trigger_queue_lock->lock(); vt_triggers->emplace_back(tr); - adding_trigger->unlock(); + trigger_queue_lock->unlock(); } void IntervalBasedTriggerHost::tick() { @@ -174,7 +174,7 @@ void IntervalBasedTriggerHost::tick() { const auto delta_t = static_cast((current_time - now) / 1000000); // miliseconds precision now = current_time; auto vt_triggers = static_cast *>(this->triggers); - adding_trigger->lock(); + trigger_queue_lock->lock(); for(auto& t : *vt_triggers) { if(t.tick(delta_t)) { payload_t payload; @@ -184,7 +184,7 @@ void IntervalBasedTriggerHost::tick() { tp->enqueue_task(payload); } } - adding_trigger->unlock(); + trigger_queue_lock->unlock(); } void IntervalBasedTrigger::reset() { diff --git a/server/threading.h b/server/threading.h index 94277a7..f2f3b31 100644 --- a/server/threading.h +++ b/server/threading.h @@ -49,7 +49,7 @@ protected: std::thread* handle; ThreadPool *tp; Context* cxt; - std::mutex* adding_trigger; + std::mutex* trigger_queue_lock; virtual void tick() = 0; public: @@ -71,6 +71,7 @@ class IntervalBasedTriggerHost : public TriggerHost { public: explicit IntervalBasedTriggerHost(ThreadPool *tp); void add_trigger(StoredProcedure* stored_procedure, uint32_t interval); + void remove_trigger(uint32_t tid); private: unsigned long long now; void tick() override;