-
Notifications
You must be signed in to change notification settings - Fork 1
/
thread_pattern_02.py
149 lines (107 loc) · 4.08 KB
/
thread_pattern_02.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# -*- coding: utf-8 -*-
"""
Martin Guthrie
This code pattern demonstrates how to move GUI callback events to
their own thread via a class.
"""
import dearpygui.dearpygui as dpg
from threading import Thread, Lock, Event
import queue
import time
import traceback
import logging
logger = logging.getLogger()
FORMAT = "%(asctime)s: %(threadName)10s %(filename)22s %(funcName)25s %(levelname)-5.5s :%(lineno)4s: %(message)s"
formatter = logging.Formatter(FORMAT)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(formatter)
logger.addHandler(consoleHandler)
logger.setLevel(logging.INFO)
# Put this in a separate file
class WorkerBase(Thread):
EVENT_SHUTDOWN = "EVENT_SHUTDOWN"
def __init__(self, name="Worker"):
super().__init__()
self._lock = Lock()
self._q = queue.Queue()
self._stop_event = Event()
# Note: you can create your dpg widgets here, create the widgets
# and put the callbacks in the same class
self.name = name
self.start()
def enqueue(self, item_dict: dict):
logger.debug(item_dict)
self._q.put(item_dict)
def shutdown(self):
item_dict = {"type": self.EVENT_SHUTDOWN, "from": "shutdown"}
self.enqueue(item_dict)
self.join()
def is_stopped(self):
return self._stop_event.is_set()
def _event_shutdown(self):
self._stop_event.set()
logger.info(f"{self.name} shutdown")
def subc_events(self, item):
return False
def run(self):
logger.info(f"{self.name} run thread started")
while not self.is_stopped():
try:
item = self._q.get(block=True)
logger.debug(item)
with self._lock:
if item["type"] == self.EVENT_SHUTDOWN:
self._event_shutdown()
elif self.subc_events(item):
pass
else:
logger.error("Unknown event: {}".format(item["type"]))
except queue.Empty:
pass
except Exception as e:
logger.error("Error processing event {}, {}".format(e, item["type"]))
traceback.print_exc()
time.sleep(0) # allow other threads to run if any, in the case that the queue is full
logger.info(f"{self.name} run thread stopped")
class Worker(WorkerBase):
EVENT_CB_BUTTON1 = "EVENT_CB_BUTTON1"
def __init__(self, name="Worker"):
super().__init__()
# Note: you can create your dpg widgets here, create the widgets
# and put the callbacks in the same class
# ------------- Your specific code goes here --------------------
# For any GUI event, do the work on this class's thread, leaving
# the DPG thread to run as fast as possible
def _event_button1(self, item: dict):
# this is now running on its own thread
# this call is also protected by a lock
logger.info(item)
def cb_button1(self, sender, app_data, user_data):
# this is called on the client thread, in this case the DPG thread
logger.info(f"sender: {sender} {app_data} {user_data}")
item_dict = {"type": self.EVENT_CB_BUTTON1,
"cb": dict(s=sender, a=app_data, u=user_data),
"from": "cb_button1"}
self.enqueue(item_dict)
def subc_event_handler(self, item):
if item["type"] == self.EVENT_CB_BUTTON1:
self._event_button1(item)
return True
# elif item["type"] == self.EVENT_something:
# ... code ...
# return True
else:
return False
dpg.create_context()
dpg.create_viewport(height=200, width=200)
dpg.setup_dearpygui()
worker = Worker()
with dpg.window(label="Example", height=100, width=100):
dpg.add_text("Hello world")
# NOTE: the callback happens on the worker thread
dpg.add_button(tag="b1", label="Button1", callback=worker.cb_button1)
dpg.show_viewport()
dpg.start_dearpygui()
# shut thread down properly
worker.shutdown()
dpg.destroy_context()