Skip to content

Commit

Permalink
feat: read strings from modbus for descriptions fixes #7
Browse files Browse the repository at this point in the history
  • Loading branch information
tilsche committed Jul 28, 2023
1 parent 2c484a9 commit b1737cc
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 10 deletions.
22 changes: 21 additions & 1 deletion metricq_source_modbus/config_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@ def metrics_not_empty(cls, v: dict[str, Metric]) -> dict[str, Metric]:
"""


class StringConfig(BaseModel, **_model_config):
address: NonNegativeInt
"""Register address of the value"""
size: PositiveInt
"""Length of the string in bytes (i.e. delta to the next address)"""

@field_validator("size")
def size_multiple_two(cls, v: int) -> int:
if v % 2 != 0:
raise ValueError("Size (in bytes) must be a multiple of two.")
return v


class Host(BaseModel, **_model_config):
hosts: str | list[str]
"""
Expand All @@ -73,12 +86,19 @@ class Host(BaseModel, **_model_config):
slave_id: int
"""Slave ID to query"""
description: str = ""
"""Description prefix for metadata of all included metrics"""
"""
Description prefix for metadata of all included metrics.
Can use placeholders from ``strings`` using ``$foo`` notation.
"""
descriptions: Optional[str | list[str]] = None
"""
An optional list of descriptions for each host, must match the expanded list of
``hosts`` and ``names``. Is used in addition to ``description``.
"""
strings: Optional[dict[str, StringConfig]] = None
"""
A set of strings to get from the device at initialization and use in the ``description``.
"""
groups: list[Group] = Field(..., min_items=1)
""" List of query groups. """

Expand Down
55 changes: 55 additions & 0 deletions metricq_source_modbus/read_strings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import asyncio
import struct
from string import Template
from typing import Optional

from async_modbus import AsyncClient, AsyncTCPClient # type: ignore
from metricq.logging import get_logger

from metricq_source_modbus.config_model import StringConfig

logger = get_logger()


async def _read_string(client: AsyncClient, slave_id: int, config: StringConfig) -> str:
assert config.size % 2 == 0
num_registers = config.size // 2
raw_values = await client.read_input_registers(1, 25500, num_registers)
assert len(raw_values) == num_registers
buffer = struct.pack(f">{len(raw_values)}H", *raw_values)
if (first_null := buffer.find(b"\x00")) != -1:
buffer = buffer[:first_null]
return buffer.decode("ASCII", errors="ignore")


class StringReplacer:
def __init__(self, mapping: dict[str, str]):
self._mapping = {key: self.sanitize(value) for key, value in mapping.items()}

@classmethod
def sanitize(self, value: str) -> str:
"""Replace Bacnet special characters with our beautiful MetricQ dots"""
return value.replace("'", ".").replace("`", ".").replace("´", ".").strip()

def __call__(self, description: str) -> str:
if not self._mapping:
return description
return Template(description).safe_substitute(self._mapping)


async def read_strings(
host: str, port: int, slave_id: int, strings: Optional[dict[str, StringConfig]]
) -> StringReplacer:
if not strings:
return StringReplacer({})
logger.info("Reading device strings from {}:{}", host, port)
reader, writer = await asyncio.open_connection(host, port)
client = AsyncTCPClient((reader, writer))
values = {
key: await _read_string(client, slave_id, config)
for key, config in strings.items()
}
writer.close()
await writer.wait_closed()
logger.info("Device strings for {}:{}: {}", host, port, values)
return StringReplacer(values)
36 changes: 27 additions & 9 deletions metricq_source_modbus/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@
import asyncio
import struct
from contextlib import suppress
from typing import Any, Iterable, Optional, Sequence, cast
from typing import Any, AsyncIterator, Optional, Sequence, cast

from async_modbus import AsyncClient, AsyncTCPClient # type: ignore
from hostlist import expand_hostlist # type: ignore
from metricq import JsonDict, MetadataDict, Source, Timedelta, Timestamp, rpc_handler
from metricq.logging import get_logger

from . import config_model
from .read_strings import StringReplacer, read_strings
from .version import __version__ # noqa: F401 # magic import for automatic version

logger = get_logger()
Expand Down Expand Up @@ -278,14 +279,15 @@ def __init__(
host: str,
name: str,
description: str,
replacer: StringReplacer,
config: config_model.Host,
):
self.source = source
self._host = host
self._port = config.port
self.metric_prefix = name
self.slave_id = config.slave_id
self.description = f"{config.description} {description}".strip()
self.description = replacer(f"{config.description} {description}".strip())

self._groups = [
MetricGroup(self, group_config) for group_config in config.groups
Expand All @@ -300,11 +302,11 @@ def _parse_hosts(hosts: str | list[str]) -> list[str]:
return hosts

@classmethod
def _create_from_host_config(
async def _create_from_host_config(
cls,
source: "ModbusSource",
host_config: config_model.Host,
) -> Iterable["Host"]:
) -> AsyncIterator["Host"]:
hosts = cls._parse_hosts(host_config.hosts)
names = cls._parse_hosts(host_config.names)
if len(hosts) != len(names):
Expand All @@ -317,21 +319,35 @@ def _create_from_host_config(
else:
descriptions = [""] * len(hosts)

for host, name, description in zip(hosts, names, descriptions):
replacers = await asyncio.gather(
*(
read_strings(
h, host_config.port, host_config.slave_id, host_config.strings
)
for h in hosts
)
)
assert len(hosts) == len(replacers)

for host, name, description, replacer in zip(
hosts, names, descriptions, replacers
):
yield Host(
source=source,
host=host,
name=name,
description=description,
replacer=replacer,
config=host_config,
)

@classmethod
def create_from_host_configs(
async def create_from_host_configs(
cls, source: "ModbusSource", host_configs: Sequence[config_model.Host]
) -> Iterable["Host"]:
) -> AsyncIterator["Host"]:
for host_config in host_configs:
yield from cls._create_from_host_config(source, host_config)
async for host in cls._create_from_host_config(source, host_config):
yield host

@property
def host(self) -> str:
Expand Down Expand Up @@ -391,7 +407,9 @@ async def _on_config(
if self.hosts is not None:
await self._stop_host_tasks()

self.hosts = list(Host.create_from_host_configs(self, config.hosts))
self.hosts = [
host async for host in Host.create_from_host_configs(self, config.hosts)
]

await self.declare_metrics(
{
Expand Down

0 comments on commit b1737cc

Please sign in to comment.