pairinteraction
A Rydberg Interaction Calculator
ParquetManager.cpp
// SPDX-FileCopyrightText: 2025 Pairinteraction Developers
// SPDX-License-Identifier: LGPL-3.0-or-later

#include <cpptrace/cpptrace.hpp>
#include <ctime>
#include <duckdb.hpp>
#include <filesystem>
#include <fmt/core.h>
#include <fstream>
#include <future>
#include <iomanip>
#include <miniz.h>
#include <nlohmann/json.hpp>
#include <regex>
#include <set>
#include <spdlog/spdlog.h>
#include <sstream>
#include <stdexcept>
#include <string>

namespace fs = std::filesystem;
using json = nlohmann::json;

// Format a Unix timestamp as a local "YYYY-MM-DD HH:MM:SS" string
std::string format_time(std::time_t time_val) {
    std::tm *ptm = std::localtime(&time_val);
    std::ostringstream oss;
    oss << std::put_time(ptm, "%Y-%m-%d %H:%M:%S");
    return oss.str();
}

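// Usage sketch (illustrative, not part of the original file): format_time
// renders a Unix timestamp in the fixed layout used for the log messages
// below, e.g. "2025-06-01 14:30:05" for the current local time:
//
//     std::string now = format_time(std::time(nullptr));
//     SPDLOG_INFO("Current time: {}", now);
//
// Note that std::localtime returns a pointer to shared static storage and is
// not thread-safe; the result is formatted into the string immediately.
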
// Read a JSON document from a file, throwing on I/O failure
json load_json(const fs::path &file) {
    std::ifstream in(file);
    in.exceptions(std::ifstream::failbit | std::ifstream::badbit);
    json doc;
    in >> doc;
    return doc;
}

// Write a JSON document to a file, throwing if the file cannot be opened
void save_json(const fs::path &file, const json &doc) {
    std::ofstream out(file);
    if (!out) {
        throw std::runtime_error(fmt::format("Failed to open {} for writing", file.string()));
    }
    out << doc;
    out.close();
}

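// Usage sketch (illustrative): the two helpers round-trip a document through a
// file. The path and keys below are made up for the example:
//
//     json doc = {{"last-modified", "Wed, 01 Jan 2025 00:00:00 GMT"}};
//     save_json("overview.json", doc);
//     json loaded = load_json("overview.json");
//     assert(loaded["last-modified"] == doc["last-modified"]);
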
namespace pairinteraction {
ParquetManager::ParquetManager(std::filesystem::path directory, const GitHubDownloader &downloader,
                               std::vector<std::string> repo_paths, duckdb::Connection &con,
                               bool use_cache)
    : directory_(std::move(directory)), downloader(downloader), repo_paths_(std::move(repo_paths)),
      con(con), use_cache_(use_cache) {
    // Ensure the local directory exists
    if (!std::filesystem::exists(directory_ / "tables")) {
        fs::create_directories(directory_ / "tables");
    }

    // If repo paths are provided, check the GitHub rate limit
    if (!repo_paths_.empty()) {
        auto rate_limit = downloader.get_rate_limit();
        if (rate_limit.remaining <= 0) {
            react_on_rate_limit_reached(rate_limit.reset_time);
        } else {
            SPDLOG_INFO("Remaining GitHub API requests: {}. Rate limit resets at {}.",
                        rate_limit.remaining, format_time(rate_limit.reset_time));
        }
    }
}

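// Construction sketch (hypothetical values; it assumes GitHubDownloader is
// default-constructible and uses a made-up cache directory and release
// endpoint). The scan calls refer to the member functions defined below:
//
//     duckdb::DuckDB db(nullptr);
//     duckdb::Connection con(db);
//     GitHubDownloader downloader;
//     ParquetManager manager("/home/user/.cache/pairinteraction", downloader,
//                            {"/repos/<owner>/<repo>/releases/latest"}, con,
//                            /*use_cache=*/true);
//     manager.scan_remote(); // discover downloadable table versions
//     manager.scan_local();  // index table files already on disk
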
void ParquetManager::scan_remote() {
    remote_asset_info.clear();

    struct RepoDownload {
        std::string endpoint;
        fs::path cache_file;
        json cached_doc;
        std::future<GitHubDownloader::Result> future;
    };
    std::vector<RepoDownload> downloads;
    downloads.reserve(repo_paths_.size());

    // For each repo path, load its cached JSON (or an empty JSON) and issue the download
    for (const auto &endpoint : repo_paths_) {
        // Generate a unique cache filename per endpoint
        std::string cache_filename =
            "homepage_cache_" + std::to_string(std::hash<std::string>{}(endpoint)) + ".json";
        auto cache_file = directory_ / cache_filename;

        // Load cached JSON from file if it exists
        json cached_doc;
        if (std::filesystem::exists(cache_file)) {
            try {
                cached_doc = load_json(cache_file);
            } catch (const std::exception &e) {
                SPDLOG_WARN("Error reading {}: {}. Discarding homepage cache.",
                            cache_file.string(), e.what());
                cached_doc = json{};
            }
        }

        // Extract the last-modified header from the cached JSON if available
        std::string last_modified;
        if (!cached_doc.is_null() && cached_doc.contains("last-modified")) {
            last_modified = cached_doc["last-modified"].get<std::string>();
        }

        // Issue the asynchronous download using the cached last-modified value.
        downloads.push_back(
            {endpoint, cache_file, cached_doc, downloader.download(endpoint, last_modified)});
    }

    // Process downloads for each repo path
    for (auto &dl : downloads) {
        auto result = dl.future.get();
        json doc;
        if (result.status_code == 200) {
            doc = json::parse(result.body, nullptr, /*allow_exceptions=*/false);
            doc["last-modified"] = result.last_modified;
            save_json(dl.cache_file, doc);
            SPDLOG_INFO("Using downloaded overview of available tables from {}.", dl.endpoint);
        } else if (result.status_code == 304) {
            if (dl.cached_doc.is_null() || dl.cached_doc.empty()) {
                throw std::runtime_error(
                    fmt::format("Received 304 Not Modified but cached response {} does not exist.",
                                dl.cache_file.string()));
            }
            doc = dl.cached_doc;
            SPDLOG_INFO("Using cached overview of available tables from {}.", dl.endpoint);
        } else if (result.status_code == 403 || result.status_code == 429) {
            react_on_rate_limit_reached(result.rate_limit.reset_time);
            return;
        } else {
            throw std::runtime_error(fmt::format(
                "Failed to download overview of available tables from {}: status code {}.",
                dl.endpoint, result.status_code));
        }

        // Validate the JSON response
        if (doc.is_discarded() || !doc.contains("assets")) {
            throw std::runtime_error(fmt::format(
                "Failed to parse remote JSON or missing 'assets' key from {}.", dl.endpoint));
        }

        // Update remote_asset_info based on the asset entries
        for (auto &asset : doc["assets"]) {
            std::string name = asset["name"].get<std::string>();
            std::smatch match;

            if (std::regex_match(name, match, remote_regex) && match.size() == 4) {
                std::string key = match[1].str();
                int version_major = std::stoi(match[2].str());
                int version_minor = std::stoi(match[3].str());

                if (version_major != COMPATIBLE_DATABASE_VERSION_MAJOR) {
                    continue;
                }

                auto it = remote_asset_info.find(key);
                if (it == remote_asset_info.end() || version_minor > it->second.version_minor) {
                    std::string remote_url = asset["url"].get<std::string>();
                    const std::string host = downloader.get_host();
                    remote_asset_info[key] = {version_minor, remote_url.erase(0, host.size())};
                }
            }
        }
    }

    // Ensure that scan_remote was successful
    if (!downloads.empty() && remote_asset_info.empty()) {
        throw std::runtime_error(
            "No compatible database tables were found in the remote repositories. Consider "
            "upgrading pairinteraction to a newer version.");
    }
}

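// Shape of the cached overview document (a sketch inferred from the parsing
// code above; the concrete asset name and URL are made up):
//
//     {
//       "last-modified": "Wed, 01 Jan 2025 00:00:00 GMT",
//       "assets": [
//         { "name": "<key>_v<major>.<minor>.zip",
//           "url": "https://<host>/<path-to-asset>" }
//       ]
//     }
//
// Only assets whose major version equals COMPATIBLE_DATABASE_VERSION_MAJOR are
// kept, the highest minor version wins per key, and the stored endpoint is the
// asset URL with the downloader's host prefix stripped.
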
void ParquetManager::scan_local() {
    local_asset_info.clear();

    // Iterate over files in the directory to update local_asset_info
    for (const auto &entry : fs::directory_iterator(directory_ / "tables")) {
        std::string name = entry.path().filename().string();
        std::smatch match;

        if (entry.is_directory() && std::regex_match(name, match, local_regex) &&
            match.size() == 4) {
            std::string key = match[1].str();
            int version_major = std::stoi(match[2].str());
            int version_minor = std::stoi(match[3].str());

            if (version_major != COMPATIBLE_DATABASE_VERSION_MAJOR) {
                continue;
            }

            auto it = local_asset_info.find(key);
            if (it == local_asset_info.end() || version_minor > it->second.version_minor) {
                local_asset_info[key].version_minor = version_minor;
                for (const auto &subentry : fs::directory_iterator(entry)) {
                    if (subentry.is_regular_file() && subentry.path().extension() == ".parquet") {
                        local_asset_info[key].paths[subentry.path().stem().string()] = {
                            subentry.path().string(), false};
                    }
                }
            }
        }
    }
}

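// Expected on-disk layout (a sketch; the key and table names are examples
// only). Per key, scan_local keeps the directory with the highest compatible
// minor version and records every *.parquet file inside it:
//
//     <directory>/tables/
//         <key>_v<major>.<minor>/
//             states.parquet
//             matrix_elements_d.parquet
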
void ParquetManager::react_on_rate_limit_reached(std::time_t reset_time) {
    repo_paths_.clear();
    remote_asset_info.clear();
    SPDLOG_WARN("Rate limit reached, resets at {}. The download of database tables is disabled.",
                format_time(reset_time));
}

void ParquetManager::update_local_asset(const std::string &key) {
    // Get remote version if available
    int remote_version = -1;
    auto remote_it = remote_asset_info.find(key);
    if (remote_it != remote_asset_info.end()) {
        remote_version = remote_it->second.version_minor;
    }

    // Get local version if available and check if it is up-to-date
    {
        int local_version = -1;
        std::shared_lock<std::shared_mutex> lock(mtx_local);
        auto local_it = local_asset_info.find(key);
        if (local_it != local_asset_info.end()) {
            local_version = local_it->second.version_minor;
        }
        if (local_version >= remote_version) {
            return;
        }
    }

    // If it is not up-to-date, acquire a unique lock for updating the table
    std::unique_lock<std::shared_mutex> lock(mtx_local);

    // Re-check if the table is up to date because another thread might have updated it
    int local_version = -1;
    auto local_it = local_asset_info.find(key);
    if (local_it != local_asset_info.end()) {
        local_version = local_it->second.version_minor;
    }
    if (local_version >= remote_version) {
        return;
    }

    // Download the remote file
    std::string endpoint = remote_it->second.endpoint;
    SPDLOG_INFO("Downloading {}_v{}.{} from {}", key, COMPATIBLE_DATABASE_VERSION_MAJOR,
                remote_version, endpoint);

    auto result = downloader.download(endpoint, "", true).get();
    if (result.status_code == 403 || result.status_code == 429) {
        react_on_rate_limit_reached(result.rate_limit.reset_time);
        return;
    }
    if (result.status_code != 200) {
        throw std::runtime_error(fmt::format("Failed to download table {}: status code {}.",
                                             endpoint, result.status_code));
    }

    // Unzip the downloaded file
    mz_zip_archive zip_archive{};
    if (mz_zip_reader_init_mem(&zip_archive, result.body.data(), result.body.size(), 0) == 0) {
        throw std::runtime_error("Failed to initialize zip archive.");
    }

    for (mz_uint i = 0; i < mz_zip_reader_get_num_files(&zip_archive); i++) {
        mz_zip_archive_file_stat file_stat;
        if (mz_zip_reader_file_stat(&zip_archive, i, &file_stat) == 0) {
            throw std::runtime_error("Failed to get file stat from zip archive.");
        }

        // Skip directories
        const char *filename = static_cast<const char *>(file_stat.m_filename);
        size_t len = std::strlen(filename);
        if (len > 0 && filename[len - 1] == '/') {
            continue;
        }

        // Ensure that the filename matches the expectations
        std::string dir = fs::path(filename).parent_path().string();
        std::string stem = fs::path(filename).stem().string();
        std::string suffix = fs::path(filename).extension().string();
        std::smatch match;
        if (!std::regex_match(dir, match, local_regex) || match.size() != 4 ||
            suffix != ".parquet") {
            throw std::runtime_error(
                fmt::format("Unexpected filename {} in zip archive.", filename));
        }

        // Construct the path to store the table
        auto path = directory_ / "tables" / dir / (stem + suffix);
        SPDLOG_INFO("Storing table to {}", path.string());

        // Extract the file to memory
        std::vector<char> buffer(file_stat.m_uncomp_size);
        if (mz_zip_reader_extract_to_mem(&zip_archive, i, buffer.data(), buffer.size(), 0) == 0) {
            throw std::runtime_error(fmt::format("Failed to extract {}.", filename));
        }

        // Ensure the parent directory exists
        fs::create_directories(path.parent_path());

        // Save the extracted file
        std::ofstream out(path.string(), std::ios::binary);
        if (!out) {
            throw std::runtime_error(fmt::format("Failed to open {} for writing", path.string()));
        }
        out.write(buffer.data(), static_cast<std::streamsize>(buffer.size()));
        out.close();

        // Update the local asset/table info
        local_asset_info[key].version_minor = remote_version;
        local_asset_info[key].paths[path.stem().string()] = {path.string(), false};
    }

    mz_zip_reader_end(&zip_archive);
}

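// Locking sketch (a generic illustration of the idiom used above and in
// cache_table below, with placeholder names): the cheap check runs under a
// shared lock, the expensive update under a unique lock, and the check is
// repeated after the upgrade because the shared lock is released before the
// unique lock is acquired:
//
//     {
//         std::shared_lock<std::shared_mutex> lock(mtx);
//         if (up_to_date()) return;
//     }
//     std::unique_lock<std::shared_mutex> lock(mtx);
//     if (up_to_date()) return; // another thread may have done the update
//     perform_update();
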
void ParquetManager::cache_table(std::unordered_map<std::string, PathInfo>::iterator table_it) {
    // Check if the table is already cached
    {
        std::shared_lock<std::shared_mutex> lock(mtx_local);
        if (table_it->second.cached) {
            return;
        }
    }

    // Acquire a unique lock for caching the table
    std::unique_lock<std::shared_mutex> lock(mtx_local);

    // Re-check if the table is already cached because another thread might have cached it
    if (table_it->second.cached) {
        return;
    }

    // Cache the table in memory
    std::string table_name;
    {
        auto result = con.Query(R"(SELECT UUID()::varchar)");
        if (result->HasError()) {
            throw cpptrace::runtime_error("Error selecting a unique table name: " +
                                          result->GetError());
        }
        table_name =
            duckdb::FlatVector::GetData<duckdb::string_t>(result->Fetch()->data[0])[0].GetString();
    }

    {
        auto result = con.Query(fmt::format(R"(CREATE TEMP TABLE '{}' AS SELECT * FROM '{}')",
                                            table_name, table_it->second.path));
        if (result->HasError()) {
            throw cpptrace::runtime_error("Error creating table: " + result->GetError());
        }
    }

    table_it->second.path = table_name;
    table_it->second.cached = true;
}

std::string ParquetManager::get_path(const std::string &key, const std::string &table) {
    // Update the local table if a newer version is available remotely
    this->update_local_asset(key);

    // Ensure availability of the local table file
    auto asset_it = local_asset_info.find(key);
    if (asset_it == local_asset_info.end()) {
        throw std::runtime_error("Table " + key + "_" + table + " not found.");
    }
    auto table_it = asset_it->second.paths.find(table);
    if (table_it == asset_it->second.paths.end()) {
        throw std::runtime_error("Table " + key + "_" + table + " not found.");
    }

    // Cache the local table in memory if requested
    if (use_cache_) {
        this->cache_table(table_it);
    }

    // Return the path to the local table file
    return table_it->second.path;
}

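// Usage sketch (hypothetical key and table names): the returned string is
// either a .parquet file path or, when caching is enabled, the name of a
// DuckDB temp table created on the same connection; both forms work in a
// FROM clause exactly as in cache_table above:
//
//     std::string path = manager.get_path("Rb", "states");
//     auto result = con.Query(fmt::format(R"(SELECT * FROM '{}')", path));
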
std::string ParquetManager::get_versions_info() const {
    // Helper lambda that returns the minor version if available
    auto get_version = [](const auto &map, const std::string &table) -> int {
        if (auto it = map.find(table); it != map.end()) {
            return it->second.version_minor;
        }
        return -1;
    };

    // Gather all unique table names
    std::set<std::string> tables;
    for (const auto &entry : local_asset_info) {
        tables.insert(entry.first);
    }
    for (const auto &entry : remote_asset_info) {
        tables.insert(entry.first);
    }

    // Output versions info
    std::ostringstream oss;

    oss << " ";
    oss << std::left << std::setw(17) << "Asset";
    oss << std::left << std::setw(6 + 4) << "Local";
    oss << std::left << std::setw(7) << "Remote\n";
    oss << std::string(35, '-') << "\n";

    for (const auto &table : tables) {
        int local_version = get_version(local_asset_info, table);
        int remote_version = get_version(remote_asset_info, table);

        std::string comparator = (local_version < remote_version)
            ? "<"
            : ((local_version > remote_version) ? ">" : "==");
        std::string local_version_str = local_version == -1
            ? "N/A"
            : "v" + std::to_string(COMPATIBLE_DATABASE_VERSION_MAJOR) + "." +
                std::to_string(local_version);
        std::string remote_version_str = remote_version == -1
            ? "N/A"
            : "v" + std::to_string(COMPATIBLE_DATABASE_VERSION_MAJOR) + "." +
                std::to_string(remote_version);

        oss << " ";
        oss << std::left << std::setw(17) << table;
        oss << std::left << std::setw(6) << local_version_str;
        oss << std::left << std::setw(4) << comparator;
        oss << std::left << std::setw(7) << remote_version_str << "\n";
    }
    return oss.str();
}

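// Example output (illustrative asset names and versions only):
//
//      Asset            Local     Remote
//     -----------------------------------
//      misc             v1.2  ==  v1.2
//      Rb               v1.1  <   v1.2
//      Sr88_singlet     N/A   <   v1.0
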
} // namespace pairinteraction