#include <cpptrace/cpptrace.hpp>
#include <nlohmann/json.hpp>
#include <spdlog/spdlog.h>

namespace fs = std::filesystem;
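
// Helpers used throughout this file: format a timestamp for log messages and read/write the
// JSON documents that cache the GitHub "releases" overview between runs.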
std::string format_time(std::time_t time_val) {
    std::tm *ptm = std::localtime(&time_val);
    std::ostringstream oss;
    oss << std::put_time(ptm, "%Y-%m-%d %H:%M:%S");
    return oss.str();
}

json load_json(const fs::path &file) {
    std::ifstream in(file);
    in.exceptions(std::ifstream::failbit | std::ifstream::badbit);
    return json::parse(in);
}

void save_json(const fs::path &file, const json &doc) {
    std::ofstream out(file);
    if (!out) {
        throw std::runtime_error(
            fmt::format("Failed to open {} for writing", file.string()));
    }
    out << doc;
}
ParquetManager::ParquetManager(std::filesystem::path directory, const GitHubDownloader &downloader,
                               std::vector<std::string> repo_paths, duckdb::Connection &con,
                               bool use_cache)
    : directory_(std::move(directory)), downloader(downloader), repo_paths_(std::move(repo_paths)),
      con(con), use_cache_(use_cache) {
    // Ensure the directory for the database tables exists
    if (!std::filesystem::exists(directory_ / "tables")) {
        fs::create_directories(directory_ / "tables");
    }

    // If remote repositories are configured, check the GitHub API rate limit up front
    if (!repo_paths_.empty()) {
        auto rate_limit = downloader.get_rate_limit();
        if (rate_limit.remaining <= 0) {
            react_on_rate_limit_reached(rate_limit.reset_time);
        } else {
            SPDLOG_INFO("Remaining GitHub API requests: {}. Rate limit resets at {}.",
                        rate_limit.remaining, format_time(rate_limit.reset_time));
        }
    }
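
    // Scan the configured remote repositories: for every endpoint, reuse a cached copy of the
    // "releases" overview when GitHub reports 304 Not Modified, otherwise download a fresh one.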
    remote_asset_info.clear();

    // One pending download per repository endpoint, together with its cache metadata
    struct RepoDownload {
        std::string endpoint;
        std::filesystem::path cache_file;
        json cached_doc;
        std::future<GitHubDownloader::Result> future;
    };
    std::vector<RepoDownload> downloads;
    downloads.reserve(repo_paths_.size());

    for (const auto &endpoint : repo_paths_) {
        // The cache file name is derived from a hash of the endpoint
        std::string cache_filename =
            "homepage_cache_" + std::to_string(std::hash<std::string>{}(endpoint)) + ".json";
        auto cache_file = directory_ / cache_filename;

        // Load a previously cached response if one exists
        json cached_doc;
        if (std::filesystem::exists(cache_file)) {
            try {
                cached_doc = load_json(cache_file);
            } catch (const std::exception &e) {
                SPDLOG_WARN("Error reading {}: {}. Discarding homepage cache.",
                            cache_file.string(), e.what());
            }
        }

        // Pass the cached last-modified date so GitHub can answer with 304 Not Modified
        std::string last_modified;
        if (!cached_doc.is_null() && cached_doc.contains("last-modified")) {
            last_modified = cached_doc["last-modified"].get<std::string>();
        }

        downloads.push_back(
            {endpoint, cache_file, cached_doc, downloader.download(endpoint, last_modified)});
    }

    // Process the finished downloads and build up remote_asset_info
    for (auto &dl : downloads) {
        auto result = dl.future.get();

        json doc;
        if (result.status_code == 200) {
            doc = json::parse(result.body, nullptr, false);
            doc["last-modified"] = result.last_modified;
            // Persist the overview so a later run can send the last-modified date
            save_json(dl.cache_file, doc);
            SPDLOG_INFO("Using downloaded overview of available tables from {}.", dl.endpoint);
        } else if (result.status_code == 304) {
            if (dl.cached_doc.is_null() || dl.cached_doc.empty()) {
                throw std::runtime_error(
                    fmt::format("Received 304 Not Modified but cached response {} does not exist.",
                                dl.cache_file.string()));
            }
            doc = dl.cached_doc;
            SPDLOG_INFO("Using cached overview of available tables from {}.", dl.endpoint);
        } else if (result.status_code == 403 || result.status_code == 429) {
            react_on_rate_limit_reached(result.rate_limit.reset_time);
            continue;
        } else {
            throw std::runtime_error(fmt::format(
                "Failed to download overview of available tables from {}: status code {}.",
                dl.endpoint, result.status_code));
        }

        if (doc.is_discarded() || !doc.contains("assets")) {
            throw std::runtime_error(fmt::format(
                "Failed to parse remote JSON or missing 'assets' key from {}.", dl.endpoint));
        }

        // Remember, per table key, the newest compatible asset and its download endpoint
        for (auto &asset : doc["assets"]) {
            std::string name = asset["name"].get<std::string>();
            std::smatch match;
            if (std::regex_match(name, match, remote_regex) && match.size() == 4) {
                std::string key = match[1].str();
                int version_major = std::stoi(match[2].str());
                int version_minor = std::stoi(match[3].str());
                // Skip assets whose major version does not match the compatible one
                if (version_major != COMPATIBLE_DATABASE_VERSION_MAJOR) {
                    continue;
                }
                auto it = remote_asset_info.find(key);
                if (it == remote_asset_info.end() || version_minor > it->second.version_minor) {
                    std::string remote_url = asset["url"].get<std::string>();
                    const std::string host = downloader.get_host();
                    remote_asset_info[key] = {version_minor, remote_url.erase(0, host.size())};
                }
            }
        }
    }

    if (!downloads.empty() && remote_asset_info.empty()) {
        throw std::runtime_error(
            "No compatible database tables were found in the remote repositories. Consider "
            "upgrading pairinteraction to a newer version.");
    }
    local_asset_info.clear();

    for (const auto &entry : fs::directory_iterator(directory_ / "tables")) {
        std::string name = entry.path().filename().string();
        std::smatch match;
        if (entry.is_directory() && std::regex_match(name, match, local_regex) &&
            match.size() == 4) {
            std::string key = match[1].str();
            int version_major = std::stoi(match[2].str());
            int version_minor = std::stoi(match[3].str());
            // Skip tables with an incompatible major version
            if (version_major != COMPATIBLE_DATABASE_VERSION_MAJOR) {
                continue;
            }
            auto it = local_asset_info.find(key);
            if (it == local_asset_info.end() || version_minor > it->second.version_minor) {
                local_asset_info[key].version_minor = version_minor;
                for (const auto &subentry : fs::directory_iterator(entry)) {
                    if (subentry.is_regular_file() && subentry.path().extension() == ".parquet") {
                        local_asset_info[key].paths[subentry.path().stem().string()] = {
                            subentry.path().string(), false};
                    }
                }
            }
        }
    }
}

void ParquetManager::react_on_rate_limit_reached(std::time_t reset_time) {
    remote_asset_info.clear();
    SPDLOG_WARN("Rate limit reached, resets at {}. The download of database tables is disabled.",
                format_time(reset_time));
}
void ParquetManager::update_local_asset(const std::string &key) {
    // Determine the remote version of the asset, -1 if it is not available remotely
    int remote_version = -1;
    auto remote_it = remote_asset_info.find(key);
    if (remote_it != remote_asset_info.end()) {
        remote_version = remote_it->second.version_minor;
    }

    // Fast path: nothing to do if the local version is already up to date
    {
        int local_version = -1;
        std::shared_lock<std::shared_mutex> lock(mtx_local);
        auto local_it = local_asset_info.find(key);
        if (local_it != local_asset_info.end()) {
            local_version = local_it->second.version_minor;
        }
        if (local_version >= remote_version) {
            return;
        }
    }

    // Re-check under an exclusive lock before downloading
    std::unique_lock<std::shared_mutex> lock(mtx_local);

    int local_version = -1;
    auto local_it = local_asset_info.find(key);
    if (local_it != local_asset_info.end()) {
        local_version = local_it->second.version_minor;
    }
    if (local_version >= remote_version) {
        return;
    }

    std::string endpoint = remote_it->second.endpoint;
    // (log message wording assumed; only its arguments are taken from the original code)
    SPDLOG_INFO("Downloading version {} from {}.", remote_version, endpoint);

    // Download the release asset as a binary (octet-stream) blob
    auto result = downloader.download(endpoint, "", true).get();
    if (result.status_code == 403 || result.status_code == 429) {
        react_on_rate_limit_reached(result.rate_limit.reset_time);
        return;
    }
    if (result.status_code != 200) {
        throw std::runtime_error(fmt::format("Failed to download table {}: status code {}.",
                                             endpoint, result.status_code));
    }

    // The asset is a zip archive that is unpacked directly from memory
    mz_zip_archive zip_archive{};
    if (mz_zip_reader_init_mem(&zip_archive, result.body.data(), result.body.size(), 0) == 0) {
        throw std::runtime_error("Failed to initialize zip archive.");
    }
    for (mz_uint i = 0; i < mz_zip_reader_get_num_files(&zip_archive); i++) {
        mz_zip_archive_file_stat file_stat;
        if (mz_zip_reader_file_stat(&zip_archive, i, &file_stat) == 0) {
            throw std::runtime_error("Failed to get file stat from zip archive.");
        }

        // Skip directory entries; their names end with '/'
        const char *filename = static_cast<const char *>(file_stat.m_filename);
        size_t len = std::strlen(filename);
        if (len > 0 && filename[len - 1] == '/') {
            continue;
        }

        // The entry must be a parquet file inside a directory matching the local naming scheme
        std::string dir = fs::path(filename).parent_path().string();
        std::string stem = fs::path(filename).stem().string();
        std::string suffix = fs::path(filename).extension().string();
        std::smatch match;
        if (!std::regex_match(dir, match, local_regex) || match.size() != 4 ||
            suffix != ".parquet") {
            throw std::runtime_error(
                fmt::format("Unexpected filename {} in zip archive.", filename));
        }

        auto path = directory_ / "tables" / dir / (stem + suffix);
        SPDLOG_INFO("Storing table to {}", path.string());

        // Extract the entry into memory
        std::vector<char> buffer(file_stat.m_uncomp_size);
        if (mz_zip_reader_extract_to_mem(&zip_archive, i, buffer.data(), buffer.size(), 0) == 0) {
            throw std::runtime_error(fmt::format("Failed to extract {}.", filename));
        }

        // Write the extracted file to disk
        fs::create_directories(path.parent_path());
        std::ofstream out(path.string(), std::ios::binary);
        if (!out) {
            throw std::runtime_error(fmt::format(
                "Failed to open {} for writing", path.string()));
        }
        out.write(buffer.data(), static_cast<std::streamsize>(buffer.size()));

        // Register the freshly stored table; it is not yet cached in DuckDB
        local_asset_info[key].version_minor = remote_version;
        local_asset_info[key].paths[path.stem().string()] = {path.string(), false};
    }

    mz_zip_reader_end(&zip_archive);
}
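
// cache_table optionally copies a parquet file into a temporary in-memory DuckDB table so that
// repeated queries do not have to touch the file again. It uses the same shared/exclusive
// double-checked locking scheme as update_local_asset.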
void ParquetManager::cache_table(std::unordered_map<std::string, PathInfo>::iterator table_it) {
    // Fast path: the table has already been cached
    {
        std::shared_lock<std::shared_mutex> lock(mtx_local);
        if (table_it->second.cached) {
            return;
        }
    }

    // Re-check under an exclusive lock before caching the table
    std::unique_lock<std::shared_mutex> lock(mtx_local);
    if (table_it->second.cached) {
        return;
    }

    // Ask DuckDB for a UUID to use as a unique name for the temporary table
    std::string table_name;
    {
        auto result = con.Query(R"(SELECT UUID()::varchar)");
        if (result->HasError()) {
            throw cpptrace::runtime_error("Error selecting a unique table name: " +
                                          result->GetError());
        }
        table_name =
            duckdb::FlatVector::GetData<duckdb::string_t>(result->Fetch()->data[0])[0].GetString();
    }

    // Create the temporary table from the parquet file
    {
        auto result = con.Query(fmt::format(R"(CREATE TEMP TABLE '{}' AS SELECT * FROM '{}')",
                                            table_name, table_it->second.path));
        if (result->HasError()) {
            throw cpptrace::runtime_error("Error creating table: " + result->GetError());
        }
    }

    // From now on, the stored path refers to the in-memory table
    table_it->second.path = table_name;
    table_it->second.cached = true;
}
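
// get_path resolves a (key, table) pair to something DuckDB can read from: it makes sure the
// newest compatible asset is available locally and returns either the parquet file path or, if
// caching is enabled, the name of the temporary DuckDB table.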
std::string ParquetManager::get_path(const std::string &key, const std::string &table) {
    // Make sure the newest compatible version of the asset is available locally
    this->update_local_asset(key);

    // Look up the requested table
    auto asset_it = local_asset_info.find(key);
    if (asset_it == local_asset_info.end()) {
        throw std::runtime_error("Table " + key + "_" + table + " not found.");
    }
    auto table_it = asset_it->second.paths.find(table);
    if (table_it == asset_it->second.paths.end()) {
        throw std::runtime_error("Table " + key + "_" + table + " not found.");
    }

    // Optionally cache the table in DuckDB before handing out its path
    if (use_cache_) {
        this->cache_table(table_it);
    }

    return table_it->second.path;
}
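
// get_versions_info renders a small plain-text table that compares, per asset, the locally
// installed minor version with the newest minor version offered remotely.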
std::string ParquetManager::get_versions_info() const {
    // Look up the minor version of a table in a map, -1 if the table is not present
    auto get_version = [](const auto &map, const std::string &table) -> int {
        if (auto it = map.find(table); it != map.end()) {
            return it->second.version_minor;
        }
        return -1;
    };

    // Union of all table names that are known locally or remotely
    std::set<std::string> tables;
    for (const auto &entry : local_asset_info) {
        tables.insert(entry.first);
    }
    for (const auto &entry : remote_asset_info) {
        tables.insert(entry.first);
    }

    // Header of the overview table
    std::ostringstream oss;
    oss << std::left << std::setw(17) << "Asset";
    oss << std::left << std::setw(6 + 4) << "Local";
    oss << std::left << std::setw(7) << "Remote\n";
    oss << std::string(35, '-') << "\n";

    for (const auto &table : tables) {
        int local_version = get_version(local_asset_info, table);
        int remote_version = get_version(remote_asset_info, table);

        std::string comparator = (local_version < remote_version)
            ? "<"
            : ((local_version > remote_version) ? ">" : "==");
        // ("N/A" placeholder wording for a missing version assumed)
        std::string local_version_str =
            local_version == -1 ? "N/A" : std::to_string(local_version);
        std::string remote_version_str =
            remote_version == -1 ? "N/A" : std::to_string(remote_version);

        oss << std::left << std::setw(17) << table;
        oss << std::left << std::setw(6) << local_version_str;
        oss << std::left << std::setw(4) << comparator;
        oss << std::left << std::setw(7) << remote_version_str << "\n";
    }

    return oss.str();
}