// SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright OpenBMC Authors // SPDX-FileCopyrightText: Copyright 2020 Intel Corporation #include "subscription.hpp" #include "dbus_singleton.hpp" #include "event_log.hpp" #include "event_logs_object_type.hpp" #include "event_matches_filter.hpp" #include "event_service_store.hpp" #include "filter_expr_executor.hpp" #include "heartbeat_messages.hpp" #include "http_client.hpp" #include "http_response.hpp" #include "logging.hpp" #include "server_sent_event.hpp" #include "ssl_key_handler.hpp" #include "telemetry_readings.hpp" #include "utils/time_utils.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace redfish { Subscription::Subscription( std::shared_ptr userSubIn, const boost::urls::url_view_base& url, boost::asio::io_context& ioc) : userSub{std::move(userSubIn)}, policy(std::make_shared()), hbTimer(ioc) { userSub->destinationUrl = url; client.emplace(ioc, policy); // Subscription constructor policy->invalidResp = retryRespHandler; } Subscription::Subscription(crow::sse_socket::Connection& connIn) : userSub{std::make_shared()}, sseConn(&connIn), hbTimer(crow::connections::systemBus->get_io_context()) {} // callback for subscription sendData void Subscription::resHandler(const std::shared_ptr& /*self*/, const crow::Response& res) { BMCWEB_LOG_DEBUG("Response handled with return code: {}", res.resultInt()); if (!client) { BMCWEB_LOG_ERROR( "Http client wasn't filled but http client callback was called."); return; } if (userSub->retryPolicy != "TerminateAfterRetries") { return; } if (client->isTerminated()) { hbTimer.cancel(); if (deleter) { BMCWEB_LOG_INFO("Subscription {} is deleted after MaxRetryAttempts", userSub->id); deleter(); } } } void Subscription::sendHeartbeatEvent() { // send the heartbeat message nlohmann::json eventMessage = messages::redfishServiceFunctional(); eventMessage["EventTimestamp"] = time_utils::getDateTimeOffsetNow().first; eventMessage["OriginOfCondition"] = std::format("/redfish/v1/EventService/Subscriptions/{}", userSub->id); eventMessage["MemberId"] = "0"; nlohmann::json::array_t eventRecord; eventRecord.emplace_back(std::move(eventMessage)); nlohmann::json msgJson; msgJson["@odata.type"] = "#Event.v1_4_0.Event"; msgJson["Name"] = "Heartbeat"; msgJson["Events"] = std::move(eventRecord); std::string strMsg = msgJson.dump(2, ' ', true, nlohmann::json::error_handler_t::replace); // Note, eventId here is always zero, because this is a a per subscription // event and doesn't have an "ID" uint64_t eventId = 0; sendEventToSubscriber(eventId, std::move(strMsg)); } void Subscription::scheduleNextHeartbeatEvent() { hbTimer.expires_after(std::chrono::minutes(userSub->hbIntervalMinutes)); hbTimer.async_wait( std::bind_front(&Subscription::onHbTimeout, this, weak_from_this())); } void Subscription::heartbeatParametersChanged() { hbTimer.cancel(); if (userSub->sendHeartbeat) { scheduleNextHeartbeatEvent(); } } void Subscription::onHbTimeout(const std::weak_ptr& weakSelf, const boost::system::error_code& ec) { if (ec == boost::asio::error::operation_aborted) { BMCWEB_LOG_DEBUG("heartbeat timer async_wait is aborted"); return; } if (ec == boost::system::errc::operation_canceled) { BMCWEB_LOG_DEBUG("heartbeat timer async_wait canceled"); return; } if (ec) { BMCWEB_LOG_CRITICAL("heartbeat timer async_wait failed: {}", ec); return; } std::shared_ptr self = weakSelf.lock(); if (!self) { BMCWEB_LOG_CRITICAL("onHbTimeout failed on Subscription"); return; } // Timer expired. sendHeartbeatEvent(); // reschedule heartbeat timer scheduleNextHeartbeatEvent(); } bool Subscription::sendEventToSubscriber(uint64_t eventId, std::string&& msg) { persistent_data::EventServiceConfig eventServiceConfig = persistent_data::EventServiceStore::getInstance() .getEventServiceConfig(); if (!eventServiceConfig.enabled) { return false; } if (client) { boost::beast::http::fields httpHeadersCopy(userSub->httpHeaders); httpHeadersCopy.set(boost::beast::http::field::content_type, "application/json"); client->sendDataWithCallback( std::move(msg), userSub->destinationUrl, static_cast( userSub->verifyCertificate), httpHeadersCopy, boost::beast::http::verb::post, std::bind_front(&Subscription::resHandler, this, shared_from_this())); return true; } if (sseConn != nullptr) { sseConn->sendSseEvent(std::to_string(eventId), msg); } return true; } void Subscription::filterAndSendEventLogs( uint64_t eventId, const std::vector& eventRecords) { nlohmann::json::array_t logEntryArray; for (const EventLogObjectsType& logEntry : eventRecords) { BMCWEB_LOG_DEBUG("Processing logEntry: {}, {} '{}'", logEntry.id, logEntry.timestamp, logEntry.messageId); std::vector messageArgsView( logEntry.messageArgs.begin(), logEntry.messageArgs.end()); nlohmann::json::object_t bmcLogEntry; if (event_log::formatEventLogEntry( eventId, logEntry.id, logEntry.messageId, messageArgsView, logEntry.timestamp, userSub->customText, bmcLogEntry) != 0) { BMCWEB_LOG_WARNING("Read eventLog entry failed"); continue; } if (!eventMatchesFilter(*userSub, bmcLogEntry, "")) { BMCWEB_LOG_DEBUG("Event {} did not match the filter", nlohmann::json(bmcLogEntry).dump()); continue; } if (filter) { if (!memberMatches(bmcLogEntry, *filter)) { BMCWEB_LOG_DEBUG("Filter didn't match"); continue; } } logEntryArray.emplace_back(std::move(bmcLogEntry)); eventId++; } if (logEntryArray.empty()) { BMCWEB_LOG_DEBUG("No log entries available to be transferred."); return; } nlohmann::json msg; msg["@odata.type"] = "#Event.v1_4_0.Event"; msg["Id"] = std::to_string(eventId); msg["Name"] = "Event Log"; msg["Events"] = std::move(logEntryArray); std::string strMsg = msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace); sendEventToSubscriber(eventId, std::move(strMsg)); } void Subscription::filterAndSendReports(uint64_t eventId, const std::string& reportId, const telemetry::TimestampReadings& var) { boost::urls::url mrdUri = boost::urls::format( "/redfish/v1/TelemetryService/MetricReportDefinitions/{}", reportId); // Empty list means no filter. Send everything. if (!userSub->metricReportDefinitions.empty()) { if (std::ranges::find(userSub->metricReportDefinitions, mrdUri.buffer()) == userSub->metricReportDefinitions.end()) { return; } } nlohmann::json msg; if (!telemetry::fillReport(msg, reportId, var)) { BMCWEB_LOG_ERROR("Failed to fill the MetricReport for DBus " "Report with id {}", reportId); return; } // Context is set by user during Event subscription and it must be // set for MetricReport response. if (!userSub->customText.empty()) { msg["Context"] = userSub->customText; } std::string strMsg = msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace); sendEventToSubscriber(eventId, std::move(strMsg)); } void Subscription::updateRetryConfig(uint32_t retryAttempts, uint32_t retryTimeoutInterval) { if (policy == nullptr) { BMCWEB_LOG_DEBUG("Retry policy was nullptr, ignoring set"); return; } policy->maxRetryAttempts = retryAttempts; policy->retryIntervalSecs = std::chrono::seconds(retryTimeoutInterval); } bool Subscription::matchSseId(const crow::sse_socket::Connection& thisConn) { return &thisConn == sseConn; } // Check used to indicate what response codes are valid as part of our retry // policy. 2XX is considered acceptable boost::system::error_code Subscription::retryRespHandler(unsigned int respCode) { BMCWEB_LOG_DEBUG("Checking response code validity for SubscriptionEvent"); if ((respCode < 200) || (respCode >= 300)) { return boost::system::errc::make_error_code( boost::system::errc::result_out_of_range); } // Return 0 if the response code is valid return boost::system::errc::make_error_code(boost::system::errc::success); } } // namespace redfish