Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge per-process history db files into a common db file to avoid unbounded growth in the number of db files #518

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion prometheus_client/multiprocess.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from __future__ import unicode_literals

from collections import defaultdict
from functools import wraps
import glob
import json
import os

from .process_lock import lock, unlock, LOCK_EX
from .metrics_core import Metric
from .mmap_dict import MmapedDict
from .mmap_dict import MmapedDict, mmap_key
from .samples import Sample
from .utils import floatToGoString

Expand All @@ -18,6 +20,21 @@
MP_METRIC_HELP = 'Multiprocess metric'


def require_metrics_lock(func):
    """Decorator that serializes access to the multiprocess metrics files.

    Holds an exclusive advisory lock on ``<prometheus_multiproc_dir>/metrics.lock``
    for the duration of the wrapped call so that concurrent processes do not
    read and rewrite the db files at the same time.

    Raises:
        RuntimeError: if the ``prometheus_multiproc_dir`` environment variable
            is not set (previously this surfaced as an opaque TypeError from
            ``os.path.join(None, ...)``).

    NOTE(review): the lock is NOT reentrant. A decorated function must not
    call another decorated function (e.g. ``mark_process_dead`` calling
    ``merge``): on POSIX the inner ``unlock`` releases the lock prematurely,
    and on some platforms re-acquiring may deadlock — TODO confirm and
    restructure before merging.
    """
    @wraps(func)
    def _wrap(*args, **kwargs):
        path = os.environ.get('prometheus_multiproc_dir')
        if path is None:
            raise RuntimeError(
                'prometheus_multiproc_dir environment variable is not set')
        # Context manager guarantees the lock file handle is closed even if
        # the wrapped function raises.
        with open(os.path.join(path, 'metrics.lock'), 'w') as f:
            try:
                lock(f, LOCK_EX)
                return func(*args, **kwargs)
            finally:
                unlock(f)

    return _wrap


class MultiProcessCollector(object):
"""Collector for files for multi-process mode."""

Expand All @@ -31,6 +48,7 @@ def __init__(self, registry, path=None):
registry.register(self)

