JSON-RPC 2.0
JSON-RPC 2.0 Modern C++ Library
Loading...
Searching...
No Matches
jsonrpc::transport::PipeTransport Class Reference

#include <pipe_transport.hpp>

Inheritance diagram for jsonrpc::transport::PipeTransport:
Collaboration diagram for jsonrpc::transport::PipeTransport:

Public Member Functions

 PipeTransport (asio::any_io_executor executor, std::string socket_path, bool is_server=false, std::shared_ptr< spdlog::logger > logger=nullptr)
 
 ~PipeTransport () override
 
 PipeTransport (const PipeTransport &)=delete
 
auto operator= (const PipeTransport &) -> PipeTransport &=delete
 
 PipeTransport (PipeTransport &&)=delete
 
auto operator= (PipeTransport &&) -> PipeTransport &=delete
 
auto Start () -> asio::awaitable< std::expected< void, error::RpcError > > override
 
auto Close () -> asio::awaitable< std::expected< void, error::RpcError > > override
 
void CloseNow () override
 
auto SendMessage (std::string message) -> asio::awaitable< std::expected< void, error::RpcError > > override
 
auto Flush () -> asio::awaitable< std::expected< void, error::RpcError > >
 
auto ReceiveMessage () -> asio::awaitable< std::expected< std::string, error::RpcError > > override
 
- Public Member Functions inherited from jsonrpc::transport::Transport
 Transport (asio::any_io_executor executor, std::shared_ptr< spdlog::logger > logger=nullptr)
 
 Transport (const Transport &)=delete
 
 Transport (Transport &&)=delete
 
auto operator= (const Transport &) -> Transport &=delete
 
auto operator= (Transport &&) -> Transport &=delete
 
virtual ~Transport ()=default
 
auto GetExecutor () const -> asio::any_io_executor
 
auto GetStrand () -> asio::strand< asio::any_io_executor > &
 

Protected Member Functions

auto GetSocket () -> asio::local::stream_protocol::socket &
 
auto RemoveExistingSocketFile () -> std::expected< void, error::RpcError >
 
auto Connect () -> asio::awaitable< std::expected< void, error::RpcError > >
 
auto BindAndListen () -> asio::awaitable< std::expected< void, error::RpcError > >
 
- Protected Member Functions inherited from jsonrpc::transport::Transport
auto Logger () -> std::shared_ptr< spdlog::logger >
 

Detailed Description

Definition at line 17 of file pipe_transport.hpp.

Constructor & Destructor Documentation

◆ PipeTransport() [1/3]

jsonrpc::transport::PipeTransport::PipeTransport ( asio::any_io_executor executor,
std::string socket_path,
bool is_server = false,
std::shared_ptr< spdlog::logger > logger = nullptr )
explicit

Definition at line 19 of file pipe_transport.cpp.

22 : Transport(std::move(executor), logger),
23 socket_(GetExecutor()),
24 socket_path_(std::move(socket_path)),
25 is_server_(is_server),
26 read_buffer_() {
27}
Transport(asio::any_io_executor executor, std::shared_ptr< spdlog::logger > logger=nullptr)
Definition transport.hpp:15
auto GetExecutor() const -> asio::any_io_executor
Definition transport.hpp:45

◆ ~PipeTransport()

jsonrpc::transport::PipeTransport::~PipeTransport ( )
override

Definition at line 29 of file pipe_transport.cpp.

29 {
30 if (!is_closed_) {
31 Logger()->debug("PipeTransport destructor triggering CloseNow()");
32 try {
33 CloseNow();
34 } catch (const std::exception &e) {
35 Logger()->error("PipeTransport destructor error: {}", e.what());
36 }
37 }
38}
auto Logger() -> std::shared_ptr< spdlog::logger >
Definition transport.hpp:54

References CloseNow(), and jsonrpc::transport::Transport::Logger().

Here is the call graph for this function:

◆ PipeTransport() [2/3]

jsonrpc::transport::PipeTransport::PipeTransport ( const PipeTransport & )
delete

◆ PipeTransport() [3/3]

jsonrpc::transport::PipeTransport::PipeTransport ( PipeTransport && )
delete

