Lely core libraries 2.3.4
loop_driver.cpp
Go to the documentation of this file.
1
24#include "coapp.hpp"
25
26#if !LELY_NO_COAPP_MASTER && !LELY_NO_THREADS
27
29
30#include <atomic>
31#ifdef __MINGW32__
32#include <lely/libc/threads.h>
33#else
34#include <thread>
35#endif
36
37#include <cassert>
38
39namespace lely {
40
41namespace canopen {
42
45 explicit Impl_(LoopDriver* self, io::ContextBase ctx);
46 Impl_(const Impl_&) = delete;
47 Impl_& operator=(const Impl_&) = delete;
48 ~Impl_();
49
50 void Start();
51 void Shutdown();
52 void Join();
53
54 static const io_svc_vtbl svc_vtbl;
55
56 LoopDriver* self{nullptr};
57 io::ContextBase ctx{nullptr};
58 ::std::atomic_flag shutdown{false};
60#ifdef __MINGW32__
61 thrd_t thr;
62#else
63 ::std::thread thread;
64#endif
65 ::std::atomic_flag joined{false};
66};
67
68// clang-format off
69const io_svc_vtbl LoopDriver::Impl_::svc_vtbl = {
70 nullptr,
71 [](io_svc* svc) noexcept {
72 static_cast<LoopDriver::Impl_*>(svc)->Shutdown();
73 }
74};
75// clang-format on
76
77LoopDriver::LoopDriver(AsyncMaster& master, uint8_t id)
78 : BasicDriver(strand.get_inner_executor(), master, id),
79 impl_(new Impl_(this, master.GetContext())) {}
80
81LoopDriver::~LoopDriver() = default;
82
83void
85 impl_->Join();
86}
87
90 return impl_->stopped.get_future();
91}
92
93void
94LoopDriver::Wait(SdoFuture<void> f, ::std::error_code& ec) {
95 GetLoop().wait(f, ec);
96 try {
97 f.get().value();
98 } catch (const ::std::system_error& e) {
99 ec = e.code();
100 } catch (const ev::future_not_ready& e) {
101 ec = ::std::make_error_code(::std::errc::operation_canceled);
102 }
103}
104
105void
106LoopDriver::USleep(uint_least64_t usec) {
107 ::std::error_code ec;
108 USleep(usec, ec);
109 if (ec) throw ::std::system_error(ec, "USleep");
110}
111
112void
113LoopDriver::USleep(uint_least64_t usec, ::std::error_code& ec) {
114 io_tqueue_wait* wait = nullptr;
115 auto f = master.AsyncWait(::std::chrono::microseconds(usec), &wait);
116 assert(wait);
117 Wait(f, ec);
118 if (!f.is_ready()) master.CancelWait(*wait);
119}
120
121LoopDriver::Impl_::Impl_(LoopDriver* self_, io::ContextBase ctx_)
122 : io_svc IO_SVC_INIT(&svc_vtbl),
123 self(self_),
124 ctx(ctx_)
125#ifndef __MINGW32__
126 ,
127 thread(&Impl_::Start, this)
128#endif
129{
130#ifdef __MINGW32__
131 if (thrd_create(
132 &thr,
133 [](void* arg) noexcept {
134 static_cast<LoopDriver::Impl_*>(arg)->Start();
135 return 0;
136 },
137 this) != thrd_success)
138 util::throw_errc("thrd_create");
139#endif
140 ctx.insert(*this);
141}
142
143LoopDriver::Impl_::~Impl_() {
144 Join();
145 ctx.remove(*this);
146}
147
148void
149LoopDriver::Impl_::Start() {
150 auto& loop = self->GetLoop();
151 auto exec = self->GetExecutor();
152
153 // Start the event loop. Signal the existence of a fake task to prevent the
154 // loop for stopping early.
155 exec.on_task_init();
156 loop.run();
157 exec.on_task_fini();
158
159 // Deregister the driver to prevent the master from queueing new events. This
160 // also cancels any outstanding SDO requests.
161 self->master.Erase(*self);
162
163 // Finish remaining tasks, but do not block.
164 loop.restart();
165 loop.poll();
166
167 // Satisfy the promise to signal that the thread is about to terminate and it
168 // a call to the destructor will not block.
169 stopped.set(0);
170}
171
172void
173LoopDriver::Impl_::Shutdown() {
174 if (!shutdown.test_and_set()) {
175 // Stop receiving CANopen events.
176 self->master.Erase(*self);
177 // Stop the blocking run of the event loop.
178 self->GetLoop().stop();
179 }
180}
181
182void
183LoopDriver::Impl_::Join() {
184 if (!joined.test_and_set()) {
185 Shutdown();
186#ifdef __MINGW32__
187 thrd_join(thr, nullptr);
188#else
189 thread.join();
190#endif
191 }
192}
193
194} // namespace canopen
195
196} // namespace lely
197
198#endif // !LELY_NO_COAPP_MASTER && !LELY_NO_THREADS
An asynchronous CANopen master.
Definition: master.hpp:1957
The base class for drivers for remote CANopen nodes.
Definition: driver.hpp:279
BasicMaster & master
A reference to the master with which this driver is registered.
Definition: driver.hpp:1097
A CANopen driver running its own dedicated event loop in a separate thread.
Definition: loop_driver.hpp:51
void USleep(uint_least64_t usec)
Runs the event loop for usec microseconds.
T Wait(SdoFuture< T > f)
Waits for the specified future to become ready by running pending tasks on the dedicated event loop o...
void Join()
Stops the dedicated event loop of the driver and waits until the thread running the event loop finish...
Definition: loop_driver.cpp:84
ev::Future< void, void > AsyncStoppped() noexcept
Returns a future which becomes ready once the dedicated event loop of the driver is stopped and the t...
Definition: loop_driver.cpp:89
ev::Loop & GetLoop() noexcept
Returns a reference to the dedicated event loop of the driver.
Definition: loop_driver.hpp:74
~LoopDriver()
Stops the event loop and terminates the thread in which it was running before destroying the driver.
ev::Future< void, ::std::exception_ptr > AsyncWait(ev_exec_t *exec, const time_point &t, io_tqueue_wait **pwait=nullptr)
Submits an asynchronous wait operation and creates a future which becomes ready once the wait operati...
Definition: node.cpp:185
bool CancelWait(io_tqueue_wait &wait) noexcept
Cancels the specified wait operation if it is pending.
Definition: node.cpp:203
A future.
Definition: future.hpp:384
::std::size_t wait(ev_future_t *future)
Definition: loop.hpp:104
The exception thrown when retrieving the result of a future which is not ready or does not contain a ...
Definition: future.hpp:45
A reference to an I/O context. This class is a wrapper around #io_ctx_t*.
Definition: ctx.hpp:49
void remove(io_svc &svc) noexcept
Definition: ctx.hpp:63
This is the internal header file of the C++ CANopen application library.
#define IO_SVC_INIT(vptr)
The static initializer for io_svc.
Definition: ctx.h:57
This header file is part of the C++ CANopen application library; it contains the declarations for the...
::std::error_code make_error_code(SdoErrc e) noexcept
Creates an error code corresponding to an SDO abort code.
Definition: sdo_error.cpp:170
The virtual table of an I/O service.
Definition: ctx.h:67
An I/O service.
Definition: ctx.h:49
A wait operation suitable for use with a timer queue.
Definition: tqueue.h:36
The internal implementation of lely::canopen::LoopDriver.
Definition: loop_driver.cpp:44
This header file is part of the C11 and POSIX compatibility library; it includes <threads....
int thrd_create(thrd_t *thr, thrd_start_t func, void *arg)
Creates a new thread executing func(arg).
pthread_t thrd_t
A complete object type that holds an identifier for a thread.
Definition: threads.h:85
int thrd_join(thrd_t thr, int *res)
Joins the thread identified by thr with the current thread by blocking until the other thread has ter...
@ thrd_success
Indicates that the requested operation succeeded.
Definition: threads.h:121