From 416842fa1b199b4bdac88d41d2d82abbb2572095 Mon Sep 17 00:00:00 2001 From: lixiaoyu Date: Thu, 5 Mar 2020 11:17:07 +0800 Subject: [PATCH] add file lock to avoid race condition --- prometheus_client/multiprocess.py | 23 ++++++- prometheus_client/process_lock.py | 99 +++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 2 deletions(-) create mode 100644 prometheus_client/process_lock.py diff --git a/prometheus_client/multiprocess.py b/prometheus_client/multiprocess.py index a325efbf..ff5edcb0 100644 --- a/prometheus_client/multiprocess.py +++ b/prometheus_client/multiprocess.py @@ -1,10 +1,12 @@ from __future__ import unicode_literals from collections import defaultdict +from functools import wraps import glob import json import os +from .process_lock import lock, unlock, LOCK_EX from .metrics_core import Metric from .mmap_dict import MmapedDict, mmap_key from .samples import Sample @@ -18,6 +20,21 @@ MP_METRIC_HELP = 'Multiprocess metric' +def require_metrics_lock(func): + @wraps(func) + def _wrap(*args, **kwargs): + path = os.environ.get('prometheus_multiproc_dir') + f = open(os.path.join(path, 'metrics.lock'), 'w') + try: + lock(f, LOCK_EX) + return func(*args, **kwargs) + finally: + unlock(f) + f.close() + + return _wrap + + class MultiProcessCollector(object): """Collector for files for multi-process mode.""" @@ -31,6 +48,7 @@ def __init__(self, registry, path=None): registry.register(self) @staticmethod + @require_metrics_lock def merge(files, accumulate=True): """Merge metrics from given mmap files. @@ -149,6 +167,7 @@ def collect(self): return self.merge(files, accumulate=True) +@require_metrics_lock def mark_process_dead(pid, path=None): """Do bookkeeping for when one process dies in a multi-process setup.""" if path is None: @@ -175,8 +194,8 @@ def mark_process_dead(pid, path=None): if not os.path.exists(merge_file): MmapedDict(merge_file).close() - # do merge - metrics = MultiProcessCollector(None).merge(files + merge_files, accumulate=False) + # do merge, here we use the same method to merge + metrics = MultiProcessCollector.merge(files + merge_files, accumulate=False) typ_metrics_dict = defaultdict(list) for metric in metrics: typ_metrics_dict[metric.type].append(metric) diff --git a/prometheus_client/process_lock.py b/prometheus_client/process_lock.py new file mode 100644 index 00000000..24ddbd0a --- /dev/null +++ b/prometheus_client/process_lock.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python +# coding=utf-8 + +import os + +__all__ = ('LOCK_EX', 'LOCK_SH', 'LOCK_NB', 'lock', 'unlock') + + +def _fd(f): + return f.fileno() if hasattr(f, 'fileno') else f + + +if os.name == 'nt': + import msvcrt + from ctypes import (sizeof, c_ulong, c_void_p, c_int64, + Structure, Union, POINTER, windll, byref) + from ctypes.wintypes import BOOL, DWORD, HANDLE + + LOCK_SH = 0 # the default + LOCK_NB = 0x1 # LOCKFILE_FAIL_IMMEDIATELY + LOCK_EX = 0x2 # LOCKFILE_EXCLUSIVE_LOCK + + if sizeof(c_ulong) != sizeof(c_void_p): + ULONG_PTR = c_int64 + else: + ULONG_PTR = c_ulong + PVOID = c_void_p + + + class _OFFSET(Structure): + _fields_ = [ + ('Offset', DWORD), + ('OffsetHigh', DWORD)] + + + class _OFFSET_UNION(Union): + _anonymous_ = ['_offset'] + _fields_ = [ + ('_offset', _OFFSET), + ('Pointer', PVOID)] + + + class OVERLAPPED(Structure): + _anonymous_ = ['_offset_union'] + _fields_ = [ + ('Internal', ULONG_PTR), + ('InternalHigh', ULONG_PTR), + ('_offset_union', _OFFSET_UNION), + ('hEvent', HANDLE)] + + + LPOVERLAPPED = POINTER(OVERLAPPED) + + LockFileEx = windll.kernel32.LockFileEx + LockFileEx.restype = BOOL + LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED] + UnlockFileEx = windll.kernel32.UnlockFileEx + UnlockFileEx.restype = BOOL + UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED] + + + def lock(f, flags): + hfile = msvcrt.get_osfhandle(_fd(f)) + overlapped = OVERLAPPED() + ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped)) + return bool(ret) + + + def unlock(f): + hfile = msvcrt.get_osfhandle(_fd(f)) + overlapped = OVERLAPPED() + ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped)) + return bool(ret) +else: + try: + import fcntl + + LOCK_SH = fcntl.LOCK_SH # shared lock + LOCK_NB = fcntl.LOCK_NB # non-blocking + LOCK_EX = fcntl.LOCK_EX + except (ImportError, AttributeError): + LOCK_EX = LOCK_SH = LOCK_NB = 0 + + + def lock(f, flags): + return False + + + def unlock(f): + return True + else: + def lock(f, flags): + ret = fcntl.flock(_fd(f), flags) + return ret == 0 + + + def unlock(f): + ret = fcntl.flock(_fd(f), fcntl.LOCK_UN) + return ret == 0