12 asio::any_io_executor executor, std::string address, uint16_t port,
13 bool is_server, std::shared_ptr<spdlog::logger> logger)
15 socket_(GetExecutor()),
16 address_(std::move(address)),
18 is_server_(is_server),
24 Logger()->debug(
"SocketTransport destructor triggering CloseNow()");
27 }
catch (
const std::exception &e) {
28 Logger()->error(
"SocketTransport destructor error: {}", e.what());
34 -> asio::awaitable<std::expected<void, error::RpcError>> {
35 Logger()->debug(
"SocketTransport starting");
36 co_await asio::post(GetStrand(), asio::use_awaitable);
39 Logger()->debug(
"SocketTransport already started");
41 RpcErrorCode::kTransportError,
"SocketTransport already started");
45 Logger()->error(
"SocketTransport cannot start a closed transport");
47 RpcErrorCode::kTransportError,
"Cannot start a closed transport");
50 std::expected<void, error::RpcError> result;
54 "SocketTransport starting server at {}:{}", address_, port_);
55 result =
co_await BindAndListen();
58 "SocketTransport error starting server: {}",
59 result.error().Message());
64 "Connecting SocketTransport client to {}:{}", address_, port_);
65 result =
co_await Connect();
68 "SocketTransport error connecting client: {}",
69 result.error().Message());
73 "SocketTransport client connected to {}:{}", address_, port_);
77 Logger()->debug(
"SocketTransport successfully started");
82 -> asio::awaitable<std::expected<void, error::RpcError>> {
83 Logger()->debug(
"SocketTransport closing");
84 co_await asio::post(GetStrand(), asio::use_awaitable);
87 Logger()->debug(
"SocketTransport already closed");
88 co_return std::expected<void, error::RpcError>{};
92 is_connected_ =
false;
97 Logger()->debug(
"SocketTransport closing");
101 if (socket_.is_open()) {
105 "SocketTransport error canceling socket: {}", ec.message());
109 Logger()->warn(
"SocketTransport error closing socket: {}", ec.message());
114 if (is_server_ && acceptor_) {
115 acceptor_->cancel(ec);
118 "SocketTransport error canceling acceptor: {}", ec.message());
120 acceptor_->close(ec);
123 "SocketTransport error closing acceptor: {}", ec.message());
127 Logger()->debug(
"SocketTransport closed");
133 is_connected_ =
false;
138 auto try_close_socket = [&]() {
139 if (!socket_.is_open()) {
142 Logger()->debug(
"SocketTransport closing socket synchronously");
148 Logger()->warn(
"SocketTransport error closing socket: {}", ec.message());
152 auto try_close_acceptor = [&]() {
153 if (!is_server_ || !acceptor_ || !acceptor_->is_open()) {
156 Logger()->debug(
"SocketTransport closing acceptor synchronously");
160 acceptor_->close(ec);
163 "SocketTransport error closing acceptor: {}", ec.message());
169 try_close_acceptor();
170 }
catch (
const std::exception &e) {
171 Logger()->error(
"SocketTransport error during CloseNow(): {}", e.what());
175auto SocketTransport::GetSocket() -> asio::ip::tcp::socket & {
180 -> asio::awaitable<std::expected<void, error::RpcError>> {
181 co_await asio::post(GetStrand(), asio::use_awaitable);
185 RpcErrorCode::kTransportError,
186 "SendMessage() called on closed transport");
191 RpcErrorCode::kTransportError,
192 "Transport not started before sending message");
195 if (!socket_.is_open()) {
197 RpcErrorCode::kTransportError,
"Socket not open in SendMessage()");
200 Logger()->debug(
"Queuing {} bytes to send to socket", message.size());
201 send_queue_.push_back(std::move(message));
204 if (!sending_.exchange(
true)) {
205 asio::co_spawn(GetStrand(), SendMessageLoop(), asio::detached);
211auto SocketTransport::SendMessageLoop() -> asio::awaitable<void> {
212 while (!send_queue_.empty()) {
213 std::string message = std::move(send_queue_.front());
214 send_queue_.pop_front();
216 Logger()->debug(
"Sending {} bytes to socket", message.size());
217 std::size_t bytes_sent = 0;
218 const std::size_t chunk_size = 32 * 1024;
220 while (bytes_sent < message.size()) {
221 auto remaining = message.size() - bytes_sent;
222 auto current_chunk_size = std::min(remaining, chunk_size);
225 std::string_view chunk =
226 std::string_view(message).substr(bytes_sent, current_chunk_size);
230 auto chunk_sent =
co_await asio::async_write(
231 socket_, asio::buffer(chunk),
232 asio::redirect_error(asio::use_awaitable, ec));
236 "SocketTransport error sending message: {}", ec.message());
240 bytes_sent += chunk_sent;
242 "Sent {} bytes to socket, total {}/{}", chunk_sent, bytes_sent,
252 -> asio::awaitable<std::expected<std::string, error::RpcError>> {
253 co_await asio::post(GetStrand(), asio::use_awaitable);
257 "SocketTransport ReceiveMessage() called after transport was closed");
259 RpcErrorCode::kTransportError,
260 "ReceiveMessage() called after transport was closed");
265 RpcErrorCode::kTransportError,
266 "Transport not started before receiving message");
269 if (!socket_.is_open()) {
270 Logger()->warn(
"SocketTransport ReceiveMessage() socket not open");
272 RpcErrorCode::kTransportError,
"Socket not open in ReceiveMessage()");
275 message_buffer_.clear();
278 size_t bytes_read =
co_await socket_.async_read_some(
279 asio::buffer(read_buffer_),
280 asio::redirect_error(asio::use_awaitable, ec));
283 if (ec == asio::error::eof) {
285 "SocketTransport EOF received, connection closed by peer");
286 is_connected_ =
false;
288 RpcErrorCode::kTransportError,
"Connection closed by peer");
289 }
else if (ec == asio::error::operation_aborted) {
290 Logger()->debug(
"SocketTransport Read operation aborted");
292 RpcErrorCode::kTransportError,
"Receive operation aborted");
295 "SocketTransport ASIO error in ReceiveMessage(): {}", ec.message());
297 RpcErrorCode::kTransportError,
"Receive error: " + ec.message());
301 if (bytes_read == 0) {
303 RpcErrorCode::kTransportError,
"Connection closed by peer (no data)");
306 message_buffer_.append(read_buffer_.data(), bytes_read);
307 Logger()->debug(
"SocketTransport received {} bytes", bytes_read);
308 co_return std::move(message_buffer_);
311auto SocketTransport::Connect()
312 -> asio::awaitable<std::expected<void, error::RpcError>> {
313 Logger()->debug(
"SocketTransport connecting to {}:{}", address_, port_);
321 RpcErrorCode::kTransportError,
"Cannot connect a closed transport");
326 if (socket_.is_open()) {
330 "SocketTransport error closing socket before reconnect: {}",
336 if (!socket_.is_open()) {
337 socket_ = asio::ip::tcp::socket(GetExecutor());
341 asio::ip::tcp::resolver resolver(GetExecutor());
342 auto endpoints =
co_await resolver.async_resolve(
343 address_, std::to_string(port_),
344 asio::redirect_error(asio::use_awaitable, ec));
347 "SocketTransport error resolving {}:{}: {}", address_, port_,
350 RpcErrorCode::kTransportError,
"Resolve error: " + ec.message());
354 co_await asio::async_connect(
355 socket_, endpoints, asio::redirect_error(asio::use_awaitable, ec));
358 "SocketTransport error connecting to {}:{}: {}", address_, port_,
360 if (socket_.is_open()) {
364 RpcErrorCode::kTransportError,
"Connect error: " + ec.message());
367 is_connected_ =
true;
368 Logger()->debug(
"SocketTransport connected to {}:{}", address_, port_);
372auto SocketTransport::BindAndListen()
373 -> asio::awaitable<std::expected<void, error::RpcError>> {
374 Logger()->debug(
"SocketTransport binding to {}:{}", address_, port_);
379 asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port_);
382 if (address_ !=
"0.0.0.0" && address_ !=
"::") {
383 asio::ip::tcp::resolver resolver(GetExecutor());
384 auto results =
co_await resolver.async_resolve(
385 address_, std::to_string(port_),
386 asio::redirect_error(asio::use_awaitable, ec));
389 "SocketTransport error resolving {}:{}: {}", address_, port_,
392 RpcErrorCode::kTransportError,
"Resolve error: " + ec.message());
394 endpoint = *results.begin();
399 acceptor_ = std::make_unique<asio::ip::tcp::acceptor>(GetExecutor());
403 acceptor_->open(endpoint.protocol(), ec);
405 Logger()->error(
"SocketTransport error opening acceptor: {}", ec.message());
407 RpcErrorCode::kTransportError,
"Open error: " + ec.message());
410 acceptor_->set_option(asio::ip::tcp::acceptor::reuse_address(
true), ec);
413 "SocketTransport error setting reuse_address: {}", ec.message());
415 RpcErrorCode::kTransportError,
"Set option error: " + ec.message());
418 acceptor_->bind(endpoint, ec);
420 Logger()->error(
"SocketTransport error binding acceptor: {}", ec.message());
422 RpcErrorCode::kTransportError,
"Bind error: " + ec.message());
425 acceptor_->listen(asio::socket_base::max_listen_connections, ec);
427 Logger()->error(
"SocketTransport error listening: {}", ec.message());
429 RpcErrorCode::kTransportError,
"Listen error: " + ec.message());
432 Logger()->debug(
"SocketTransport listening on {}:{}", address_, port_);
435 co_await acceptor_->async_accept(
436 socket_, asio::redirect_error(asio::use_awaitable, ec));
439 "SocketTransport error accepting connection: {}", ec.message());
441 RpcErrorCode::kTransportError,
"Accept error: " + ec.message());
444 is_connected_ =
true;
446 "SocketTransport accepted connection on {}:{}", address_, port_);
static auto UnexpectedFromCode(RpcErrorCode code, std::string message="") -> std::unexpected< RpcError >
~SocketTransport() override
auto CloseNow() -> void override
SocketTransport(asio::any_io_executor executor, std::string address, uint16_t port, bool is_server, std::shared_ptr< spdlog::logger > logger=nullptr)
auto Close() -> asio::awaitable< std::expected< void, error::RpcError > > override
auto SendMessage(std::string message) -> asio::awaitable< std::expected< void, error::RpcError > > override
auto ReceiveMessage() -> asio::awaitable< std::expected< std::string, error::RpcError > > override
auto Start() -> asio::awaitable< std::expected< void, error::RpcError > > override
auto Logger() -> std::shared_ptr< spdlog::logger >
auto Ok() -> std::expected< void, RpcError >