JSON-RPC 2.0
JSON-RPC 2.0 Modern C++ Library
Loading...
Searching...
No Matches
socket_transport.cpp
Go to the documentation of this file.
2
3#include <asio.hpp>
4
5namespace jsonrpc::transport {
6
7using error::Ok;
8using error::RpcError;
10
12 asio::any_io_executor executor, std::string address, uint16_t port,
13 bool is_server, std::shared_ptr<spdlog::logger> logger)
14 : Transport(std::move(executor), logger),
15 socket_(GetExecutor()),
16 address_(std::move(address)),
17 port_(port),
18 is_server_(is_server),
19 read_buffer_() {
20}
21
23 if (!is_closed_) {
24 Logger()->debug("SocketTransport destructor triggering CloseNow()");
25 try {
26 CloseNow();
27 } catch (const std::exception &e) {
28 Logger()->error("SocketTransport destructor error: {}", e.what());
29 }
30 }
31}
32
34 -> asio::awaitable<std::expected<void, error::RpcError>> {
35 Logger()->debug("SocketTransport starting");
36 co_await asio::post(GetStrand(), asio::use_awaitable);
37
38 if (is_started_) {
39 Logger()->debug("SocketTransport already started");
41 RpcErrorCode::kTransportError, "SocketTransport already started");
42 }
43
44 if (is_closed_) {
45 Logger()->error("SocketTransport cannot start a closed transport");
47 RpcErrorCode::kTransportError, "Cannot start a closed transport");
48 }
49
50 std::expected<void, error::RpcError> result;
51
52 if (is_server_) {
53 Logger()->debug(
54 "SocketTransport starting server at {}:{}", address_, port_);
55 result = co_await BindAndListen();
56 if (!result) {
57 Logger()->error(
58 "SocketTransport error starting server: {}",
59 result.error().Message());
60 co_return result;
61 }
62 } else {
63 Logger()->debug(
64 "Connecting SocketTransport client to {}:{}", address_, port_);
65 result = co_await Connect();
66 if (!result) {
67 Logger()->error(
68 "SocketTransport error connecting client: {}",
69 result.error().Message());
70 co_return result;
71 }
72 Logger()->debug(
73 "SocketTransport client connected to {}:{}", address_, port_);
74 }
75
76 is_started_ = true;
77 Logger()->debug("SocketTransport successfully started");
78 co_return Ok();
79}
80
82 -> asio::awaitable<std::expected<void, error::RpcError>> {
83 Logger()->debug("SocketTransport closing");
84 co_await asio::post(GetStrand(), asio::use_awaitable);
85
86 if (is_closed_) {
87 Logger()->debug("SocketTransport already closed");
88 co_return std::expected<void, error::RpcError>{};
89 }
90
91 is_closed_ = true;
92 is_connected_ = false;
93
94 // Clear the message queue
95 send_queue_.clear();
96
97 Logger()->debug("SocketTransport closing");
98
99 // Cancel and close the socket safely
100 std::error_code ec;
101 if (socket_.is_open()) {
102 socket_.cancel(ec);
103 if (ec) {
104 Logger()->warn(
105 "SocketTransport error canceling socket: {}", ec.message());
106 }
107 socket_.close(ec);
108 if (ec) {
109 Logger()->warn("SocketTransport error closing socket: {}", ec.message());
110 }
111 }
112
113 // Clean up acceptor if this is a server
114 if (is_server_ && acceptor_) {
115 acceptor_->cancel(ec);
116 if (ec) {
117 Logger()->warn(
118 "SocketTransport error canceling acceptor: {}", ec.message());
119 }
120 acceptor_->close(ec);
121 if (ec) {
122 Logger()->warn(
123 "SocketTransport error closing acceptor: {}", ec.message());
124 }
125 }
126
127 Logger()->debug("SocketTransport closed");
128 co_return Ok();
129}
130
132 is_closed_ = true;
133 is_connected_ = false;
134
135 // Clear the message queue
136 send_queue_.clear();
137
138 auto try_close_socket = [&]() {
139 if (!socket_.is_open()) {
140 return;
141 }
142 Logger()->debug("SocketTransport closing socket synchronously");
143
144 std::error_code ec;
145 socket_.cancel();
146 socket_.close(ec);
147 if (ec) {
148 Logger()->warn("SocketTransport error closing socket: {}", ec.message());
149 }
150 };
151
152 auto try_close_acceptor = [&]() {
153 if (!is_server_ || !acceptor_ || !acceptor_->is_open()) {
154 return;
155 }
156 Logger()->debug("SocketTransport closing acceptor synchronously");
157
158 std::error_code ec;
159 acceptor_->cancel();
160 acceptor_->close(ec);
161 if (ec) {
162 Logger()->warn(
163 "SocketTransport error closing acceptor: {}", ec.message());
164 }
165 };
166
167 try {
168 try_close_socket();
169 try_close_acceptor();
170 } catch (const std::exception &e) {
171 Logger()->error("SocketTransport error during CloseNow(): {}", e.what());
172 }
173}
174
175auto SocketTransport::GetSocket() -> asio::ip::tcp::socket & {
176 return socket_;
177}
178
179auto SocketTransport::SendMessage(std::string message)
180 -> asio::awaitable<std::expected<void, error::RpcError>> {
181 co_await asio::post(GetStrand(), asio::use_awaitable);
182
183 if (is_closed_) {
185 RpcErrorCode::kTransportError,
186 "SendMessage() called on closed transport");
187 }
188
189 if (!is_started_) {
191 RpcErrorCode::kTransportError,
192 "Transport not started before sending message");
193 }
194
195 if (!socket_.is_open()) {
197 RpcErrorCode::kTransportError, "Socket not open in SendMessage()");
198 }
199
200 Logger()->debug("Queuing {} bytes to send to socket", message.size());
201 send_queue_.push_back(std::move(message));
202
203 // If there's no active sending task, start one
204 if (!sending_.exchange(true)) {
205 asio::co_spawn(GetStrand(), SendMessageLoop(), asio::detached);
206 }
207
208 co_return Ok();
209}
210
211auto SocketTransport::SendMessageLoop() -> asio::awaitable<void> {
212 while (!send_queue_.empty()) {
213 std::string message = std::move(send_queue_.front());
214 send_queue_.pop_front();
215
216 Logger()->debug("Sending {} bytes to socket", message.size());
217 std::size_t bytes_sent = 0;
218 const std::size_t chunk_size = 32 * 1024; // 32KB chunks to be safe
219
220 while (bytes_sent < message.size()) {
221 auto remaining = message.size() - bytes_sent;
222 auto current_chunk_size = std::min(remaining, chunk_size);
223
224 // Use string_view to avoid copying data
225 std::string_view chunk =
226 std::string_view(message).substr(bytes_sent, current_chunk_size);
227
228 // Write to the socket with error redirection
229 std::error_code ec;
230 auto chunk_sent = co_await asio::async_write(
231 socket_, asio::buffer(chunk),
232 asio::redirect_error(asio::use_awaitable, ec));
233
234 if (ec) {
235 Logger()->error(
236 "SocketTransport error sending message: {}", ec.message());
237 break; // Stop sending this message but continue with others
238 }
239
240 bytes_sent += chunk_sent;
241 Logger()->debug(
242 "Sent {} bytes to socket, total {}/{}", chunk_sent, bytes_sent,
243 message.size());
244 }
245 }
246
247 // Mark sending as complete
248 sending_ = false;
249}
250
252 -> asio::awaitable<std::expected<std::string, error::RpcError>> {
253 co_await asio::post(GetStrand(), asio::use_awaitable);
254
255 if (is_closed_) {
256 Logger()->warn(
257 "SocketTransport ReceiveMessage() called after transport was closed");
259 RpcErrorCode::kTransportError,
260 "ReceiveMessage() called after transport was closed");
261 }
262
263 if (!is_started_) {
265 RpcErrorCode::kTransportError,
266 "Transport not started before receiving message");
267 }
268
269 if (!socket_.is_open()) {
270 Logger()->warn("SocketTransport ReceiveMessage() socket not open");
272 RpcErrorCode::kTransportError, "Socket not open in ReceiveMessage()");
273 }
274
275 message_buffer_.clear();
276
277 std::error_code ec;
278 size_t bytes_read = co_await socket_.async_read_some(
279 asio::buffer(read_buffer_),
280 asio::redirect_error(asio::use_awaitable, ec));
281
282 if (ec) {
283 if (ec == asio::error::eof) {
284 Logger()->debug(
285 "SocketTransport EOF received, connection closed by peer");
286 is_connected_ = false;
288 RpcErrorCode::kTransportError, "Connection closed by peer");
289 } else if (ec == asio::error::operation_aborted) {
290 Logger()->debug("SocketTransport Read operation aborted");
292 RpcErrorCode::kTransportError, "Receive operation aborted");
293 } else {
294 Logger()->error(
295 "SocketTransport ASIO error in ReceiveMessage(): {}", ec.message());
297 RpcErrorCode::kTransportError, "Receive error: " + ec.message());
298 }
299 }
300
301 if (bytes_read == 0) {
303 RpcErrorCode::kTransportError, "Connection closed by peer (no data)");
304 }
305
306 message_buffer_.append(read_buffer_.data(), bytes_read);
307 Logger()->debug("SocketTransport received {} bytes", bytes_read);
308 co_return std::move(message_buffer_);
309}
310
311auto SocketTransport::Connect()
312 -> asio::awaitable<std::expected<void, error::RpcError>> {
313 Logger()->debug("SocketTransport connecting to {}:{}", address_, port_);
314
315 if (is_connected_) {
316 co_return Ok();
317 }
318
319 if (is_closed_) {
321 RpcErrorCode::kTransportError, "Cannot connect a closed transport");
322 }
323
324 // Close any existing socket
325 asio::error_code ec;
326 if (socket_.is_open()) {
327 socket_.close(ec);
328 if (ec) {
329 Logger()->warn(
330 "SocketTransport error closing socket before reconnect: {}",
331 ec.message());
332 }
333 }
334
335 // Create new socket if needed
336 if (!socket_.is_open()) {
337 socket_ = asio::ip::tcp::socket(GetExecutor());
338 }
339
340 // Resolve address
341 asio::ip::tcp::resolver resolver(GetExecutor());
342 auto endpoints = co_await resolver.async_resolve(
343 address_, std::to_string(port_),
344 asio::redirect_error(asio::use_awaitable, ec));
345 if (ec) {
346 Logger()->error(
347 "SocketTransport error resolving {}:{}: {}", address_, port_,
348 ec.message());
350 RpcErrorCode::kTransportError, "Resolve error: " + ec.message());
351 }
352
353 // Connect
354 co_await asio::async_connect(
355 socket_, endpoints, asio::redirect_error(asio::use_awaitable, ec));
356 if (ec) {
357 Logger()->error(
358 "SocketTransport error connecting to {}:{}: {}", address_, port_,
359 ec.message());
360 if (socket_.is_open()) {
361 socket_.close();
362 }
364 RpcErrorCode::kTransportError, "Connect error: " + ec.message());
365 }
366
367 is_connected_ = true;
368 Logger()->debug("SocketTransport connected to {}:{}", address_, port_);
369 co_return Ok();
370}
371
372auto SocketTransport::BindAndListen()
373 -> asio::awaitable<std::expected<void, error::RpcError>> {
374 Logger()->debug("SocketTransport binding to {}:{}", address_, port_);
375
376 asio::error_code ec;
377
378 // Create the endpoint
379 asio::ip::tcp::endpoint endpoint(asio::ip::tcp::v4(), port_);
380
381 // Resolve specific address if not 0.0.0.0 / ::
382 if (address_ != "0.0.0.0" && address_ != "::") {
383 asio::ip::tcp::resolver resolver(GetExecutor());
384 auto results = co_await resolver.async_resolve(
385 address_, std::to_string(port_),
386 asio::redirect_error(asio::use_awaitable, ec));
387 if (ec) {
388 Logger()->error(
389 "SocketTransport error resolving {}:{}: {}", address_, port_,
390 ec.message());
392 RpcErrorCode::kTransportError, "Resolve error: " + ec.message());
393 }
394 endpoint = *results.begin();
395 }
396
397 // Lazily construct the acceptor if not already created
398 if (!acceptor_) {
399 acceptor_ = std::make_unique<asio::ip::tcp::acceptor>(GetExecutor());
400 }
401
402 // Create and open acceptor
403 acceptor_->open(endpoint.protocol(), ec);
404 if (ec) {
405 Logger()->error("SocketTransport error opening acceptor: {}", ec.message());
407 RpcErrorCode::kTransportError, "Open error: " + ec.message());
408 }
409
410 acceptor_->set_option(asio::ip::tcp::acceptor::reuse_address(true), ec);
411 if (ec) {
412 Logger()->error(
413 "SocketTransport error setting reuse_address: {}", ec.message());
415 RpcErrorCode::kTransportError, "Set option error: " + ec.message());
416 }
417
418 acceptor_->bind(endpoint, ec);
419 if (ec) {
420 Logger()->error("SocketTransport error binding acceptor: {}", ec.message());
422 RpcErrorCode::kTransportError, "Bind error: " + ec.message());
423 }
424
425 acceptor_->listen(asio::socket_base::max_listen_connections, ec);
426 if (ec) {
427 Logger()->error("SocketTransport error listening: {}", ec.message());
429 RpcErrorCode::kTransportError, "Listen error: " + ec.message());
430 }
431
432 Logger()->debug("SocketTransport listening on {}:{}", address_, port_);
433
434 // Accept a connection
435 co_await acceptor_->async_accept(
436 socket_, asio::redirect_error(asio::use_awaitable, ec));
437 if (ec) {
438 Logger()->error(
439 "SocketTransport error accepting connection: {}", ec.message());
441 RpcErrorCode::kTransportError, "Accept error: " + ec.message());
442 }
443
444 is_connected_ = true;
445 Logger()->debug(
446 "SocketTransport accepted connection on {}:{}", address_, port_);
447
448 co_return Ok();
449}
450
451} // namespace jsonrpc::transport
static auto UnexpectedFromCode(RpcErrorCode code, std::string message="") -> std::unexpected< RpcError >
Definition error.hpp:101
SocketTransport(asio::any_io_executor executor, std::string address, uint16_t port, bool is_server, std::shared_ptr< spdlog::logger > logger=nullptr)
auto Close() -> asio::awaitable< std::expected< void, error::RpcError > > override
auto SendMessage(std::string message) -> asio::awaitable< std::expected< void, error::RpcError > > override
auto ReceiveMessage() -> asio::awaitable< std::expected< std::string, error::RpcError > > override
auto Start() -> asio::awaitable< std::expected< void, error::RpcError > > override
auto Logger() -> std::shared_ptr< spdlog::logger >
Definition transport.hpp:54
auto Ok() -> std::expected< void, RpcError >
Definition error.hpp:111