Lely core libraries 2.3.4
loop_driver.cpp
Go to the documentation of this file.
1
24#include "coapp.hpp"
25
26#if !LELY_NO_COAPP_MASTER && !LELY_NO_THREADS
27
29
30#include <atomic>
31#ifdef __MINGW32__
32#include <lely/libc/threads.h>
33#else
34#include <thread>
35#endif
36
37#include <cassert>
38
39namespace lely {
40
41namespace canopen {
42
45 explicit Impl_(LoopDriver* self, io::ContextBase ctx);
46 Impl_(const Impl_&) = delete;
47 Impl_& operator=(const Impl_&) = delete;
48 ~Impl_();
49
50 void Start();
51 void Shutdown();
52 void Join();
53
54 static const io_svc_vtbl svc_vtbl;
55
56 LoopDriver* self{nullptr};
57 io::ContextBase ctx{nullptr};
58 ::std::atomic_flag shutdown{false};
60#ifdef __MINGW32__
61 thrd_t thr;
62#else
63 ::std::thread thread;
64#endif
65 ::std::atomic_flag joined{false};
66};
67
68// clang-format off
69const io_svc_vtbl LoopDriver::Impl_::svc_vtbl = {
70 nullptr,
71 [](io_svc* svc) noexcept {
72 static_cast<LoopDriver::Impl_*>(svc)->Shutdown();
73 }
74};
75// clang-format on
76
77LoopDriver::LoopDriver(AsyncMaster& master, uint8_t id)
78 : BasicDriver(strand.get_inner_executor(), master, id),
79 impl_(new Impl_(this, master.GetContext())) {}
80
81LoopDriver::~LoopDriver() = default;
82
83void
85 impl_->Join();
86}
87
90 return impl_->stopped.get_future();
91}
92
93void
94LoopDriver::Wait(SdoFuture<void> f, ::std::error_code& ec) {
95 GetLoop().wait(f, ec);
96 try {
97 f.get().value();
98 } catch (const ::std::system_error& e) {
99 ec = e.code();
100 } catch (const ev::future_not_ready& e) {
101 ec = ::std::make_error_code(::std::errc::operation_canceled);
102 }
103}
104
105void
107 ::std::error_code ec;
108 USleep(usec, ec);
109 if (ec) throw ::std::system_error(ec, "USleep");
110}
111
112void
113LoopDriver::USleep(uint_least64_t usec, ::std::error_code& ec) {
114 io_tqueue_wait* wait = nullptr;
115 auto f = master.AsyncWait(::std::chrono::microseconds(usec), &wait);
116 assert(wait);
117 Wait(f, ec);
118 if (!f.is_ready()) master.CancelWait(*wait);
119}
120
121LoopDriver::Impl_::Impl_(LoopDriver* self_, io::ContextBase ctx_)
122 : io_svc IO_SVC_INIT(&svc_vtbl),
123 self(self_),
124 ctx(ctx_)
126 ,
127 thread(&Impl_::Start, this)
128#endif
129{
130#ifdef __MINGW32__
131 if (thrd_create(
132 &thr,
133 [](void* arg) noexcept {
134 static_cast<LoopDriver::Impl_*>(arg)->Start();
135 return 0;
136 },
137 this) != thrd_success)
138 util::throw_errc("thrd_create");
139#endif
140 ctx.insert(*this);
141}
142
143LoopDriver::Impl_::~Impl_() {
144 Join();
145 ctx.remove(*this);
146}
147
148void
149LoopDriver::Impl_::Start() {
150 auto& loop = self->GetLoop();
151 auto exec = self->GetExecutor();
152
153 // Start the event loop. Signal the existence of a fake task to prevent the
154 // loop for stopping early.
155 exec.on_task_init();
156 loop.run();
157 exec.on_task_fini();
158
159 // Deregister the driver to prevent the master from queueing new events. This
160 // also cancels any outstanding SDO requests.
161 self->master.Erase(*self);
162
163 // Finish remaining tasks, but do not block.
164 loop.restart();
165 loop.poll();
166
167 // Satisfy the promise to signal that the thread is about to terminate and it
168 // a call to the destructor will not block.
169 stopped.set(0);
170}
171
172void
173LoopDriver::Impl_::Shutdown() {
174 if (!shutdown.test_and_set()) {
175 // Stop receiving CANopen events.
176 self->master.Erase(*self);
177 // Stop the blocking run of the event loop.
178 self->GetLoop().stop();
179 }
180}
181
182void
183LoopDriver::Impl_::Join() {
184 if (!joined.test_and_set()) {
185 Shutdown();
186#ifdef __MINGW32__
187 thrd_join(thr, nullptr);
188#else
189 thread.join();
190#endif
191 }
192}
193
194} // namespace canopen
195
196} // namespace lely
197
198#endif // !LELY_NO_COAPP_MASTER && !LELY_NO_THREADS
A CANopen value.
Definition val.hpp:42
An asynchronous CANopen master.
Definition master.hpp:1957
The base class for drivers for remote CANopen nodes.
Definition driver.hpp:279
BasicMaster & master
A reference to the master with which this driver is registered.
Definition driver.hpp:1097
A CANopen driver running its own dedicated event loop in a separate thread.
void USleep(uint_least64_t usec)
Runs the event loop for usec microseconds.
T Wait(SdoFuture< T > f)
Waits for the specified future to become ready by running pending tasks on the dedicated event loop o...
void Join()
Stops the dedicated event loop of the driver and waits until the thread running the event loop finish...
ev::Future< void, void > AsyncStoppped() noexcept
Returns a future which becomes ready once the dedicated event loop of the driver is stopped and the t...
ev::Loop & GetLoop() noexcept
Returns a reference to the dedicated event loop of the driver.
~LoopDriver()
Stops the event loop and terminates the thread in which it was running before destroying the driver.
ev::Future< void, ::std::exception_ptr > AsyncWait(ev_exec_t *exec, const time_point &t, io_tqueue_wait **pwait=nullptr)
Submits an asynchronous wait operation and creates a future which becomes ready once the wait operati...
Definition node.cpp:185
bool CancelWait(io_tqueue_wait &wait) noexcept
Cancels the specified wait operation if it is pending.
Definition node.cpp:203
A future.
Definition future.hpp:384
::std::size_t wait(ev_future_t *future)
Definition loop.hpp:104
A promise.
Definition future.hpp:60
The exception thrown when retrieving the result of a future which is not ready or does not contain a ...
Definition future.hpp:45
A reference to an I/O context. This class is a wrapper around #io_ctx_t*.
Definition ctx.hpp:49
void remove(io_svc &svc) noexcept
Definition ctx.hpp:63
This is the internal header file of the C++ CANopen application library.
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition ctx.h:57
This header file is part of the C++ CANopen application library; it contains the declarations for the...
The virtual table of an I/O service.
Definition ctx.h:67
An I/O service.
Definition ctx.h:49
A wait operation suitable for use with a timer queue.
Definition tqueue.h:36
The internal implementation of lely::canopen::LoopDriver.
This header file is part of the C11 and POSIX compatibility library; it includes <threads....
int thrd_create(thrd_t *thr, thrd_start_t func, void *arg)
Creates a new thread executing func(arg).
int thrd_join(thrd_t thr, int *res)
Joins the thread identified by thr with the current thread by blocking until the other thread has ter...
@ thrd_success
Indicates that the requested operation succeeded.
Definition threads.h:121