You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

515 lines
16 KiB

4 years ago
  1. #!/usr/bin/env python
  2. # -- Content-Encoding: UTF-8 --
  3. """
  4. Cached thread pool, inspired from Pelix/iPOPO Thread Pool
  5. :author: Thomas Calmant
  6. :copyright: Copyright 2018, Thomas Calmant
  7. :license: Apache License 2.0
  8. :version: 0.3.2
  9. ..
  10. Copyright 2018 Thomas Calmant
  11. Licensed under the Apache License, Version 2.0 (the "License");
  12. you may not use this file except in compliance with the License.
  13. You may obtain a copy of the License at
  14. http://www.apache.org/licenses/LICENSE-2.0
  15. Unless required by applicable law or agreed to in writing, software
  16. distributed under the License is distributed on an "AS IS" BASIS,
  17. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  18. See the License for the specific language governing permissions and
  19. limitations under the License.
  20. """
  21. # Standard library
  22. import logging
  23. import threading
  24. try:
  25. # Python 3
  26. # pylint: disable=F0401
  27. import queue
  28. except ImportError:
  29. # Python 2
  30. # pylint: disable=F0401
  31. import Queue as queue
  32. # ------------------------------------------------------------------------------
  33. # Module version
  34. __version_info__ = (0, 3, 2)
  35. __version__ = ".".join(str(x) for x in __version_info__)
  36. # Documentation strings format
  37. __docformat__ = "restructuredtext en"
  38. # ------------------------------------------------------------------------------
  39. class EventData(object):
  40. """
  41. A threading event with some associated data
  42. """
  43. def __init__(self):
  44. """
  45. Sets up the event
  46. """
  47. self.__event = threading.Event()
  48. self.__data = None
  49. self.__exception = None
  50. @property
  51. def data(self):
  52. """
  53. Returns the associated value
  54. """
  55. return self.__data
  56. @property
  57. def exception(self):
  58. """
  59. Returns the exception used to stop the wait() method
  60. """
  61. return self.__exception
  62. def clear(self):
  63. """
  64. Clears the event
  65. """
  66. self.__event.clear()
  67. self.__data = None
  68. self.__exception = None
  69. def is_set(self):
  70. """
  71. Checks if the event is set
  72. """
  73. return self.__event.is_set()
  74. def set(self, data=None):
  75. """
  76. Sets the event
  77. """
  78. self.__data = data
  79. self.__exception = None
  80. self.__event.set()
  81. def raise_exception(self, exception):
  82. """
  83. Raises an exception in wait()
  84. :param exception: An Exception object
  85. """
  86. self.__data = None
  87. self.__exception = exception
  88. self.__event.set()
  89. def wait(self, timeout=None):
  90. """
  91. Waits for the event or for the timeout
  92. :param timeout: Wait timeout (in seconds)
  93. :return: True if the event as been set, else False
  94. """
  95. # The 'or' part is for Python 2.6
  96. result = self.__event.wait(timeout)
  97. # pylint: disable=E0702
  98. # Pylint seems to miss the "is None" check below
  99. if self.__exception is None:
  100. return result
  101. else:
  102. raise self.__exception
  103. class FutureResult(object):
  104. """
  105. An object to wait for the result of a threaded execution
  106. """
  107. def __init__(self, logger=None):
  108. """
  109. Sets up the FutureResult object
  110. :param logger: The Logger to use in case of error (optional)
  111. """
  112. self._logger = logger or logging.getLogger(__name__)
  113. self._done_event = EventData()
  114. self.__callback = None
  115. self.__extra = None
  116. def __notify(self):
  117. """
  118. Notify the given callback about the result of the execution
  119. """
  120. if self.__callback is not None:
  121. try:
  122. self.__callback(self._done_event.data,
  123. self._done_event.exception,
  124. self.__extra)
  125. except Exception as ex:
  126. self._logger.exception("Error calling back method: %s", ex)
  127. def set_callback(self, method, extra=None):
  128. """
  129. Sets a callback method, called once the result has been computed or in
  130. case of exception.
  131. The callback method must have the following signature:
  132. ``callback(result, exception, extra)``.
  133. :param method: The method to call back in the end of the execution
  134. :param extra: Extra parameter to be given to the callback method
  135. """
  136. self.__callback = method
  137. self.__extra = extra
  138. if self._done_event.is_set():
  139. # The execution has already finished
  140. self.__notify()
  141. def execute(self, method, args, kwargs):
  142. """
  143. Execute the given method and stores its result.
  144. The result is considered "done" even if the method raises an exception
  145. :param method: The method to execute
  146. :param args: Method positional arguments
  147. :param kwargs: Method keyword arguments
  148. :raise Exception: The exception raised by the method
  149. """
  150. # Normalize arguments
  151. if args is None:
  152. args = []
  153. if kwargs is None:
  154. kwargs = {}
  155. try:
  156. # Call the method
  157. result = method(*args, **kwargs)
  158. except Exception as ex:
  159. # Something went wrong: propagate to the event and to the caller
  160. self._done_event.raise_exception(ex)
  161. raise
  162. else:
  163. # Store the result
  164. self._done_event.set(result)
  165. finally:
  166. # In any case: notify the call back (if any)
  167. self.__notify()
  168. def done(self):
  169. """
  170. Returns True if the job has finished, else False
  171. """
  172. return self._done_event.is_set()
  173. def result(self, timeout=None):
  174. """
  175. Waits up to timeout for the result the threaded job.
  176. Returns immediately the result if the job has already been done.
  177. :param timeout: The maximum time to wait for a result (in seconds)
  178. :raise OSError: The timeout raised before the job finished
  179. :raise Exception: The exception encountered during the call, if any
  180. """
  181. if self._done_event.wait(timeout):
  182. return self._done_event.data
  183. else:
  184. raise OSError("Timeout raised")
  185. # ------------------------------------------------------------------------------
  186. class ThreadPool(object):
  187. """
  188. Executes the tasks stored in a FIFO in a thread pool
  189. """
  190. def __init__(self, max_threads, min_threads=1, queue_size=0, timeout=60,
  191. logname=None):
  192. """
  193. Sets up the thread pool.
  194. Threads are kept alive 60 seconds (timeout argument).
  195. :param max_threads: Maximum size of the thread pool
  196. :param min_threads: Minimum size of the thread pool
  197. :param queue_size: Size of the task queue (0 for infinite)
  198. :param timeout: Queue timeout (in seconds, 60s by default)
  199. :param logname: Name of the logger
  200. :raise ValueError: Invalid number of threads
  201. """
  202. # Validate parameters
  203. try:
  204. max_threads = int(max_threads)
  205. if max_threads < 1:
  206. raise ValueError("Pool size must be greater than 0")
  207. except (TypeError, ValueError) as ex:
  208. raise ValueError("Invalid pool size: {0}".format(ex))
  209. try:
  210. min_threads = int(min_threads)
  211. if min_threads < 0:
  212. min_threads = 0
  213. elif min_threads > max_threads:
  214. min_threads = max_threads
  215. except (TypeError, ValueError) as ex:
  216. raise ValueError("Invalid pool size: {0}".format(ex))
  217. # The logger
  218. self._logger = logging.getLogger(logname or __name__)
  219. # The loop control event
  220. self._done_event = threading.Event()
  221. self._done_event.set()
  222. # The task queue
  223. try:
  224. queue_size = int(queue_size)
  225. except (TypeError, ValueError):
  226. # Not a valid integer
  227. queue_size = 0
  228. self._queue = queue.Queue(queue_size)
  229. self._timeout = timeout
  230. self.__lock = threading.RLock()
  231. # The thread pool
  232. self._min_threads = min_threads
  233. self._max_threads = max_threads
  234. self._threads = []
  235. # Thread count
  236. self._thread_id = 0
  237. # Current number of threads, active and alive,
  238. # and number of task waiting
  239. self.__nb_threads = 0
  240. self.__nb_active_threads = 0
  241. self.__nb_pending_task = 0
  242. def start(self):
  243. """
  244. Starts the thread pool. Does nothing if the pool is already started.
  245. """
  246. if not self._done_event.is_set():
  247. # Stop event not set: we're running
  248. return
  249. # Clear the stop event
  250. self._done_event.clear()
  251. # Compute the number of threads to start to handle pending tasks
  252. nb_pending_tasks = self._queue.qsize()
  253. if nb_pending_tasks > self._max_threads:
  254. nb_threads = self._max_threads
  255. nb_pending_tasks = self._max_threads
  256. elif nb_pending_tasks < self._min_threads:
  257. nb_threads = self._min_threads
  258. else:
  259. nb_threads = nb_pending_tasks
  260. # Create the threads
  261. for _ in range(nb_pending_tasks):
  262. self.__nb_pending_task += 1
  263. self.__start_thread()
  264. for _ in range(nb_threads-nb_pending_tasks):
  265. self.__start_thread()
  266. def __start_thread(self):
  267. """
  268. Starts a new thread, if possible
  269. """
  270. with self.__lock:
  271. if self.__nb_threads >= self._max_threads:
  272. # Can't create more threads
  273. return False
  274. if self._done_event.is_set():
  275. # We're stopped: do nothing
  276. return False
  277. # Prepare thread and start it
  278. name = "{0}-{1}".format(self._logger.name, self._thread_id)
  279. self._thread_id += 1
  280. thread = threading.Thread(target=self.__run, name=name)
  281. thread.daemon = True
  282. try:
  283. self.__nb_threads += 1
  284. thread.start()
  285. self._threads.append(thread)
  286. return True
  287. except (RuntimeError, OSError):
  288. self.__nb_threads -= 1
  289. return False
  290. def stop(self):
  291. """
  292. Stops the thread pool. Does nothing if the pool is already stopped.
  293. """
  294. if self._done_event.is_set():
  295. # Stop event set: we're stopped
  296. return
  297. # Set the stop event
  298. self._done_event.set()
  299. with self.__lock:
  300. # Add something in the queue (to unlock the join())
  301. try:
  302. for _ in self._threads:
  303. self._queue.put(self._done_event, True, self._timeout)
  304. except queue.Full:
  305. # There is already something in the queue
  306. pass
  307. # Copy the list of threads to wait for
  308. threads = self._threads[:]
  309. # Join threads outside the lock
  310. for thread in threads:
  311. while thread.is_alive():
  312. # Wait 3 seconds
  313. thread.join(3)
  314. if thread.is_alive():
  315. # Thread is still alive: something might be wrong
  316. self._logger.warning("Thread %s is still alive...",
  317. thread.name)
  318. # Clear storage
  319. del self._threads[:]
  320. self.clear()
  321. def enqueue(self, method, *args, **kwargs):
  322. """
  323. Queues a task in the pool
  324. :param method: Method to call
  325. :return: A FutureResult object, to get the result of the task
  326. :raise ValueError: Invalid method
  327. :raise Full: The task queue is full
  328. """
  329. if not hasattr(method, '__call__'):
  330. raise ValueError("{0} has no __call__ member."
  331. .format(method.__name__))
  332. # Prepare the future result object
  333. future = FutureResult(self._logger)
  334. # Use a lock, as we might be "resetting" the queue
  335. with self.__lock:
  336. # Add the task to the queue
  337. self._queue.put((method, args, kwargs, future), True,
  338. self._timeout)
  339. self.__nb_pending_task += 1
  340. if self.__nb_pending_task > self.__nb_threads:
  341. # All threads are taken: start a new one
  342. self.__start_thread()
  343. return future
  344. def clear(self):
  345. """
  346. Empties the current queue content.
  347. Returns once the queue have been emptied.
  348. """
  349. with self.__lock:
  350. # Empty the current queue
  351. try:
  352. while True:
  353. self._queue.get_nowait()
  354. self._queue.task_done()
  355. except queue.Empty:
  356. # Queue is now empty
  357. pass
  358. # Wait for the tasks currently executed
  359. self.join()
  360. def join(self, timeout=None):
  361. """
  362. Waits for all the tasks to be executed
  363. :param timeout: Maximum time to wait (in seconds)
  364. :return: True if the queue has been emptied, else False
  365. """
  366. if self._queue.empty():
  367. # Nothing to wait for...
  368. return True
  369. elif timeout is None:
  370. # Use the original join
  371. self._queue.join()
  372. return True
  373. else:
  374. # Wait for the condition
  375. with self._queue.all_tasks_done:
  376. self._queue.all_tasks_done.wait(timeout)
  377. return not bool(self._queue.unfinished_tasks)
  378. def __run(self):
  379. """
  380. The main loop
  381. """
  382. already_cleaned = False
  383. try:
  384. while not self._done_event.is_set():
  385. try:
  386. # Wait for an action (blocking)
  387. task = self._queue.get(True, self._timeout)
  388. if task is self._done_event:
  389. # Stop event in the queue: get out
  390. self._queue.task_done()
  391. return
  392. except queue.Empty:
  393. # Nothing to do yet
  394. pass
  395. else:
  396. with self.__lock:
  397. self.__nb_active_threads += 1
  398. # Extract elements
  399. method, args, kwargs, future = task
  400. try:
  401. # Call the method
  402. future.execute(method, args, kwargs)
  403. except Exception as ex:
  404. self._logger.exception("Error executing %s: %s",
  405. method.__name__, ex)
  406. finally:
  407. # Mark the action as executed
  408. self._queue.task_done()
  409. # Thread is not active anymore
  410. with self.__lock:
  411. self.__nb_pending_task -= 1
  412. self.__nb_active_threads -= 1
  413. # Clean up thread if necessary
  414. with self.__lock:
  415. extra_threads = self.__nb_threads - self.__nb_active_threads
  416. if self.__nb_threads > self._min_threads \
  417. and extra_threads > self._queue.qsize():
  418. # No more work for this thread
  419. # if there are more non active_thread than task
  420. # and we're above the minimum number of threads:
  421. # stop this one
  422. self.__nb_threads -= 1
  423. # To avoid a race condition: decrease the number of
  424. # threads here and mark it as already accounted for
  425. already_cleaned = True
  426. return
  427. finally:
  428. # Always clean up
  429. with self.__lock:
  430. # Thread stops: clean up references
  431. try:
  432. self._threads.remove(threading.current_thread())
  433. except ValueError:
  434. pass
  435. if not already_cleaned:
  436. self.__nb_threads -= 1