7#include <asio/co_spawn.hpp>
8#include <asio/detached.hpp>
9#include <asio/use_awaitable.hpp>
11#include <spdlog/spdlog.h>
20 asio::any_io_executor executor, std::string socket_path,
bool is_server,
21 std::shared_ptr<spdlog::logger> logger)
23 socket_(GetExecutor()),
24 socket_path_(std::move(socket_path)),
25 is_server_(is_server),
31 Logger()->debug(
"PipeTransport destructor triggering CloseNow()");
34 }
catch (
const std::exception &e) {
35 Logger()->error(
"PipeTransport destructor error: {}", e.what());
41 -> asio::awaitable<std::expected<void, error::RpcError>> {
42 Logger()->debug(
"PipeTransport starting");
43 co_await asio::post(GetStrand(), asio::use_awaitable);
46 Logger()->debug(
"PipeTransport already started");
48 RpcErrorCode::kTransportError,
"PipeTransport already started");
52 Logger()->error(
"PipeTransport cannot start a closed transport");
54 RpcErrorCode::kTransportError,
"Cannot start a closed transport");
59 Logger()->debug(
"PipeTransport starting server at {}", socket_path_);
60 auto result =
co_await BindAndListen();
63 "PipeTransport server error starting at {}: {}", socket_path_,
64 result.error().Message());
69 Logger()->debug(
"PipeTransport connecting client to {}", socket_path_);
70 auto result =
co_await Connect();
73 "PipeTransport client error connecting to {}: {}", socket_path_,
74 result.error().Message());
78 Logger()->debug(
"PipeTransport client connected to {}", socket_path_);
82 Logger()->debug(
"PipeTransport successfully started");
87 -> asio::awaitable<std::expected<void, error::RpcError>> {
88 Logger()->debug(
"PipeTransport closing");
89 co_await asio::post(GetStrand(), asio::use_awaitable);
92 Logger()->debug(
"PipeTransport already closed");
97 is_connected_ =
false;
104 if (socket_.is_open()) {
107 Logger()->warn(
"PipeTransport error canceling socket: {}", ec.message());
111 Logger()->warn(
"PipeTransport error closing socket: {}", ec.message());
116 if (is_server_ && acceptor_) {
117 acceptor_->cancel(ec);
120 "PipeTransport error canceling acceptor: {}", ec.message());
122 acceptor_->close(ec);
124 Logger()->warn(
"PipeTransport error closing acceptor: {}", ec.message());
129 if (is_server_ && !socket_path_.empty()) {
130 auto result = RemoveExistingSocketFile();
133 "PipeTransport error removing socket file: {}",
134 result.error().Message());
138 Logger()->debug(
"PipeTransport closed");
144 is_connected_ =
false;
149 auto try_close_socket = [&]() {
150 if (!socket_.is_open()) {
153 Logger()->debug(
"PipeTransport closing socket synchronously");
159 Logger()->warn(
"PipeTransport error closing socket: {}", ec.message());
163 auto try_close_acceptor = [&]() {
164 if (!is_server_ || !acceptor_ || !acceptor_->is_open()) {
167 Logger()->debug(
"PipeTransport closing acceptor synchronously");
171 acceptor_->close(ec);
173 Logger()->warn(
"PipeTransport error closing acceptor: {}", ec.message());
177 auto try_remove_socket_file = [&]() {
178 if (!is_server_ || socket_path_.empty()) {
183 if (std::filesystem::exists(socket_path_, ec) && !ec) {
184 std::filesystem::remove(socket_path_, ec);
187 "PipeTransport error removing socket file: {}", ec.message());
189 Logger()->debug(
"PipeTransport removed socket file: {}", socket_path_);
193 "PipeTransport error checking socket file existence: {}",
200 try_close_acceptor();
201 try_remove_socket_file();
202 }
catch (
const std::exception &e) {
203 Logger()->error(
"PipeTransport error during CloseNow(): {}", e.what());
212 -> std::expected<void, error::RpcError> {
214 if (std::filesystem::exists(socket_path_, ec)) {
217 RpcErrorCode::kTransportError,
218 "Error checking if socket file exists: " + ec.message());
220 std::filesystem::remove(socket_path_, ec);
223 RpcErrorCode::kTransportError,
224 "Error removing socket file: " + ec.message());
227 "PipeTransport removed existing socket file: {}", socket_path_);
230 "PipeTransport no existing socket file to remove: {}", socket_path_);
236 -> asio::awaitable<std::expected<void, error::RpcError>> {
237 co_await asio::post(GetStrand(), asio::use_awaitable);
241 RpcErrorCode::kTransportError,
242 "Attempt to send message on closed transport");
247 RpcErrorCode::kTransportError,
248 "Transport not started before sending message");
251 if (!socket_.is_open()) {
253 RpcErrorCode::kTransportError,
"Socket not open");
256 Logger()->debug(
"Queuing {} bytes to send to pipe", message.size());
257 send_queue_.push_back(std::move(message));
260 if (!sending_.exchange(
true)) {
261 asio::co_spawn(GetStrand(), SendMessageLoop(), asio::detached);
267auto PipeTransport::SendMessageLoop() -> asio::awaitable<void> {
268 while (!send_queue_.empty()) {
269 std::string message = std::move(send_queue_.front());
270 send_queue_.pop_front();
272 Logger()->debug(
"Sending {} bytes to pipe", message.size());
273 std::size_t bytes_sent = 0;
274 const std::size_t chunk_size =
277 while (bytes_sent < message.size()) {
278 auto remaining = message.size() - bytes_sent;
279 auto current_chunk_size = std::min(remaining, chunk_size);
282 std::string_view chunk =
283 std::string_view(message).substr(bytes_sent, current_chunk_size);
287 auto chunk_sent =
co_await asio::async_write(
288 socket_, asio::buffer(chunk),
289 asio::redirect_error(asio::use_awaitable, ec));
293 "PipeTransport error sending message: {}", ec.message());
297 bytes_sent += chunk_sent;
299 "Sent {} bytes to pipe, total {}/{}", chunk_sent, bytes_sent,
309 -> asio::awaitable<std::expected<void, error::RpcError>> {
310 Logger()->debug(
"Flushing message queue");
312 co_await asio::post(GetStrand(), asio::use_awaitable);
313 if (send_queue_.empty() && !sending_) {
316 co_await asio::steady_timer(GetExecutor(), std::chrono::milliseconds(10))
317 .async_wait(asio::use_awaitable);
323 -> asio::awaitable<std::expected<std::string, error::RpcError>> {
324 co_await asio::post(GetStrand(), asio::use_awaitable);
328 "PipeTransport ReceiveMessage called after transport was closed");
330 RpcErrorCode::kTransportError,
331 "ReceiveMessage called after transport was closed");
336 RpcErrorCode::kTransportError,
337 "Transport not started before receiving message");
340 if (!socket_.is_open()) {
341 Logger()->warn(
"PipeTransport ReceiveMessage called on a closed socket");
343 RpcErrorCode::kTransportError,
344 "ReceiveMessage called on a closed socket");
347 message_buffer_.clear();
350 std::size_t bytes_read =
co_await socket_.async_read_some(
351 asio::buffer(read_buffer_),
352 asio::redirect_error(asio::use_awaitable, ec));
355 if (ec == asio::error::eof) {
356 Logger()->debug(
"PipeTransport connection closed by peer (EOF)");
357 is_connected_ =
false;
359 RpcErrorCode::kTransportError,
"Connection closed by peer");
360 }
else if (ec == asio::error::operation_aborted) {
361 Logger()->debug(
"PipeTransport Receive operation aborted");
363 RpcErrorCode::kTransportError,
"Receive aborted");
366 "PipeTransport error receiving message: {}", ec.message());
368 RpcErrorCode::kTransportError,
"Receive error: " + ec.message());
372 if (bytes_read == 0) {
374 RpcErrorCode::kTransportError,
"No data received");
377 message_buffer_.append(read_buffer_.data(), bytes_read);
378 auto log_message = message_buffer_;
379 if (log_message.size() > 70) {
380 log_message = log_message.substr(0, 70) +
"...";
382 std::ranges::replace(log_message,
'\n',
' ');
383 std::ranges::replace(log_message,
'\r',
' ');
384 Logger()->debug(
"PipeTransport received message: {}", log_message);
385 co_return std::move(message_buffer_);
389 -> asio::awaitable<std::expected<void, error::RpcError>> {
390 Logger()->debug(
"PipeTransport connecting to {}", socket_path_);
400 RpcErrorCode::kTransportError,
"Cannot connect a closed transport");
405 if (socket_.is_open()) {
409 "PipeTransport error closing socket before reconnect: {}",
415 socket_ = asio::local::stream_protocol::socket(GetExecutor());
418 asio::local::stream_protocol::endpoint endpoint(socket_path_);
419 co_await socket_.async_connect(
420 endpoint, asio::redirect_error(asio::use_awaitable, ec));
423 "PipeTransport error connecting to {}: {}", socket_path_, ec.message());
425 RpcErrorCode::kTransportError,
"Error connecting to: " + ec.message());
428 is_connected_ =
true;
429 Logger()->debug(
"PipeTransport connected to {}", socket_path_);
435 -> asio::awaitable<std::expected<void, error::RpcError>> {
436 Logger()->debug(
"PipeTransport binding to {}", socket_path_);
438 auto result = RemoveExistingSocketFile();
441 "PipeTransport error removing existing socket file: {}",
442 result.error().Message());
449 std::make_unique<asio::local::stream_protocol::acceptor>(GetExecutor());
453 asio::local::stream_protocol::endpoint endpoint(socket_path_);
457 acceptor_->open(endpoint.protocol(), ec);
459 Logger()->error(
"PipeTransport error opening acceptor: {}", ec.message());
461 RpcErrorCode::kTransportError,
462 "Error opening acceptor: " + ec.message());
464 acceptor_->bind(endpoint, ec);
466 Logger()->error(
"PipeTransport error binding acceptor: {}", ec.message());
468 RpcErrorCode::kTransportError,
469 "Error binding acceptor: " + ec.message());
471 acceptor_->listen(asio::socket_base::max_listen_connections, ec);
474 "PipeTransport error listening on acceptor: {}", ec.message());
476 RpcErrorCode::kTransportError,
477 "Error listening on acceptor: " + ec.message());
481 Logger()->debug(
"PipeTransport waiting for connection on {}", socket_path_);
482 co_await acceptor_->async_accept(
483 socket_, asio::redirect_error(asio::use_awaitable, ec));
486 "PipeTransport error accepting connection: {}", ec.message());
488 RpcErrorCode::kTransportError,
489 "Error accepting connection: " + ec.message());
491 is_connected_ =
true;
492 Logger()->debug(
"PipeTransport accepted connection on {}", socket_path_);
static auto UnexpectedFromCode(RpcErrorCode code, std::string message="") -> std::unexpected< RpcError >
auto Start() -> asio::awaitable< std::expected< void, error::RpcError > > override
auto RemoveExistingSocketFile() -> std::expected< void, error::RpcError >
PipeTransport(asio::any_io_executor executor, std::string socket_path, bool is_server=false, std::shared_ptr< spdlog::logger > logger=nullptr)
auto Connect() -> asio::awaitable< std::expected< void, error::RpcError > >
auto BindAndListen() -> asio::awaitable< std::expected< void, error::RpcError > >
auto GetSocket() -> asio::local::stream_protocol::socket &
auto Close() -> asio::awaitable< std::expected< void, error::RpcError > > override
auto SendMessage(std::string message) -> asio::awaitable< std::expected< void, error::RpcError > > override
auto Flush() -> asio::awaitable< std::expected< void, error::RpcError > >
~PipeTransport() override
auto ReceiveMessage() -> asio::awaitable< std::expected< std::string, error::RpcError > > override
auto Logger() -> std::shared_ptr< spdlog::logger >
auto Ok() -> std::expected< void, RpcError >