Member Function Documentation

◆ BindAndListen()

auto jsonrpc::transport::PipeTransport::BindAndListen ( ) -> asio::awaitable<std::expected<void, error::RpcError>>
protected

Definition at line 434 of file pipe_transport.cpp.

435 {
436 Logger()->debug("PipeTransport binding to {}", socket_path_);
437
438 auto result = RemoveExistingSocketFile();
439 if (!result) {
440 Logger()->error(
441 "PipeTransport error removing existing socket file: {}",
442 result.error().Message());
443 co_return result;
444 }
445
446 // Lazily construct the acceptor if not already created
447 if (!acceptor_) {
448 acceptor_ =
449 std::make_unique<asio::local::stream_protocol::acceptor>(GetExecutor());
450 }
451
452 // Create the endpoint
453 asio::local::stream_protocol::endpoint endpoint(socket_path_);
454
455 // Open and bind the acceptor
456 std::error_code ec;
457 acceptor_->open(endpoint.protocol(), ec);
458 if (ec) {
459 Logger()->error("PipeTransport error opening acceptor: {}", ec.message());
461 RpcErrorCode::kTransportError,
462 "Error opening acceptor: " + ec.message());
463 }
464 acceptor_->bind(endpoint, ec);
465 if (ec) {
466 Logger()->error("PipeTransport error binding acceptor: {}", ec.message());
468 RpcErrorCode::kTransportError,
469 "Error binding acceptor: " + ec.message());
470 }
471 acceptor_->listen(asio::socket_base::max_listen_connections, ec);
472 if (ec) {
473 Logger()->error(
474 "PipeTransport error listening on acceptor: {}", ec.message());
476 RpcErrorCode::kTransportError,
477 "Error listening on acceptor: " + ec.message());
478 }
479
480 // Accept a connection
481 Logger()->debug("PipeTransport waiting for connection on {}", socket_path_);
482 co_await acceptor_->async_accept(
483 socket_, asio::redirect_error(asio::use_awaitable, ec));
484 if (ec) {
485 Logger()->error(
486 "PipeTransport error accepting connection: {}", ec.message());
488 RpcErrorCode::kTransportError,
489 "Error accepting connection: " + ec.message());
490 }
491 is_connected_ = true;
492 Logger()->debug("PipeTransport accepted connection on {}", socket_path_);
493
494 co_return Ok();
495}
static auto UnexpectedFromCode(RpcErrorCode code, std::string message="") -> std::unexpected< RpcError >
Definition error.hpp:101
auto RemoveExistingSocketFile() -> std::expected< void, error::RpcError >

References jsonrpc::error::RpcError::UnexpectedFromCode().

Here is the call graph for this function:

◆ Close()

auto jsonrpc::transport::PipeTransport::Close ( ) -> asio::awaitable<std::expected<void, error::RpcError>>
overridevirtual

Implements jsonrpc::transport::Transport.

Definition at line 86 of file pipe_transport.cpp.

87 {
88 Logger()->debug("PipeTransport closing");
89 co_await asio::post(GetStrand(), asio::use_awaitable);
90
91 if (is_closed_) {
92 Logger()->debug("PipeTransport already closed");
93 co_return Ok();
94 }
95
96 is_closed_ = true;
97 is_connected_ = false;
98
99 // Clear the message queue
100 send_queue_.clear();
101
102 // Cancel and close the socket safely
103 std::error_code ec;
104 if (socket_.is_open()) {
105 socket_.cancel(ec);
106 if (ec) {
107 Logger()->warn("PipeTransport error canceling socket: {}", ec.message());
108 }
109 socket_.close(ec);
110 if (ec) {
111 Logger()->warn("PipeTransport error closing socket: {}", ec.message());
112 }
113 }
114
115 // clean up acceptor if this is a server
116 if (is_server_ && acceptor_) {
117 acceptor_->cancel(ec);
118 if (ec) {
119 Logger()->warn(
120 "PipeTransport error canceling acceptor: {}", ec.message());
121 }
122 acceptor_->close(ec);
123 if (ec) {
124 Logger()->warn("PipeTransport error closing acceptor: {}", ec.message());
125 }
126 }
127
128 // Clean up the socket file if this is a server
129 if (is_server_ && !socket_path_.empty()) {
130 auto result = RemoveExistingSocketFile();
131 if (!result) {
132 Logger()->warn(
133 "PipeTransport error removing socket file: {}",
134 result.error().Message());
135 }
136 }
137
138 Logger()->debug("PipeTransport closed");
139 co_return Ok();
140}
auto GetStrand() -> asio::strand< asio::any_io_executor > &
Definition transport.hpp:49

