JSON-RPC 2.0
JSON-RPC 2.0 Modern C++ Library
Loading...
Searching...
No Matches
pipe_transport.cpp
Go to the documentation of this file.
2
3#include <algorithm>
4#include <filesystem>
5#include <string>
6
7#include <asio/co_spawn.hpp>
8#include <asio/detached.hpp>
9#include <asio/use_awaitable.hpp>
11#include <spdlog/spdlog.h>
12
13namespace jsonrpc::transport {
14
15using error::Ok;
16using error::RpcError;
18
20 asio::any_io_executor executor, std::string socket_path, bool is_server,
21 std::shared_ptr<spdlog::logger> logger)
22 : Transport(std::move(executor), logger),
23 socket_(GetExecutor()),
24 socket_path_(std::move(socket_path)),
25 is_server_(is_server),
26 read_buffer_() {
27}
28
30 if (!is_closed_) {
31 Logger()->debug("PipeTransport destructor triggering CloseNow()");
32 try {
33 CloseNow();
34 } catch (const std::exception &e) {
35 Logger()->error("PipeTransport destructor error: {}", e.what());
36 }
37 }
38}
39
41 -> asio::awaitable<std::expected<void, error::RpcError>> {
42 Logger()->debug("PipeTransport starting");
43 co_await asio::post(GetStrand(), asio::use_awaitable);
44
45 if (is_started_) {
46 Logger()->debug("PipeTransport already started");
48 RpcErrorCode::kTransportError, "PipeTransport already started");
49 }
50
51 if (is_closed_) {
52 Logger()->error("PipeTransport cannot start a closed transport");
54 RpcErrorCode::kTransportError, "Cannot start a closed transport");
55 }
56
57 if (is_server_) {
58 // For server, bind and listen for connections
59 Logger()->debug("PipeTransport starting server at {}", socket_path_);
60 auto result = co_await BindAndListen();
61 if (!result) {
62 Logger()->error(
63 "PipeTransport server error starting at {}: {}", socket_path_,
64 result.error().Message());
65 co_return result;
66 }
67 } else {
68 // For client, connect to the server
69 Logger()->debug("PipeTransport connecting client to {}", socket_path_);
70 auto result = co_await Connect();
71 if (!result) {
72 Logger()->error(
73 "PipeTransport client error connecting to {}: {}", socket_path_,
74 result.error().Message());
75 co_return result;
76 }
77 }
78 Logger()->debug("PipeTransport client connected to {}", socket_path_);
79
80 // Set started flag before performing operations
81 is_started_ = true;
82 Logger()->debug("PipeTransport successfully started");
83 co_return Ok();
84}
85
87 -> asio::awaitable<std::expected<void, error::RpcError>> {
88 Logger()->debug("PipeTransport closing");
89 co_await asio::post(GetStrand(), asio::use_awaitable);
90
91 if (is_closed_) {
92 Logger()->debug("PipeTransport already closed");
93 co_return Ok();
94 }
95
96 is_closed_ = true;
97 is_connected_ = false;
98
99 // Clear the message queue
100 send_queue_.clear();
101
102 // Cancel and close the socket safely
103 std::error_code ec;
104 if (socket_.is_open()) {
105 socket_.cancel(ec);
106 if (ec) {
107 Logger()->warn("PipeTransport error canceling socket: {}", ec.message());
108 }
109 socket_.close(ec);
110 if (ec) {
111 Logger()->warn("PipeTransport error closing socket: {}", ec.message());
112 }
113 }
114
115 // clean up acceptor if this is a server
116 if (is_server_ && acceptor_) {
117 acceptor_->cancel(ec);
118 if (ec) {
119 Logger()->warn(
120 "PipeTransport error canceling acceptor: {}", ec.message());
121 }
122 acceptor_->close(ec);
123 if (ec) {
124 Logger()->warn("PipeTransport error closing acceptor: {}", ec.message());
125 }
126 }
127
128 // Clean up the socket file if this is a server
129 if (is_server_ && !socket_path_.empty()) {
130 auto result = RemoveExistingSocketFile();
131 if (!result) {
132 Logger()->warn(
133 "PipeTransport error removing socket file: {}",
134 result.error().Message());
135 }
136 }
137
138 Logger()->debug("PipeTransport closed");
139 co_return Ok();
140}
141
143 is_closed_ = true;
144 is_connected_ = false;
145
146 // Clear the message queue
147 send_queue_.clear();
148
149 auto try_close_socket = [&]() {
150 if (!socket_.is_open()) {
151 return;
152 }
153 Logger()->debug("PipeTransport closing socket synchronously");
154
155 std::error_code ec;
156 socket_.cancel();
157 socket_.close(ec);
158 if (ec) {
159 Logger()->warn("PipeTransport error closing socket: {}", ec.message());
160 }
161 };
162
163 auto try_close_acceptor = [&]() {
164 if (!is_server_ || !acceptor_ || !acceptor_->is_open()) {
165 return;
166 }
167 Logger()->debug("PipeTransport closing acceptor synchronously");
168
169 std::error_code ec;
170 acceptor_->cancel();
171 acceptor_->close(ec);
172 if (ec) {
173 Logger()->warn("PipeTransport error closing acceptor: {}", ec.message());
174 }
175 };
176
177 auto try_remove_socket_file = [&]() {
178 if (!is_server_ || socket_path_.empty()) {
179 return;
180 }
181
182 std::error_code ec;
183 if (std::filesystem::exists(socket_path_, ec) && !ec) {
184 std::filesystem::remove(socket_path_, ec);
185 if (ec) {
186 Logger()->warn(
187 "PipeTransport error removing socket file: {}", ec.message());
188 } else {
189 Logger()->debug("PipeTransport removed socket file: {}", socket_path_);
190 }
191 } else if (ec) {
192 Logger()->warn(
193 "PipeTransport error checking socket file existence: {}",
194 ec.message());
195 }
196 };
197
198 try {
199 try_close_socket();
200 try_close_acceptor();
201 try_remove_socket_file();
202 } catch (const std::exception &e) {
203 Logger()->error("PipeTransport error during CloseNow(): {}", e.what());
204 }
205}
206
207auto PipeTransport::GetSocket() -> asio::local::stream_protocol::socket & {
208 return socket_;
209}
210
212 -> std::expected<void, error::RpcError> {
213 std::error_code ec;
214 if (std::filesystem::exists(socket_path_, ec)) {
215 if (ec) {
217 RpcErrorCode::kTransportError,
218 "Error checking if socket file exists: " + ec.message());
219 }
220 std::filesystem::remove(socket_path_, ec);
221 if (ec) {
223 RpcErrorCode::kTransportError,
224 "Error removing socket file: " + ec.message());
225 }
226 Logger()->debug(
227 "PipeTransport removed existing socket file: {}", socket_path_);
228 } else {
229 Logger()->debug(
230 "PipeTransport no existing socket file to remove: {}", socket_path_);
231 }
232 return {};
233}
234
235auto PipeTransport::SendMessage(std::string message)
236 -> asio::awaitable<std::expected<void, error::RpcError>> {
237 co_await asio::post(GetStrand(), asio::use_awaitable);
238
239 if (is_closed_) {
241 RpcErrorCode::kTransportError,
242 "Attempt to send message on closed transport");
243 }
244
245 if (!is_started_) {
247 RpcErrorCode::kTransportError,
248 "Transport not started before sending message");
249 }
250
251 if (!socket_.is_open()) {
253 RpcErrorCode::kTransportError, "Socket not open");
254 }
255
256 Logger()->debug("Queuing {} bytes to send to pipe", message.size());
257 send_queue_.push_back(std::move(message));
258
259 // If there's no active sending task, start one
260 if (!sending_.exchange(true)) {
261 asio::co_spawn(GetStrand(), SendMessageLoop(), asio::detached);
262 }
263
264 co_return Ok();
265}
266
267auto PipeTransport::SendMessageLoop() -> asio::awaitable<void> {
268 while (!send_queue_.empty()) {
269 std::string message = std::move(send_queue_.front());
270 send_queue_.pop_front();
271
272 Logger()->debug("Sending {} bytes to pipe", message.size());
273 std::size_t bytes_sent = 0;
274 const std::size_t chunk_size =
275 32 * 1024; // 32KB chunks to be safe for WSL's 64KB limit
276
277 while (bytes_sent < message.size()) {
278 auto remaining = message.size() - bytes_sent;
279 auto current_chunk_size = std::min(remaining, chunk_size);
280
281 // Use string_view to avoid copying data
282 std::string_view chunk =
283 std::string_view(message).substr(bytes_sent, current_chunk_size);
284
285 // Write to the socket with error redirection
286 std::error_code ec;
287 auto chunk_sent = co_await asio::async_write(
288 socket_, asio::buffer(chunk),
289 asio::redirect_error(asio::use_awaitable, ec));
290
291 if (ec) {
292 Logger()->error(
293 "PipeTransport error sending message: {}", ec.message());
294 break; // Exit but continue processing other messages
295 }
296
297 bytes_sent += chunk_sent;
298 Logger()->debug(
299 "Sent {} bytes to pipe, total {}/{}", chunk_sent, bytes_sent,
300 message.size());
301 }
302 }
303
304 // Mark sending as complete
305 sending_ = false;
306}
307
309 -> asio::awaitable<std::expected<void, error::RpcError>> {
310 Logger()->debug("Flushing message queue");
311 while (true) {
312 co_await asio::post(GetStrand(), asio::use_awaitable);
313 if (send_queue_.empty() && !sending_) {
314 break;
315 }
316 co_await asio::steady_timer(GetExecutor(), std::chrono::milliseconds(10))
317 .async_wait(asio::use_awaitable);
318 }
319 co_return Ok();
320}
321
323 -> asio::awaitable<std::expected<std::string, error::RpcError>> {
324 co_await asio::post(GetStrand(), asio::use_awaitable);
325
326 if (is_closed_) {
327 Logger()->warn(
328 "PipeTransport ReceiveMessage called after transport was closed");
330 RpcErrorCode::kTransportError,
331 "ReceiveMessage called after transport was closed");
332 }
333
334 if (!is_started_) {
336 RpcErrorCode::kTransportError,
337 "Transport not started before receiving message");
338 }
339
340 if (!socket_.is_open()) {
341 Logger()->warn("PipeTransport ReceiveMessage called on a closed socket");
343 RpcErrorCode::kTransportError,
344 "ReceiveMessage called on a closed socket");
345 }
346
347 message_buffer_.clear();
348
349 std::error_code ec;
350 std::size_t bytes_read = co_await socket_.async_read_some(
351 asio::buffer(read_buffer_),
352 asio::redirect_error(asio::use_awaitable, ec));
353
354 if (ec) {
355 if (ec == asio::error::eof) {
356 Logger()->debug("PipeTransport connection closed by peer (EOF)");
357 is_connected_ = false;
359 RpcErrorCode::kTransportError, "Connection closed by peer");
360 } else if (ec == asio::error::operation_aborted) {
361 Logger()->debug("PipeTransport Receive operation aborted");
363 RpcErrorCode::kTransportError, "Receive aborted");
364 } else {
365 Logger()->error(
366 "PipeTransport error receiving message: {}", ec.message());
368 RpcErrorCode::kTransportError, "Receive error: " + ec.message());
369 }
370 }
371
372 if (bytes_read == 0) {
374 RpcErrorCode::kTransportError, "No data received");
375 }
376
377 message_buffer_.append(read_buffer_.data(), bytes_read);
378 auto log_message = message_buffer_;
379 if (log_message.size() > 70) {
380 log_message = log_message.substr(0, 70) + "...";
381 }
382 std::ranges::replace(log_message, '\n', ' ');
383 std::ranges::replace(log_message, '\r', ' ');
384 Logger()->debug("PipeTransport received message: {}", log_message);
385 co_return std::move(message_buffer_);
386}
387
389 -> asio::awaitable<std::expected<void, error::RpcError>> {
390 Logger()->debug("PipeTransport connecting to {}", socket_path_);
391
392 // Make sure we're not already connected
393 if (is_connected_) {
394 co_return Ok();
395 }
396
397 // Check if we're closed
398 if (is_closed_) {
400 RpcErrorCode::kTransportError, "Cannot connect a closed transport");
401 }
402
403 // Close any existing socket
404 std::error_code ec;
405 if (socket_.is_open()) {
406 socket_.close(ec);
407 if (ec) {
408 Logger()->warn(
409 "PipeTransport error closing socket before reconnect: {}",
410 ec.message());
411 }
412 }
413
414 // Create a new socket
415 socket_ = asio::local::stream_protocol::socket(GetExecutor());
416
417 // Create the endpoint and connect
418 asio::local::stream_protocol::endpoint endpoint(socket_path_);
419 co_await socket_.async_connect(
420 endpoint, asio::redirect_error(asio::use_awaitable, ec));
421 if (ec) {
422 Logger()->error(
423 "PipeTransport error connecting to {}: {}", socket_path_, ec.message());
425 RpcErrorCode::kTransportError, "Error connecting to: " + ec.message());
426 }
427
428 is_connected_ = true;
429 Logger()->debug("PipeTransport connected to {}", socket_path_);
430
431 co_return Ok();
432}
433
435 -> asio::awaitable<std::expected<void, error::RpcError>> {
436 Logger()->debug("PipeTransport binding to {}", socket_path_);
437
438 auto result = RemoveExistingSocketFile();
439 if (!result) {
440 Logger()->error(
441 "PipeTransport error removing existing socket file: {}",
442 result.error().Message());
443 co_return result;
444 }
445
446 // Lazily construct the acceptor if not already created
447 if (!acceptor_) {
448 acceptor_ =
449 std::make_unique<asio::local::stream_protocol::acceptor>(GetExecutor());
450 }
451
452 // Create the endpoint
453 asio::local::stream_protocol::endpoint endpoint(socket_path_);
454
455 // Open and bind the acceptor
456 std::error_code ec;
457 acceptor_->open(endpoint.protocol(), ec);
458 if (ec) {
459 Logger()->error("PipeTransport error opening acceptor: {}", ec.message());
461 RpcErrorCode::kTransportError,
462 "Error opening acceptor: " + ec.message());
463 }
464 acceptor_->bind(endpoint, ec);
465 if (ec) {
466 Logger()->error("PipeTransport error binding acceptor: {}", ec.message());
468 RpcErrorCode::kTransportError,
469 "Error binding acceptor: " + ec.message());
470 }
471 acceptor_->listen(asio::socket_base::max_listen_connections, ec);
472 if (ec) {
473 Logger()->error(
474 "PipeTransport error listening on acceptor: {}", ec.message());
476 RpcErrorCode::kTransportError,
477 "Error listening on acceptor: " + ec.message());
478 }
479
480 // Accept a connection
481 Logger()->debug("PipeTransport waiting for connection on {}", socket_path_);
482 co_await acceptor_->async_accept(
483 socket_, asio::redirect_error(asio::use_awaitable, ec));
484 if (ec) {
485 Logger()->error(
486 "PipeTransport error accepting connection: {}", ec.message());
488 RpcErrorCode::kTransportError,
489 "Error accepting connection: " + ec.message());
490 }
491 is_connected_ = true;
492 Logger()->debug("PipeTransport accepted connection on {}", socket_path_);
493
494 co_return Ok();
495}
496
497} // namespace jsonrpc::transport
static auto UnexpectedFromCode(RpcErrorCode code, std::string message="") -> std::unexpected< RpcError >
Definition error.hpp:101
auto Start() -> asio::awaitable< std::expected< void, error::RpcError > > override
auto RemoveExistingSocketFile() -> std::expected< void, error::RpcError >
PipeTransport(asio::any_io_executor executor, std::string socket_path, bool is_server=false, std::shared_ptr< spdlog::logger > logger=nullptr)
auto Connect() -> asio::awaitable< std::expected< void, error::RpcError > >
auto BindAndListen() -> asio::awaitable< std::expected< void, error::RpcError > >
auto GetSocket() -> asio::local::stream_protocol::socket &
auto Close() -> asio::awaitable< std::expected< void, error::RpcError > > override
auto SendMessage(std::string message) -> asio::awaitable< std::expected< void, error::RpcError > > override
auto Flush() -> asio::awaitable< std::expected< void, error::RpcError > >
auto ReceiveMessage() -> asio::awaitable< std::expected< std::string, error::RpcError > > override
auto Logger() -> std::shared_ptr< spdlog::logger >
Definition transport.hpp:54
auto Ok() -> std::expected< void, RpcError >
Definition error.hpp:111