-
Notifications
You must be signed in to change notification settings - Fork 1
/
common.py
74 lines (65 loc) · 2.59 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Common utilities."""
import collections
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
import numpy as np
import svgwrite
import tflite_runtime.interpreter as tflite
import time
# Shared-library name handed to tflite.load_delegate() by make_interpreter().
EDGETPU_SHARED_LIB = 'libedgetpu.so.1'
def make_interpreter(model_file):
    """Builds a TFLite interpreter that uses the EDGETPU_SHARED_LIB delegate.

    Args:
        model_file: model path, optionally followed by '@' and a device
            string (for example 'model.tflite@<device>'). When a device is
            given it is passed to the delegate as the 'device' option.

    Returns:
        A tflite.Interpreter for the model with the delegate attached.
    """
    path, *device_parts = model_file.split('@')
    # Only the first segment after '@' is used as the device selector.
    delegate_options = {'device': device_parts[0]} if device_parts else {}
    delegate = tflite.load_delegate(EDGETPU_SHARED_LIB, delegate_options)
    return tflite.Interpreter(model_path=path,
                              experimental_delegates=[delegate])
def input_image_size(interpreter):
    """Returns input size as (width, height, channels) tuple.

    The size is read from the 'shape' entry of the interpreter's first input,
    which must hold exactly four values: batch, height, width, channels.
    """
    first_input = interpreter.get_input_details()[0]
    _batch, height, width, channels = first_input['shape']
    # Callers get width first, i.e. the two spatial dimensions are swapped
    # relative to the stored shape.
    return (width, height, channels)
def input_tensor(interpreter):
    """Returns input tensor view as numpy array of shape (height, width, channels).

    The view is obtained from interpreter.tensor() for the first input and
    indexed with [0] to drop the leading dimension.
    """
    first_input = interpreter.get_input_details()[0]
    accessor = interpreter.tensor(first_input['index'])
    return accessor()[0]
def set_input(interpreter, buf):
    """Copies data to input tensor.

    Args:
        interpreter: interpreter whose first input tensor receives the data.
        buf: buffer exposing map()/unmap(); it is mapped with
            Gst.MapFlags.READ and its bytes are read as a flat uint8 array.

    If the buffer cannot be mapped, the input tensor is left untouched and
    nothing is raised.

    Raises:
        ValueError: if the number of bytes in the buffer does not match the
            size of the input tensor.
    """
    result, mapinfo = buf.map(Gst.MapFlags.READ)
    if not result:
        return
    try:
        tensor = input_tensor(interpreter)
        frame = np.frombuffer(mapinfo.data, dtype=np.uint8)
        # Reshape to the tensor's own (height, width, channels) layout. The
        # (width, height, channels) tuple from input_image_size() only gives
        # the same result for square inputs.
        tensor[:, :] = frame.reshape(tensor.shape)
    finally:
        # Always release the mapping, even if the copy above raises.
        buf.unmap(mapinfo)
def output_tensor(interpreter, i):
    """Returns dequantized output tensor if quantized before.

    Args:
        interpreter: object providing get_output_details() and tensor().
        i: position of the wanted output in get_output_details().

    Returns:
        The output with size-1 dimensions squeezed out. When the output
        details carry a 'quantization' (scale, zero_point) pair, the zero
        point is subtracted and, if scale is non-zero, the result is
        multiplied by scale. Without a 'quantization' entry the squeezed
        data is returned unchanged.
    """
    output_details = interpreter.get_output_details()[i]
    output_data = np.squeeze(interpreter.tensor(output_details['index'])())
    if 'quantization' not in output_details:
        return output_data
    scale, zero_point = output_details['quantization']
    if np.issubdtype(output_data.dtype, np.integer):
        # Widen before subtracting: in a narrow unsigned dtype such as uint8
        # the subtraction wraps around for values below the zero point.
        output_data = output_data.astype(np.int64)
    shifted = output_data - zero_point
    if scale == 0:
        # A zero scale would wipe out the data, so only the offset is applied.
        return shifted
    return scale * shifted
def avg_fps_counter(window_size):
    """Generator yielding a running frames-per-second average.

    The first value yielded is always 0.0. Each later next() records the
    time elapsed since the previous one and yields the number of recorded
    intervals divided by their sum, keeping at most window_size intervals.

    Args:
        window_size: maximum number of recent intervals to average over
            (used as the deque's maxlen).

    Yields:
        The average rate as a float, or 0.0 when the recorded intervals sum
        to zero and no rate can be computed.
    """
    window = collections.deque(maxlen=window_size)
    prev = time.monotonic()
    yield 0.0  # First fps value.
    while True:
        curr = time.monotonic()
        window.append(curr - prev)
        prev = curr
        elapsed = sum(window)
        # The sum is zero when the clock did not advance between calls or
        # when the window holds nothing (window_size == 0); report 0.0
        # instead of raising ZeroDivisionError.
        yield len(window) / elapsed if elapsed > 0 else 0.0