diff --git a/dnf5daemon-client/CMakeLists.txt b/dnf5daemon-client/CMakeLists.txt
index bf32c1aee5..f065c511b8 100644
--- a/dnf5daemon-client/CMakeLists.txt
+++ b/dnf5daemon-client/CMakeLists.txt
@@ -15,6 +15,9 @@ include_directories(..)
pkg_check_modules(SDBUS_CPP REQUIRED sdbus-c++)
+pkg_check_modules(JSONC REQUIRED json-c)
+include_directories(${JSONC_INCLUDE_DIRS})
+
add_executable(${DNF5DAEMON_CLIENT_BIN} ${DNF5DAEMON_CLIENT_SOURCES})
target_link_libraries(
@@ -24,6 +27,7 @@ target_link_libraries(
libdnf5
libdnf5-cli
${SDBUS_CPP_LIBRARIES}
+ ${JSONC_LIBRARIES}
pthread
)
diff --git a/dnf5daemon-client/commands/repoquery/repoquery.cpp b/dnf5daemon-client/commands/repoquery/repoquery.cpp
index 8c8711d596..f63782a545 100644
--- a/dnf5daemon-client/commands/repoquery/repoquery.cpp
+++ b/dnf5daemon-client/commands/repoquery/repoquery.cpp
@@ -23,14 +23,20 @@ along with libdnf. If not, see .
#include "wrappers/dbus_package_wrapper.hpp"
#include
+#include
#include
+#include
#include
+#include
#include
#include
#include
#include
+#include
+#include
#include
+#include
namespace dnfdaemon::client {
@@ -98,6 +104,61 @@ dnfdaemon::KeyValueMap RepoqueryCommand::session_config() {
return cfg;
}
+/// Read package objects from json stream. The `json_stream` string is modified, all
+/// successfully parsed json objects are removed from it, possible partial json object
+/// is left for next round of parsing after the rest of the string arrives.
+std::vector json_to_packages(std::string & json_stream) {
+ libdnf_assert(json_stream.size() < 1000000, "can not parse JSON string bigger then maxing");
+ std::vector packages;
+
+ json_tokener * tokener;
+ tokener = json_tokener_new();
+ while (!json_stream.empty()) {
+ json_object * json_pkg =
+ json_tokener_parse_ex(tokener, json_stream.c_str(), static_cast(json_stream.size()));
+ if (json_pkg) {
+ dnfdaemon::KeyValueMap dbuspkg;
+ json_object_object_foreach(json_pkg, key, val) {
+ switch (json_object_get_type(val)) {
+ case json_type_boolean:
+ dbuspkg[key] = static_cast(json_object_get_boolean(val));
+ break;
+ case json_type_int:
+ dbuspkg[key] = static_cast(json_object_get_int64(val));
+ break;
+ default:
+ dbuspkg[key] = json_object_get_string(val);
+ }
+ }
+ packages.emplace_back(DbusPackageWrapper(dbuspkg));
+
+ auto parse_end = json_tokener_get_parse_end(tokener);
+ json_tokener_reset(tokener);
+ json_object_put(json_pkg);
+ if (parse_end < json_stream.size()) {
+ // more json objects on the line, remove parsed part and continue
+ json_stream = json_stream.substr(parse_end);
+ } else {
+ // everything has been parsed
+ json_stream.clear();
+ }
+ } else {
+ auto err = json_tokener_get_error(tokener);
+ if (err == json_tokener_continue) {
+ // only partial json string in the input line, continue on the next one
+ break;
+ } else {
+ // this is unrecoverable, throw error
+ // XXX throw
+ std::cout << "XXX error " << json_tokener_error_desc(err) << std::endl;
+ break;
+ }
+ }
+ }
+ json_tokener_free(tokener);
+ return packages;
+}
+
void RepoqueryCommand::run() {
auto & ctx = get_context();
@@ -135,29 +196,86 @@ void RepoqueryCommand::run() {
options.insert(std::pair>("package_attrs", {"full_nevra"}));
}
- dnfdaemon::KeyValueMapList packages;
- ctx.session_proxy->callMethod("list")
+ std::array pipefd{}; // File descriptors for the pipe
+ if (pipe2(pipefd.data(), O_NONBLOCK) == -1) {
+ std::cerr << "Error: failed to create a pipe." << std::endl;
+ return;
+ }
+
+ std::string fd_id{};
+ ctx.session_proxy->callMethod("list_fd")
.onInterface(dnfdaemon::INTERFACE_RPM)
.withTimeout(static_cast(-1))
.withArguments(options)
- .storeResultsTo(packages);
-
- auto num_packages = packages.size();
- for (auto & raw_package : packages) {
- --num_packages;
- DbusPackageWrapper package(raw_package);
- if (info_option->get_value()) {
- auto out = libdnf5::cli::output::PackageInfoSections();
- out.setup_cols();
- out.add_package(package);
- out.print();
- if (num_packages) {
- std::cout << std::endl;
- }
- } else {
- std::cout << package.get_full_nevra() << std::endl;
+ .withArguments(sdbus::UnixFd{pipefd[1]})
+ .withArguments(fd_id);
+ close(pipefd[1]);
+
+ int in_fd = pipefd[0];
+
+ const size_t pipe_buffer_size = fcntl(in_fd, F_GETPIPE_SZ);
+ const int timeout = 30000;
+
+ std::array pfds;
+ pfds[0].fd = in_fd;
+ pfds[0].events = POLLIN;
+
+ std::string message;
+ ssize_t total_bytes_read = 0;
+ ssize_t bytes_read = -1;
+
+ std::vector buffer(pipe_buffer_size); // Buffer for reading data
+ enum class STATUS { PENDING, FINISHED, FAILED } reading_status;
+ reading_status = STATUS::PENDING;
+ while (reading_status == STATUS::PENDING) {
+ switch (poll(pfds.data(), pfds.size(), timeout)) {
+ case -1:
+ // poll() failed
+ reading_status = STATUS::FAILED;
+ std::cerr << "Error: poll() failed." << std::endl;
+ break;
+ case 0:
+ // timeout was reached
+ reading_status = STATUS::FAILED;
+ std::cerr << "Error: timeout while waiting for data." << std::endl;
+ break;
+ default:
+ if ((pfds[0].revents & POLLIN) != 0) {
+ bytes_read = read(in_fd, buffer.data(), pipe_buffer_size);
+ if (bytes_read == -1) {
+ // read error
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ reading_status = STATUS::FAILED;
+ std::cerr << "Error: reading from the pipe failed." << std::endl;
+ }
+ } else if (bytes_read == 0) {
+ // EOF
+ reading_status = STATUS::FINISHED;
+ } else {
+ // data read
+ message.append(buffer.data(), bytes_read);
+ total_bytes_read += bytes_read;
+ for (const auto & package : json_to_packages(message)) {
+ if (info_option->get_value()) {
+ auto out = libdnf5::cli::output::PackageInfoSections();
+ out.setup_cols();
+ out.add_package(package);
+ out.print();
+ std::cout << std::endl;
+ } else {
+ std::cout << package.get_full_nevra() << std::endl;
+ }
+ }
+ }
+ } else if ((pfds[0].revents & POLLHUP) != 0) {
+ // EOF
+ reading_status = STATUS::FINISHED;
+ } else {
+ reading_status = STATUS::FAILED;
+ }
}
}
+ close(in_fd);
}
} // namespace dnfdaemon::client