◆ CloseNow()

void jsonrpc::transport::PipeTransport::CloseNow ( )
overridevirtual

Implements jsonrpc::transport::Transport.

Definition at line 142 of file pipe_transport.cpp.

142 {
143 is_closed_ = true;
144 is_connected_ = false;
145
146 // Clear the message queue
147 send_queue_.clear();
148
149 auto try_close_socket = [&]() {
150 if (!socket_.is_open()) {
151 return;
152 }
153 Logger()->debug("PipeTransport closing socket synchronously");
154
155 std::error_code ec;
156 socket_.cancel();
157 socket_.close(ec);
158 if (ec) {
159 Logger()->warn("PipeTransport error closing socket: {}", ec.message());
160 }
161 };
162
163 auto try_close_acceptor = [&]() {
164 if (!is_server_ || !acceptor_ || !acceptor_->is_open()) {
165 return;
166 }
167 Logger()->debug("PipeTransport closing acceptor synchronously");
168
169 std::error_code ec;
170 acceptor_->cancel();
171 acceptor_->close(ec);
172 if (ec) {
173 Logger()->warn("PipeTransport error closing acceptor: {}", ec.message());
174 }
175 };
176
177 auto try_remove_socket_file = [&]() {
178 if (!is_server_ || socket_path_.empty()) {
179 return;
180 }
181
182 std::error_code ec;
183 if (std::filesystem::exists(socket_path_, ec) && !ec) {
184 std::filesystem::remove(socket_path_, ec);
185 if (ec) {
186 Logger()->warn(
187 "PipeTransport error removing socket file: {}", ec.message());
188 } else {
189 Logger()->debug("PipeTransport removed socket file: {}", socket_path_);
190 }
191 } else if (ec) {
192 Logger()->warn(
193 "PipeTransport error checking socket file existence: {}",
194 ec.message());
195 }
196 };
197
198 try {
199 try_close_socket();
200 try_close_acceptor();
201 try_remove_socket_file();
202 } catch (const std::exception &e) {
203 Logger()->error("PipeTransport error during CloseNow(): {}", e.what());
204 }
205}

References jsonrpc::transport::Transport::Logger().

Referenced by ~PipeTransport().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Connect()

auto jsonrpc::transport::PipeTransport::Connect ( ) -> asio::awaitable<std::expected<void, error::RpcError>>
protected

Definition at line 388 of file pipe_transport.cpp.

389 {
390 Logger()->debug("PipeTransport connecting to {}", socket_path_);
391
392 // Make sure we're not already connected
393 if (is_connected_) {
394 co_return Ok();
395 }
396
397 // Check if we're closed
398 if (is_closed_) {
400 RpcErrorCode::kTransportError, "Cannot connect a closed transport");
401 }
402
403 // Close any existing socket
404 std::error_code ec;
405 if (socket_.is_open()) {
406 socket_.close(ec);
407 if (ec) {
408 Logger()->warn(
409 "PipeTransport error closing socket before reconnect: {}",
410 ec.message());
411 }
412 }
413
414 // Create a new socket
415 socket_ = asio::local::stream_protocol::socket(GetExecutor());
416
417 // Create the endpoint and connect
418 asio::local::stream_protocol::endpoint endpoint(socket_path_);
419 co_await socket_.async_connect(
420 endpoint, asio::redirect_error(asio::use_awaitable, ec));
421 if (ec) {
422 Logger()->error(
423 "PipeTransport error connecting to {}: {}", socket_path_, ec.message());
425 RpcErrorCode::kTransportError, "Error connecting to: " + ec.message());
426 }
427
428 is_connected_ = true;
429 Logger()->debug("PipeTransport connected to {}", socket_path_);
430
431 co_return Ok();
432}

