JSON-RPC 2.0
JSON-RPC 2.0 Modern C++ Library
Loading...
Searching...
No Matches
endpoint.cpp
Go to the documentation of this file.
2
3#include <asio.hpp>
5#include <spdlog/spdlog.h>
6
9
10namespace jsonrpc::endpoint {
11
15
17 asio::any_io_executor executor,
18 std::unique_ptr<transport::Transport> transport,
19 std::shared_ptr<spdlog::logger> logger)
20 : logger_(logger ? logger : spdlog::default_logger()),
21 executor_(std::move(executor)),
22 transport_(std::move(transport)),
23 dispatcher_(executor_, logger_),
24 endpoint_strand_(asio::make_strand(executor_)) {
25}
26
// Creates an endpoint on `executor` over `transport` and starts it.
//
// Returns the started endpoint, or the error reported by Start(). On failure
// the partially constructed endpoint is destroyed before returning.
auto RpcEndpoint::CreateClient(
    asio::any_io_executor executor,
    std::unique_ptr<transport::Transport> transport)
    -> asio::awaitable<std::expected<std::unique_ptr<RpcEndpoint>, RpcError>> {
  // `executor` is not used again here, so hand it over instead of copying.
  auto endpoint =
      std::make_unique<RpcEndpoint>(std::move(executor), std::move(transport));

  auto start_result = co_await endpoint->Start();
  if (!start_result) {
    co_return std::unexpected(start_result.error());
  }

  endpoint->Logger()->debug("Client endpoint initialized");
  co_return endpoint;
}
41
// Starts the transport and sets up message processing.
//
// Returns a kClientError if the endpoint is already marked as running, or the
// transport's own error if transport_->Start() fails. Any pending requests
// left in the table are discarded before the transport is started.
auto RpcEndpoint::Start() -> asio::awaitable<std::expected<void, RpcError>> {
  Logger()->debug("RpcEndpoint starting");
  // exchange() both tests and claims the running flag in one atomic step.
  if (is_running_.exchange(true)) {
    co_return RpcError::UnexpectedFromCode(
        RpcErrorCode::kClientError, "RPC endpoint is already running");
  }

  pending_requests_.clear();

  // Start the transport
  auto start_result = co_await transport_->Start();
  if (!start_result) {
    // Release the running flag claimed above; otherwise a failed start would
    // leave the endpoint reporting itself as running and block any retry.
    is_running_.store(false);
    co_return start_result;
  }

  // Set up message processing on the endpoint strand
  StartMessageProcessing();

  co_return std::expected<void, RpcError>{};
}
63
// Waits for the message loop to finish.
//
// Returns immediately with success when the endpoint is not running or when
// the message-loop awaitable has already been consumed. This function always
// returns success; it never produces an RpcError itself.
auto RpcEndpoint::WaitForShutdown()
    -> asio::awaitable<std::expected<void, RpcError>> {
  // The awaitable is moved out, so it can only be awaited once.
  if (is_running_ && message_loop_.valid()) {
    co_await std::move(message_loop_);
  }

  co_return std::expected<void, RpcError>{};
}
77
// Stops the endpoint: cancels outstanding requests, waits for the message
// loop (if its awaitable is still held), then closes the transport.
//
// Calling this on an endpoint that is not running is a successful no-op.
// Returns the transport's error if Close() fails.
auto RpcEndpoint::Shutdown() -> asio::awaitable<std::expected<void, RpcError>> {
  // Atomically clear the running flag; only the caller that flips it from
  // true to false continues with the shutdown sequence.
  const bool was_running = is_running_.exchange(false);
  if (!was_running) {
    co_return std::expected<void, RpcError>{};
  }

  // Pass through the strand once so previously posted handlers run first.
  co_await asio::post(endpoint_strand_, asio::use_awaitable);
  cancel_signal_.emit(asio::cancellation_type::all);

  Logger()->debug("Shutting down RPC endpoint");

  // Fail every outstanding call; -32603 is the JSON-RPC "Internal error" code.
  for (auto &entry : pending_requests_) {
    entry.second->Cancel(-32603, "RPC endpoint shutting down");
  }
  pending_requests_.clear();

  // Let the message loop finish before the transport goes away.
  if (message_loop_.valid()) {
    co_await std::move(message_loop_);
  }

  auto close_result = co_await transport_->Close();
  if (close_result) {
    co_return Ok();
  }
  co_return close_result;
}
109
// Sends a JSON-RPC method call and waits for the matching response.
//
// Returns the response's "result" member on success. Returns an RpcError when
// the endpoint is not running, when the transport fails to send, or when the
// response carries an "error" member (its "message" becomes the error text).
auto RpcEndpoint::SendMethodCall(
    std::string method, std::optional<nlohmann::json> params)
    -> asio::awaitable<std::expected<nlohmann::json, RpcError>> {
  if (!is_running_) {
    co_return RpcError::UnexpectedFromCode(
        RpcErrorCode::kClientError, "RPC endpoint is not running");
  }

  auto request_id = GetNextRequestId();
  Request request(method, std::move(params), request_id);
  std::string message = request.ToJson().dump();

  Logger()->debug("RpcEndpoint sending message: {}", message.substr(0, 70));
  auto pending_request = std::make_shared<PendingRequest>(endpoint_strand_);
  asio::post(endpoint_strand_, [this, request_id, pending_request] {
    pending_requests_[request_id] = pending_request;
  });

  // Round-trip through the strand so the registration above has run before
  // the request goes out; otherwise a fast response could miss its entry.
  co_await asio::post(endpoint_strand_, asio::use_awaitable);

  auto send_result = co_await transport_->SendMessage(message);
  if (!send_result) {
    // No response can arrive for a request that was never sent, so drop the
    // table entry instead of leaving it behind until shutdown.
    asio::post(endpoint_strand_, [this, request_id] {
      pending_requests_.erase(request_id);
    });
    co_await asio::post(endpoint_strand_, asio::use_awaitable);
    co_return std::unexpected(send_result.error());
  }

  auto result = co_await pending_request->GetResult();
  if (result.contains("error")) {
    const auto &err = result["error"];
    // find() yields end() for a missing key or a non-object value, so a
    // malformed error payload maps to a generic message instead of throwing.
    const auto text = err.find("message");
    const bool has_text = text != err.end() && text->is_string();
    co_return RpcError::UnexpectedFromCode(
        RpcErrorCode::kClientError,
        has_text ? text->get<std::string>() : std::string("Unknown error"));
  }

  // `result` is a local about to be destroyed; move the payload out.
  co_return std::move(result["result"]);
}
146
// Sends a JSON-RPC notification (a request built without an id); no response
// is awaited.
//
// Returns a kClientError when the endpoint is not running, otherwise the
// outcome of the transport send.
auto RpcEndpoint::SendNotification(
    std::string method, std::optional<nlohmann::json> params)
    -> asio::awaitable<std::expected<void, RpcError>> {
  Logger()->debug("RpcEndpoint sending notification: {}", method);
  if (!is_running_) {
    co_return RpcError::UnexpectedFromCode(
        RpcErrorCode::kClientError, "RpcEndpoint is not running");
  }

  Request request(method, std::move(params));
  std::string message = request.ToJson().dump();

  Logger()->debug("RpcEndpoint sending message: {}", message.substr(0, 70));
  // Forward the transport's result directly, as HandleMessage does.
  co_return co_await transport_->SendMessage(message);
}
167
169 std::string method, typename Dispatcher::MethodCallHandler handler) {
170 dispatcher_.RegisterMethodCall(method, handler);
171}
172
174 std::string method, typename Dispatcher::NotificationHandler handler) {
175 dispatcher_.RegisterNotification(method, handler);
176}
177
179 // This is safe to call without a strand because we're just checking if empty
180 return !pending_requests_.empty();
181}
182
183void RpcEndpoint::StartMessageProcessing() {
184 Logger()->debug("RpcEndpoint starting message processing");
185 message_loop_ = asio::co_spawn(
186 endpoint_strand_,
187 [this] {
188 Logger()->debug(
189 "RpcEndpoint starting message processing, is_running_: {}",
190 is_running_.load());
191 return this->ProcessMessagesLoop(cancel_signal_.slot());
192 },
193 asio::use_awaitable);
194}
195
196namespace {
197auto RetryDelay(asio::any_io_executor exec) -> asio::awaitable<void> {
198 asio::steady_timer timer(exec, std::chrono::milliseconds(100));
199 co_await timer.async_wait(asio::use_awaitable);
200}
201} // namespace
202
203auto RpcEndpoint::ProcessMessagesLoop(asio::cancellation_slot slot)
204 -> asio::awaitable<void> {
205 auto state = co_await asio::this_coro::cancellation_state;
206
207 while (is_running_ && !state.cancelled()) {
208 auto message_result = co_await transport_->ReceiveMessage();
209 if (!message_result) {
210 Logger()->error("Receive error: {}", message_result.error().Message());
211 co_await RetryDelay(executor_);
212 continue;
213 }
214
215 // Spawn message handling concurrently (don't block the read loop)
216 asio::co_spawn(
217 executor_,
218 [this, message = *message_result]() -> asio::awaitable<void> {
219 auto handle_result = co_await HandleMessage(message);
220 if (!handle_result) {
221 Logger()->error(
222 "Handle error: {}", handle_result.error().Message());
223 }
224 co_return;
225 },
226 asio::detached);
227 }
228}
229
230namespace {
231auto IsResponse(const nlohmann::json &msg) -> bool {
232 return msg.contains("id") &&
233 (msg.contains("result") || msg.contains("error"));
234}
235} // namespace
236
// Handles one raw incoming message.
//
// Responses (see IsResponse) are routed to HandleResponse(). Anything else is
// passed to the dispatcher, and a reply is sent over the transport if the
// dispatcher produces one. Returns a kClientError for unparseable input or an
// invalid response, otherwise the outcome of the step that was taken.
auto RpcEndpoint::HandleMessage(std::string message)
    -> asio::awaitable<std::expected<void, RpcError>> {
  Logger()->debug("RpcEndpoint handling message: {}", message.substr(0, 70));

  // Parse with exceptions disabled; failure is signalled by is_discarded().
  const auto json_message = nlohmann::json::parse(message, nullptr, false);
  if (json_message.is_discarded()) {
    co_return RpcError::UnexpectedFromCode(
        RpcErrorCode::kClientError, "Failed to parse message");
  }

  if (IsResponse(json_message)) {
    auto response = Response::FromJson(json_message);
    if (!response.has_value()) {
      co_return RpcError::UnexpectedFromCode(
          RpcErrorCode::kClientError, "Invalid response");
    }
    co_return co_await HandleResponse(std::move(response.value()));
  }

  // Not a response: the dispatcher receives the original text and yields a
  // reply only when one has to be sent back.
  if (auto response = co_await dispatcher_.DispatchRequest(message)) {
    co_return co_await transport_->SendMessage(*response);
  }

  co_return std::expected<void, RpcError>{};
}
263
// Delivers `response` to the pending request with the same id.
//
// Returns a kClientError when the response has no int64 id or when no pending
// request is registered under that id. On success the entry is removed from
// the table and the response JSON is handed to the waiting request.
auto RpcEndpoint::HandleResponse(Response response)
    -> asio::awaitable<std::expected<void, RpcError>> {
  auto id_opt = response.GetId();
  if (!id_opt || !std::holds_alternative<int64_t>(*id_opt)) {
    co_return RpcError::UnexpectedFromCode(
        RpcErrorCode::kClientError, "Response ID missing or not int64");
  }

  const auto id = std::get<int64_t>(*id_opt);

  // Filled in by the strand handler below; stays null when the id is unknown.
  std::shared_ptr<PendingRequest> request;

  asio::post(endpoint_strand_, [this, id, &request] {
    auto it = pending_requests_.find(id);
    if (it != pending_requests_.end()) {
      request = it->second;
      pending_requests_.erase(it);
    }
  });

  // Round-trip through the strand so the lookup above has run before
  // `request` is read; it was captured by reference from this frame.
  co_await asio::post(endpoint_strand_, asio::use_awaitable);

  if (!request) {
    co_return RpcError::UnexpectedFromCode(
        RpcErrorCode::kClientError,
        "Unknown request ID: " + std::to_string(id));
  }

  request->SetResult(response.ToJson());
  co_return std::expected<void, RpcError>{};
}
297
298} // namespace jsonrpc::endpoint
std::function< asio::awaitable< nlohmann::json >( const std::optional< nlohmann::json > &)> MethodCallHandler
void RegisterNotification(const std::string &method, const NotificationHandler &handler)
std::function< asio::awaitable< void >( const std::optional< nlohmann::json > &)> NotificationHandler
void RegisterMethodCall(const std::string &method, const MethodCallHandler &handler)
auto ToJson() const -> nlohmann::json
Definition request.cpp:93
static auto FromJson(const nlohmann::json &json) -> std::expected< Response, error::RpcError >
Definition response.cpp:9
static auto CreateClient(asio::any_io_executor executor, std::unique_ptr< transport::Transport > transport) -> asio::awaitable< std::expected< std::unique_ptr< RpcEndpoint >, RpcError > >
Definition endpoint.cpp:27
RpcEndpoint(asio::any_io_executor executor, std::unique_ptr< transport::Transport > transport, std::shared_ptr< spdlog::logger > logger=nullptr)
Definition endpoint.cpp:16
auto HasPendingRequests() const -> bool
Definition endpoint.cpp:178
void RegisterNotification(std::string method, typename Dispatcher::NotificationHandler handler)
Definition endpoint.cpp:173
auto Shutdown() -> asio::awaitable< std::expected< void, RpcError > >
Definition endpoint.cpp:78
void RegisterMethodCall(std::string method, typename Dispatcher::MethodCallHandler handler)
Definition endpoint.cpp:168
auto WaitForShutdown() -> asio::awaitable< std::expected< void, RpcError > >
Definition endpoint.cpp:64
auto SendMethodCall(std::string method, std::optional< nlohmann::json > params=std::nullopt) -> asio::awaitable< std::expected< nlohmann::json, RpcError > >
Definition endpoint.cpp:110
auto Start() -> asio::awaitable< std::expected< void, RpcError > >
Definition endpoint.cpp:42
auto SendNotification(std::string method, std::optional< nlohmann::json > params=std::nullopt) -> asio::awaitable< std::expected< void, RpcError > >
Definition endpoint.cpp:147
auto Logger() -> std::shared_ptr< spdlog::logger >
Definition endpoint.hpp:55
static auto UnexpectedFromCode(RpcErrorCode code, std::string message="") -> std::unexpected< RpcError >
Definition error.hpp:101
auto Ok() -> std::expected< void, RpcError >
Definition error.hpp:111