-
Notifications
You must be signed in to change notification settings - Fork 14
/
Stopwatch.py
145 lines (116 loc) · 4.42 KB
/
Stopwatch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import time
import socket
import struct
from typing import Optional
from functools import partial
def stopwatch(name: Optional[str] = None):
"""
Decorator to add stopwatch tick/tock with automatic naming
:param name: Optional name to be passed to the Stopwatch.tick call.
If empty, the functions __qualname__ will automatically be used
Example:
@stopwatch()
def function_to_time():
...
@stopwatch("custom_name_passed_to_tick_tock")
def other_function_to_time():
...
"""
def stopwatch_impl(func, _name_):
if _name_ is None:
_name_ = func.__qualname__
def wrapper(*args, **kwargs):
Stopwatch.tick(_name_)
result = func(*args, **kwargs)
Stopwatch.tock(_name_)
return result
return wrapper
return partial(stopwatch_impl, _name_=name)
class Stopwatch:
SEND_INTERVAL_MS = 10000
timings_ms = {}
tocks_us = {}
ticks_us = {}
signature = 0
last_send = current_send = int(time.perf_counter() * 1000000)
sockfd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
servaddr = ("127.0.0.1", 45454)
@staticmethod
def add_stopwatch_timing(name, duration):
if duration > 0:
Stopwatch.timings_ms[name] = float(duration) / 1000.0
@staticmethod
def set_custom_signature(new_signature):
Stopwatch.signature = new_signature
@staticmethod
def get_timings():
return Stopwatch.timings_ms
@staticmethod
def print_all():
for name, duration in Stopwatch.timings_ms.items():
print(f"{name}: {duration}ms")
print()
@staticmethod
def send_all():
if not (Stopwatch.timings_ms or Stopwatch.ticks_us or Stopwatch.tocks_us):
return # Nothing to send
current_time = int(time.perf_counter() * 1000000)
if current_time - Stopwatch.last_send > Stopwatch.SEND_INTERVAL_MS:
data = Stopwatch.serialise_timings()
Stopwatch.sockfd.sendto(data, Stopwatch.servaddr)
Stopwatch.last_send = current_time
@staticmethod
def get_current_system_time():
return int(time.perf_counter() * 1000000)
@staticmethod
def tick(name):
Stopwatch.ticks_us[name] = Stopwatch.get_current_system_time()
@staticmethod
def tock(name):
end = Stopwatch.get_current_system_time()
Stopwatch.tocks_us[name] = end
tick_us = Stopwatch.ticks_us.get(name)
if tick_us:
duration = (end - tick_us) / 1000.0
if duration > 0:
Stopwatch.timings_ms[name] = duration
@staticmethod
def serialise_timings():
# Compute the total size of the packet
packet_size_bytes = struct.calcsize("=Iq") # int32_t + uint64_t
for name in Stopwatch.timings_ms.keys():
packet_size_bytes += (
struct.calcsize("=B") + len(name) + 1 + struct.calcsize("=f")
)
for name in Stopwatch.ticks_us.keys():
packet_size_bytes += (
struct.calcsize("=B") + len(name) + 1 + struct.calcsize("=Q")
)
for name in Stopwatch.tocks_us.keys():
packet_size_bytes += (
struct.calcsize("=B") + len(name) + 1 + struct.calcsize("=Q")
)
data = bytearray(packet_size_bytes)
offset = 0
# Write the packet size
struct.pack_into("=I", data, offset, packet_size_bytes)
offset += struct.calcsize("=I")
# Write the signature
struct.pack_into("=q", data, offset, Stopwatch.signature)
offset += struct.calcsize("=q")
# Serialize the timings
def serialize_map(type_byte, name_value_map, value_format):
nonlocal offset
for name, value in name_value_map.items():
struct.pack_into("=B", data, offset, type_byte)
offset += struct.calcsize("=B")
struct.pack_into(f"={len(name)}s", data, offset, name.encode("ascii"))
offset += len(name)
data[offset] = 0 # Null terminator
offset += 1
struct.pack_into(value_format, data, offset, value)
offset += struct.calcsize(value_format)
serialize_map(0, Stopwatch.timings_ms, "=f")
serialize_map(1, Stopwatch.ticks_us, "=Q")
serialize_map(2, Stopwatch.tocks_us, "=Q")
return data