@staticmethod
@require_metrics_lock
def merge(files, accumulate=True):
"""Merge metrics from given mmap files.

Expand Down Expand Up @@ -149,6 +167,7 @@ def collect(self):
return self.merge(files, accumulate=True)


@require_metrics_lock
def mark_process_dead(pid, path=None):
"""Do bookkeeping for when one process dies in a multi-process setup."""
if path is None:
Expand All @@ -157,3 +176,43 @@ def mark_process_dead(pid, path=None):
os.remove(f)
for f in glob.glob(os.path.join(path, 'gauge_liveall_{0}.db'.format(pid))):
os.remove(f)

# get associated db files with pid
files = glob.glob(os.path.join(path, '*_{0}.db'.format(pid)))
if not files:
return

# get merge file name
merge_files = []
for f in files:
file_prefix = os.path.basename(f).rsplit('_', 1)[0]
merge_file = os.path.join(path, '{0}_merge.db'.format(file_prefix))
if merge_file not in merge_files:
merge_files.append(merge_file)

# if not exist merge_file, create and init it
if not os.path.exists(merge_file):
MmapedDict(merge_file).close()

# do merge, here we use the same method to merge
metrics = MultiProcessCollector.merge(files + merge_files, accumulate=False)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brian-brazil here I use the same method to merge metrics

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this will cause deadlock: mark_process_dead is already holding lock and calling merge will try to acquire the same lock again.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe it will -- it looks like lock acquisition via flock is reentrant safe, but unlocking via flock isn't, so I think this will prematurely release the lock after the merge call here, at least on POSIX systems. I don't know what the locking behaviour on NT will be.

typ_metrics_dict = defaultdict(list)
for metric in metrics:
typ_metrics_dict[metric.type].append(metric)

# write data to correct merge_file
for merge_file in merge_files:
typ = os.path.basename(merge_file).split('_')[0]
d = MmapedDict(merge_file)
for metric in typ_metrics_dict[typ]:
for sample in metric.samples:
labels = values = []
if sample.labels:
labels, values = zip(*sample.labels.items())
key = mmap_key(metric.name, sample.name, labels, values)
d.write_value(key, sample.value)
d.close()

# remove the old db file
for f in files:
os.remove(f)
99 changes: 99 additions & 0 deletions prometheus_client/process_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#!/usr/bin/env python
# coding=utf-8

import os

__all__ = ('LOCK_EX', 'LOCK_SH', 'LOCK_NB', 'lock', 'unlock')


def _fd(f):
    """Return the file descriptor for *f*, accepting file objects or raw fds."""
    return f.fileno() if hasattr(f, 'fileno') else f


if os.name == 'nt':
    import msvcrt
    from ctypes import (sizeof, c_ulong, c_void_p, c_int64,
                        Structure, Union, POINTER, windll, byref)
    from ctypes.wintypes import BOOL, DWORD, HANDLE

    LOCK_SH = 0  # the default
    LOCK_NB = 0x1  # LOCKFILE_FAIL_IMMEDIATELY
    LOCK_EX = 0x2  # LOCKFILE_EXCLUSIVE_LOCK

    # ULONG_PTR must match the platform pointer width for the OVERLAPPED
    # struct layout to be correct on 64-bit Windows.
    if sizeof(c_ulong) != sizeof(c_void_p):
        ULONG_PTR = c_int64
    else:
        ULONG_PTR = c_ulong
    PVOID = c_void_p

    # OVERLAPPED structure as required by LockFileEx/UnlockFileEx.
    class _OFFSET(Structure):
        _fields_ = [
            ('Offset', DWORD),
            ('OffsetHigh', DWORD)]

    class _OFFSET_UNION(Union):
        _anonymous_ = ['_offset']
        _fields_ = [
            ('_offset', _OFFSET),
            ('Pointer', PVOID)]

    class OVERLAPPED(Structure):
        _anonymous_ = ['_offset_union']
        _fields_ = [
            ('Internal', ULONG_PTR),
            ('InternalHigh', ULONG_PTR),
            ('_offset_union', _OFFSET_UNION),
            ('hEvent', HANDLE)]

    LPOVERLAPPED = POINTER(OVERLAPPED)

    LockFileEx = windll.kernel32.LockFileEx
    LockFileEx.restype = BOOL
    LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
    UnlockFileEx = windll.kernel32.UnlockFileEx
    UnlockFileEx.restype = BOOL
    UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]

    def lock(f, flags):
        """Lock *f* via LockFileEx; True on success, False otherwise."""
        hfile = msvcrt.get_osfhandle(_fd(f))
        overlapped = OVERLAPPED()
        ret = LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped))
        return bool(ret)

    def unlock(f):
        """Unlock *f* via UnlockFileEx; True on success, False otherwise."""
        hfile = msvcrt.get_osfhandle(_fd(f))
        overlapped = OVERLAPPED()
        ret = UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped))
        return bool(ret)
else:
    try:
        import fcntl

        LOCK_SH = fcntl.LOCK_SH  # shared lock
        LOCK_NB = fcntl.LOCK_NB  # non-blocking
        LOCK_EX = fcntl.LOCK_EX
    except (ImportError, AttributeError):
        # File locking is not supported on this platform: degrade to no-ops.
        LOCK_EX = LOCK_SH = LOCK_NB = 0

        def lock(f, flags):
            return False

        def unlock(f):
            return True
    else:
        def lock(f, flags):
            """Lock *f*; True on success, False if the lock could not be
            acquired (e.g. LOCK_NB and another holder).

            BUG FIX: fcntl.flock() returns None on success and raises on
            failure, so the original ``return ret == 0`` always returned
            False even when the lock was acquired.
            """
            try:
                fcntl.flock(_fd(f), flags)
            except (IOError, OSError):
                return False
            return True

        def unlock(f):
            """Release the flock on *f*; True on success."""
            try:
                fcntl.flock(_fd(f), fcntl.LOCK_UN)
            except (IOError, OSError):
                return False
            return True
13 changes: 12 additions & 1 deletion prometheus_client/values.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import unicode_literals

import os
import psutil
from threading import Lock

from .mmap_dict import mmap_key, MmapedDict
Expand Down Expand Up @@ -28,7 +29,17 @@ def get(self):
return self._value


def MultiProcessValue(process_identifier=os.getpid):
def default_process_identifier():
    """Return a unique identifier string for the current process.

    Two processes may end up with the same identifier when using
    ``os.getpid()`` alone, because pids are recycled by the OS; appending
    the process creation time disambiguates recycled pids.

    ``psutil`` is treated as an optional dependency so the library keeps
    working without any third-party package installed: when it is missing
    we fall back to the bare pid (the historical behaviour).
    """
    pid = os.getpid()
    try:
        import psutil
    except ImportError:
        # psutil unavailable: pid-only identifier, as before this change.
        return str(pid)
    return "{}_{}".format(pid, int(psutil.Process(pid).create_time()))


def MultiProcessValue(process_identifier=default_process_identifier):
"""Returns a MmapedValue class based on a process_identifier function.

The 'process_identifier' function MUST comply with this simple rule:
Expand Down