References jsonrpc::error::RpcError::UnexpectedFromCode().

Here is the call graph for this function:

◆ Flush()

auto jsonrpc::transport::PipeTransport::Flush ( ) -> asio::awaitable<std::expected<void, error::RpcError>>

Definition at line 308 of file pipe_transport.cpp.

309 {
310 Logger()->debug("Flushing message queue");
311 while (true) {
312 co_await asio::post(GetStrand(), asio::use_awaitable);
313 if (send_queue_.empty() && !sending_) {
314 break;
315 }
316 co_await asio::steady_timer(GetExecutor(), std::chrono::milliseconds(10))
317 .async_wait(asio::use_awaitable);
318 }
319 co_return Ok();
320}

◆ GetSocket()

auto jsonrpc::transport::PipeTransport::GetSocket ( ) -> asio::local::stream_protocol::socket&
protected

Definition at line 207 of file pipe_transport.cpp.

207 {
208 return socket_;
209}

◆ operator=() [1/2]

auto jsonrpc::transport::PipeTransport::operator= ( const PipeTransport & ) -> PipeTransport &=delete
delete

◆ operator=() [2/2]

auto jsonrpc::transport::PipeTransport::operator= ( PipeTransport && ) -> PipeTransport &=delete
delete

◆ ReceiveMessage()

auto jsonrpc::transport::PipeTransport::ReceiveMessage ( ) -> asio::awaitable<std::expected<std::string, error::RpcError>>
overridevirtual

Implements jsonrpc::transport::Transport.

Definition at line 322 of file pipe_transport.cpp.

323 {
324 co_await asio::post(GetStrand(), asio::use_awaitable);
325
326 if (is_closed_) {
327 Logger()->warn(
328 "PipeTransport ReceiveMessage called after transport was closed");
330 RpcErrorCode::kTransportError,
331 "ReceiveMessage called after transport was closed");
332 }
333
334 if (!is_started_) {
336 RpcErrorCode::kTransportError,
337 "Transport not started before receiving message");
338 }
339
340 if (!socket_.is_open()) {
341 Logger()->warn("PipeTransport ReceiveMessage called on a closed socket");
343 RpcErrorCode::kTransportError,
344 "ReceiveMessage called on a closed socket");
345 }
346
347 message_buffer_.clear();
348
349 std::error_code ec;
350 std::size_t bytes_read = co_await socket_.async_read_some(
351 asio::buffer(read_buffer_),
352 asio::redirect_error(asio::use_awaitable, ec));
353
354 if (ec) {
355 if (ec == asio::error::eof) {
356 Logger()->debug("PipeTransport connection closed by peer (EOF)");
357 is_connected_ = false;
359 RpcErrorCode::kTransportError, "Connection closed by peer");
360 } else if (ec == asio::error::operation_aborted) {
361 Logger()->debug("PipeTransport Receive operation aborted");
363 RpcErrorCode::kTransportError, "Receive aborted");
364 } else {
365 Logger()->error(
366 "PipeTransport error receiving message: {}", ec.message());
368 RpcErrorCode::kTransportError, "Receive error: " + ec.message());
369 }
370 }
371
372 if (bytes_read == 0) {
374 RpcErrorCode::kTransportError, "No data received");
375 }
376
377 message_buffer_.append(read_buffer_.data(), bytes_read);
378 auto log_message = message_buffer_;
379 if (log_message.size() > 70) {
380 log_message = log_message.substr(0, 70) + "...";
381 }
382 std::ranges::replace(log_message, '\n', ' ');
383 std::ranges::replace(log_message, '\r', ' ');
384 Logger()->debug("PipeTransport received message: {}", log_message);
385 co_return std::move(message_buffer_);
386}

References jsonrpc::error::RpcError::UnexpectedFromCode().

Referenced by jsonrpc::transport::FramedPipeTransport::ReceiveMessage().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ RemoveExistingSocketFile()

