Skip to content

Commit

Permalink
dnf5daemon-client: Repoquery uses new Rpm:list_fd() API
Browse files Browse the repository at this point in the history
  • Loading branch information
m-blaha committed Mar 6, 2024
1 parent aca99bf commit eab163e
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 18 deletions.
4 changes: 4 additions & 0 deletions dnf5daemon-client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ include_directories(..)

pkg_check_modules(SDBUS_CPP REQUIRED sdbus-c++)

pkg_check_modules(JSONC REQUIRED json-c)
include_directories(${JSONC_INCLUDE_DIRS})

add_executable(${DNF5DAEMON_CLIENT_BIN} ${DNF5DAEMON_CLIENT_SOURCES})

target_link_libraries(
Expand All @@ -24,6 +27,7 @@ target_link_libraries(
libdnf5
libdnf5-cli
${SDBUS_CPP_LIBRARIES}
${JSONC_LIBRARIES}
pthread
)

Expand Down
154 changes: 136 additions & 18 deletions dnf5daemon-client/commands/repoquery/repoquery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,20 @@ along with libdnf. If not, see <https://www.gnu.org/licenses/>.
#include "wrappers/dbus_package_wrapper.hpp"

#include <dnf5daemon-server/dbus.hpp>
#include <fcntl.h>
#include <fmt/format.h>
#include <json-c/json.h>
#include <libdnf5-cli/output/package_info_sections.hpp>
#include <libdnf5/common/exception.hpp>
#include <libdnf5/conf/option_string.hpp>
#include <libdnf5/rpm/package.hpp>
#include <libdnf5/rpm/package_query.hpp>
#include <libdnf5/rpm/package_set.hpp>
#include <poll.h>

#include <array>
#include <iostream>
#include <string>

namespace dnfdaemon::client {

Expand Down Expand Up @@ -98,6 +104,61 @@ dnfdaemon::KeyValueMap RepoqueryCommand::session_config() {
return cfg;
}

/// Read package objects from json stream. The `json_stream` string is modified, all
/// successfully parsed json objects are removed from it, possible partial json object
/// is left for next round of parsing after the rest of the string arrives.
std::vector<DbusPackageWrapper> json_to_packages(std::string & json_stream) {
libdnf_assert(json_stream.size() < 1000000, "can not parse JSON string bigger then maxing");
std::vector<DbusPackageWrapper> packages;

json_tokener * tokener;
tokener = json_tokener_new();
while (!json_stream.empty()) {
json_object * json_pkg =
json_tokener_parse_ex(tokener, json_stream.c_str(), static_cast<int>(json_stream.size()));
if (json_pkg) {
dnfdaemon::KeyValueMap dbuspkg;
json_object_object_foreach(json_pkg, key, val) {
switch (json_object_get_type(val)) {
case json_type_boolean:
dbuspkg[key] = static_cast<bool>(json_object_get_boolean(val));
break;
case json_type_int:
dbuspkg[key] = static_cast<uint64_t>(json_object_get_int64(val));
break;
default:
dbuspkg[key] = json_object_get_string(val);
}
}
packages.emplace_back(DbusPackageWrapper(dbuspkg));

auto parse_end = json_tokener_get_parse_end(tokener);
json_tokener_reset(tokener);
json_object_put(json_pkg);
if (parse_end < json_stream.size()) {
// more json objects on the line, remove parsed part and continue
json_stream = json_stream.substr(parse_end);
} else {
// everything has been parsed
json_stream.clear();
}
} else {
auto err = json_tokener_get_error(tokener);
if (err == json_tokener_continue) {
// only partial json string in the input line, continue on the next one
break;
} else {
// this is unrecoverable, throw error
// XXX throw
std::cout << "XXX error " << json_tokener_error_desc(err) << std::endl;
break;
}
}
}
json_tokener_free(tokener);
return packages;
}

void RepoqueryCommand::run() {
auto & ctx = get_context();

Expand Down Expand Up @@ -135,29 +196,86 @@ void RepoqueryCommand::run() {
options.insert(std::pair<std::string, std::vector<std::string>>("package_attrs", {"full_nevra"}));
}

dnfdaemon::KeyValueMapList packages;
ctx.session_proxy->callMethod("list")
std::array<int, 2> pipefd{}; // File descriptors for the pipe
if (pipe2(pipefd.data(), O_NONBLOCK) == -1) {
std::cerr << "Error: failed to create a pipe." << std::endl;
return;
}

std::string fd_id{};
ctx.session_proxy->callMethod("list_fd")
.onInterface(dnfdaemon::INTERFACE_RPM)
.withTimeout(static_cast<uint64_t>(-1))
.withArguments(options)
.storeResultsTo(packages);

auto num_packages = packages.size();
for (auto & raw_package : packages) {
--num_packages;
DbusPackageWrapper package(raw_package);
if (info_option->get_value()) {
auto out = libdnf5::cli::output::PackageInfoSections();
out.setup_cols();
out.add_package(package);
out.print();
if (num_packages) {
std::cout << std::endl;
}
} else {
std::cout << package.get_full_nevra() << std::endl;
.withArguments(sdbus::UnixFd{pipefd[1]})
.withArguments(fd_id);
close(pipefd[1]);

int in_fd = pipefd[0];

const size_t pipe_buffer_size = fcntl(in_fd, F_GETPIPE_SZ);
const int timeout = 30000;

std::array<pollfd, 1> pfds;
pfds[0].fd = in_fd;
pfds[0].events = POLLIN;

std::string message;
ssize_t total_bytes_read = 0;
ssize_t bytes_read = -1;

std::vector<char> buffer(pipe_buffer_size); // Buffer for reading data
enum class STATUS { PENDING, FINISHED, FAILED } reading_status;
reading_status = STATUS::PENDING;
while (reading_status == STATUS::PENDING) {
switch (poll(pfds.data(), pfds.size(), timeout)) {
case -1:
// poll() failed
reading_status = STATUS::FAILED;
std::cerr << "Error: poll() failed." << std::endl;
break;
case 0:
// timeout was reached
reading_status = STATUS::FAILED;
std::cerr << "Error: timeout while waiting for data." << std::endl;
break;
default:
if ((pfds[0].revents & POLLIN) != 0) {
bytes_read = read(in_fd, buffer.data(), pipe_buffer_size);
if (bytes_read == -1) {
// read error
if (errno != EAGAIN && errno != EWOULDBLOCK) {
reading_status = STATUS::FAILED;
std::cerr << "Error: reading from the pipe failed." << std::endl;
}
} else if (bytes_read == 0) {
// EOF
reading_status = STATUS::FINISHED;
} else {
// data read
message.append(buffer.data(), bytes_read);
total_bytes_read += bytes_read;
for (const auto & package : json_to_packages(message)) {
if (info_option->get_value()) {
auto out = libdnf5::cli::output::PackageInfoSections();
out.setup_cols();
out.add_package(package);
out.print();
std::cout << std::endl;
} else {
std::cout << package.get_full_nevra() << std::endl;
}
}
}
} else if ((pfds[0].revents & POLLHUP) != 0) {
// EOF
reading_status = STATUS::FINISHED;
} else {
reading_status = STATUS::FAILED;
}
}
}
close(in_fd);
}

} // namespace dnfdaemon::client

0 comments on commit eab163e

Please sign in to comment.