initial support for duckdb

master
bill 2 years ago
parent d98b4817b3
commit 200dc71aad

@ -345,10 +345,12 @@
<ItemGroup>
<ClInclude Include="..\csv.h" />
<ClInclude Include="..\server\aggregations.h" />
<ClInclude Include="..\server\duckdb_conn.h" />
<ClInclude Include="..\server\gc.h" />
<ClInclude Include="..\server\hasher.h" />
<ClInclude Include="..\server\io.h" />
<ClInclude Include="..\server\libaquery.h" />
<ClInclude Include="..\server\monetdb_conn.h" />
<ClInclude Include="..\server\priority_vector.hpp" />
<ClInclude Include="..\server\table.h" />
<ClInclude Include="..\server\types.h" />
@ -361,6 +363,7 @@
<None Include="..\server\cpp.hint" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="..\server\duckdb_conn.cpp" />
<ClCompile Include="..\server\monetdb_ext.c" />
<ClCompile Include="..\server\server.cpp" />
<ClCompile Include="..\server\threading.cpp" />

@ -0,0 +1,43 @@
#ifndef __DATASOURCE_CONN_H__
#define __DATASOURCE_CONN_H__
struct Context;
struct AQQueryResult {
void* res;
unsigned ref;
};
enum DataSourceType {
Invalid,
MonetDB,
MariaDB,
DuckDB,
SQLite
};
struct DataSource {
void* server = nullptr;
Context* cxt = nullptr;
bool status = false;
char* query = nullptr;
DataSourceType type = Invalid;
void* res = nullptr;
void* ret_col = nullptr;
long long cnt = 0;
const char* last_error = nullptr;
void* handle;
DataSource() = default;
explicit DataSource(Context* cxt = nullptr) = delete;
virtual void connect(Context* cxt) = 0;
virtual void exec(const char* q) = 0;
virtual void* getCol(int col_idx, int type) = 0;
// virtual long long getFirstElement() = 0;
virtual void close() = 0;
virtual bool haserror() = 0;
// virtual void print_results(const char* sep = " ", const char* end = "\n");
virtual ~DataSource() = 0;
};
#endif //__DATASOURCE_CONN_H__

@ -0,0 +1,89 @@
#include "duckdb_conn.h"
#include "../deps/duckdb.hpp"
#include "libaquery.h"
#include "types.h"
#include <cstdio>
#include <string_view>
using namespace std;
void DuckdbServer::connect(Context* cxt) {
duckdb_database* db_handle =
static_cast<duckdb_database*>(malloc(sizeof(duckdb_database)));
this->handle = db_handle;
bool status = duckdb_open(nullptr, db_handle);
duckdb_connection* conn_handle;
status = status || duckdb_connect(*db_handle, conn_handle);
this->server = conn_handle;
if (status != 0) {
puts("DuckdbServer: Error! Creating/Connecting to INMemory DB.");
}
}
DuckdbServer::DuckdbServer(Context* cxt) {
this->cxt = cxt;
connect(cxt);
}
void DuckdbServer::exec(const char* q) {
//TODO: Add res to GC queue with ref count.
auto res = static_cast<duckdb_result*>(malloc(sizeof(duckdb_result)));
auto status = duckdb_query(*static_cast<duckdb_connection*>(this->server), q, res);
if (status) {
last_error = duckdb_result_error(res);
this->res = nullptr;
this->cnt = 0;
}
else {
this->res = res;
this->cnt = duckdb_row_count(res);
}
}
void* DuckdbServer::getCol(int col_idx, int ty) {
auto res = static_cast<duckdb_result*>(this->res);
if (ty == types::Type_t::ASTR) {
std::string_view* ret =
static_cast<string_view*>(malloc(sizeof(std::string_view) * cnt));
for(uint32_t i = 0; i < cnt; ++i)
ret[i] = {duckdb_value_varchar(res, col_idx, i)};
return ret;
}
else
{
auto chk = duckdb_result_chunk_count(*res);
const auto v_size = duckdb_vector_size();
auto sz = types::AType_sizes[ty];
char* ret = static_cast<char*>(malloc(sz * cnt));
uint32_t j = 0;
for (uint32_t i = 0; i < chk; ++i) {
auto data = duckdb_vector_get_data(
duckdb_data_chunk_get_vector(
duckdb_result_get_chunk(*res, i),
col_idx)
);
const auto curr_ptr = i * v_size;
const auto rem_size = int(cnt) - curr_ptr;
memcpy(ret + i * v_size, data,
(rem_size < v_size ? rem_size : v_size) * sz);
}
return ret;
}
}
bool DuckdbServer::haserror() {
if (last_error) {
puts(last_error);
last_error = nullptr;
return true;
}
return false;
}
void DuckdbServer::close() {
duckdb_disconnect(static_cast<duckdb_connection*>(server));
duckdb_close(static_cast<duckdb_database*>(handle));
}
DuckdbServer::~DuckdbServer() {
this->close();
}

@ -0,0 +1,16 @@
#ifndef __DUCKDB_CONN_H__
#define __DUCKDB_CONN_H__
#include "DataSource_conn.h"
struct DuckdbServer : DataSource {
explicit DuckdbServer(Context* cxt = nullptr);
void connect(Context* cxt);
void exec(const char* q);
void* getCol(int col_idx, int type);
long long getFirstElement();
void close();
bool haserror();
void print_results(const char* sep = " ", const char* end = "\n");
~DuckdbServer();
};
#endif //__DUCKDB_CONN_H__

@ -81,6 +81,11 @@ struct Config{
int buffer_sizes[];
};
struct AQQueryResult {
void* res;
uint32_t ref;
};
struct Session{
struct Statistic{
size_t total_active;

@ -23,7 +23,6 @@ struct Server{
void close();
bool haserror();
static bool havehge();
void test(const char*);
void print_results(const char* sep = " ", const char* end = "\n");
friend void print_monetdb_results(void* _srv, const char* sep, const char* end, int limit);
~Server();

Loading…
Cancel
Save