auto jsonrpc::transport::PipeTransport::RemoveExistingSocketFile ( ) -> std::expected<void, error::RpcError>
protected

Definition at line 211 of file pipe_transport.cpp.

212 {
213 std::error_code ec;
214 if (std::filesystem::exists(socket_path_, ec)) {
215 if (ec) {
217 RpcErrorCode::kTransportError,
218 "Error checking if socket file exists: " + ec.message());
219 }
220 std::filesystem::remove(socket_path_, ec);
221 if (ec) {
223 RpcErrorCode::kTransportError,
224 "Error removing socket file: " + ec.message());
225 }
226 Logger()->debug(
227 "PipeTransport removed existing socket file: {}", socket_path_);
228 } else {
229 Logger()->debug(
230 "PipeTransport no existing socket file to remove: {}", socket_path_);
231 }
232 return {};
233}

References jsonrpc::error::RpcError::UnexpectedFromCode().

Here is the call graph for this function:

◆ SendMessage()

auto jsonrpc::transport::PipeTransport::SendMessage ( std::string message) -> asio::awaitable<std::expected<void, error::RpcError>>
overridevirtual

Implements jsonrpc::transport::Transport.

Definition at line 235 of file pipe_transport.cpp.

236 {
237 co_await asio::post(GetStrand(), asio::use_awaitable);
238
239 if (is_closed_) {
241 RpcErrorCode::kTransportError,
242 "Attempt to send message on closed transport");
243 }
244
245 if (!is_started_) {
247 RpcErrorCode::kTransportError,
248 "Transport not started before sending message");
249 }
250
251 if (!socket_.is_open()) {
253 RpcErrorCode::kTransportError, "Socket not open");
254 }
255
256 Logger()->debug("Queuing {} bytes to send to pipe", message.size());
257 send_queue_.push_back(std::move(message));
258
259 // If there's no active sending task, start one
260 if (!sending_.exchange(true)) {
261 asio::co_spawn(GetStrand(), SendMessageLoop(), asio::detached);
262 }
263
264 co_return Ok();
265}

References jsonrpc::error::RpcError::UnexpectedFromCode().

Referenced by jsonrpc::transport::FramedPipeTransport::SendMessage().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ Start()

auto jsonrpc::transport::PipeTransport::Start ( ) -> asio::awaitable<std::expected<void, error::RpcError>>
overridevirtual

Implements jsonrpc::transport::Transport.

Definition at line 40 of file pipe_transport.cpp.

41 {
42 Logger()->debug("PipeTransport starting");
43 co_await asio::post(GetStrand(), asio::use_awaitable);
44
45 if (is_started_) {
46 Logger()->debug("PipeTransport already started");
48 RpcErrorCode::kTransportError, "PipeTransport already started");
49 }
50
51 if (is_closed_) {
52 Logger()->error("PipeTransport cannot start a closed transport");
54 RpcErrorCode::kTransportError, "Cannot start a closed transport");
55 }
56
57 if (is_server_) {
58 // For server, bind and listen for connections
59 Logger()->debug("PipeTransport starting server at {}", socket_path_);
60 auto result = co_await BindAndListen();
61 if (!result) {
62 Logger()->error(
63 "PipeTransport server error starting at {}: {}", socket_path_,
64 result.error().Message());
65 co_return result;
66 }
67 } else {
68 // For client, connect to the server
69 Logger()->debug("PipeTransport connecting client to {}", socket_path_);
70 auto result = co_await Connect();
71 if (!result) {
72 Logger()->error(
73 "PipeTransport client error connecting to {}: {}", socket_path_,
74 result.error().Message());
75 co_return result;
76 }
77 }
78 Logger()->debug("PipeTransport client connected to {}", socket_path_);
79
80 // Set started flag before performing operations
81 is_started_ = true;
82 Logger()->debug("PipeTransport successfully started");
83 co_return Ok();
84}
auto Connect() -> asio::awaitable< std::expected< void, error::RpcError > >
auto BindAndListen() -> asio::awaitable< std::expected< void, error::RpcError > >

References jsonrpc::error::RpcError::UnexpectedFromCode().

Here is the call graph for this function:

The documentation for this class was generated from the following files: