13 asio::any_io_executor executor, std::shared_ptr<spdlog::logger> logger)
14 : executor_(std::move(executor)),
15 logger_(logger ? logger : spdlog::default_logger()) {
20 method_handlers_[method] = handler;
25 notification_handlers_[method] = handler;
29 -> asio::awaitable<std::optional<std::string>> {
30 auto root = nlohmann::json::parse(request,
nullptr,
false);
31 if (root.is_discarded()) {
36 if (root.is_object()) {
38 if (!request.has_value()) {
42 auto response =
co_await DispatchSingleRequest(request.value());
43 if (response.has_value()) {
44 co_return response.value().ToJson().dump();
46 co_return std::nullopt;
50 if (root.is_array()) {
57 std::vector<Request> requests;
58 std::vector<Response> responses;
59 for (
const auto& element : root) {
61 if (!request.has_value()) {
65 requests.push_back(request.value());
68 auto dispatched =
co_await DispatchBatchRequest(requests);
69 for (
const auto& response : dispatched) {
70 responses.push_back(response);
73 co_return nlohmann::json(responses).dump();
81auto Dispatcher::DispatchSingleRequest(
Request request)
82 -> asio::awaitable<std::optional<Response>> {
83 auto method = request.GetMethod();
85 if (request.IsNotification()) {
86 auto it = notification_handlers_.find(method);
87 if (it != notification_handlers_.end()) {
89 "Dispatcher found notification handler for method: {}", method);
92 [handler = it->second, params = request.GetParams()] {
93 return handler(params);
96 co_return std::nullopt;
99 "Dispatcher notification handler not found for method: {}", method);
100 co_return std::nullopt;
103 auto it = method_handlers_.find(method);
104 if (it != method_handlers_.end()) {
105 Logger()->debug(
"Dispatcher found method handler for method: {}", method);
106 auto result =
co_await asio::co_spawn(
108 [handler = it->second, params = request.GetParams()] {
109 return handler(params);
111 asio::use_awaitable);
114 Logger()->debug(
"Dispatcher method handler not found for method: {}", method);
116 RpcErrorCode::kMethodNotFound, request.GetId());
119auto Dispatcher::DispatchBatchRequest(std::vector<Request> requests)
120 -> asio::awaitable<std::vector<Response>> {
121 std::vector<asio::awaitable<std::optional<Response>>> pending;
122 pending.reserve(requests.size());
125 for (
const auto& request : requests) {
127 pending.push_back(DispatchSingleRequest(request));
131 std::vector<Response> responses;
132 for (
auto& awaitable_response : pending) {
133 auto response =
co_await std::move(awaitable_response);
134 if (response.has_value()) {
135 responses.push_back(response.value());
Dispatcher(asio::any_io_executor executor, std::shared_ptr< spdlog::logger > logger=nullptr)
std::function< asio::awaitable< nlohmann::json >( const std::optional< nlohmann::json > &)> MethodCallHandler
void RegisterNotification(const std::string &method, const NotificationHandler &handler)
std::function< asio::awaitable< void >( const std::optional< nlohmann::json > &)> NotificationHandler
auto DispatchRequest(std::string request) -> asio::awaitable< std::optional< std::string > >
void RegisterMethodCall(const std::string &method, const MethodCallHandler &handler)
static auto FromJson(const nlohmann::json &json_obj) -> std::expected< Request, error::RpcError >
static auto CreateSuccess(const nlohmann::json &result, const std::optional< RequestId > &id) -> Response
static auto CreateError(RpcErrorCode code, const std::optional< RequestId > &id=std::nullopt) -> Response