Add instance name into metrics labels (#6249)

* Add labeled histogram
* Add label to backend, frontend and background_errors metrics
Fantix King 2023-10-11 05:03:31 +09:00 committed by GitHub
parent 22a7f4fcd8
commit 6b27b85a2f
10 changed files with 303 additions and 64 deletions


@@ -190,6 +190,22 @@ class Registry:
self._add_metric(hist)
return hist
def new_labeled_histogram(
self,
name: str,
desc: str,
/,
*,
unit: Unit | None = None,
buckets: list[float] | None = None,
labels: tuple[str],
) -> LabeledHistogram:
hist = LabeledHistogram(
self, name, desc, unit, buckets=buckets, labels=labels
)
self._add_metric(hist)
return hist
def generate(self) -> str:
buffer: list[str] = []
for metric in self._metrics:
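A minimal usage sketch of the new registry API (the import path is an assumption, since file names are not shown in this rendering; observe() takes the sample value followed by one value per declared label, as defined later in this commit):

    # Import path assumed; not shown in this diff.
    from edb.common import prometheus as prom

    registry = prom.Registry()
    conn_latency = registry.new_labeled_histogram(
        'connection_latency',
        'Time it takes to establish a connection.',
        unit=prom.Unit.SECONDS,
        labels=('tenant',),
    )
    conn_latency.observe(0.025, 'my_instance')  # one sample for this tenant
    print(registry.generate())                  # Prometheus exposition text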
@@ -457,13 +473,11 @@ class LabeledGauge(BaseLabeledCounter):
self._metric_created[labels] = self._registry.now()
class Histogram(BaseMetric):
class BaseHistogram(BaseMetric):
_type = 'histogram'
_buckets: list[float]
_values: list[float]
_sum: float
# Default buckets that many standard prometheus client libraries use.
DEFAULT_BUCKETS = [
@@ -490,8 +504,21 @@ class Histogram(BaseMetric):
super().__init__(*args)
self._sum = 0.0
self._buckets = buckets
class Histogram(BaseHistogram):
_values: list[float]
_sum: float
def __init__(
self,
*args: typing.Any,
buckets: list[float] | None = None
) -> None:
super().__init__(*args, buckets=buckets)
self._sum = 0.0
self._values = [0.0] * len(self._buckets)
def observe(self, value: float) -> None:
@@ -527,6 +554,80 @@ class Histogram(BaseMetric):
buffer.append(f'{self._name}_created {float(self._created)}')
class LabeledHistogram(BaseHistogram):
_labels: tuple[str, ...]
_metric_values: dict[tuple[str, ...], list[float | list[float]]]
_metric_created: dict[tuple[str, ...], float]
def __init__(
self,
*args: typing.Any,
buckets: list[float] | None = None,
labels: tuple[str, ...],
) -> None:
super().__init__(*args, buckets=buckets)
self._labels = labels
self._metric_values = {}
self._metric_created = {}
def observe(self, value: float, *labels: str) -> None:
self._validate_label_values(self._labels, labels)
try:
metric = self._metric_values[labels]
except KeyError:
metric = [0.0, [0.0] * len(self._buckets)]
self._metric_values[labels] = metric
self._metric_created[labels] = self._registry.now()
idx = bisect.bisect_left(self._buckets, value)
metric[1][idx] += 1.0 # type: ignore
metric[0] += value # type: ignore
def _generate(self, buffer: list[str]) -> None:
desc = _format_desc(self._desc)
buffer.append(f'# HELP {self._name} {desc}')
buffer.append(f'# TYPE {self._name} histogram')
for labels, values in self._metric_values.items():
fmt_label = ','.join(
f'{label}="{_format_label_val(label_val)}"'
for label, label_val in zip(self._labels, labels)
)
accum = 0.0
for buck, val in zip(self._buckets, values[1]): # type: ignore
accum += val
if math.isinf(buck):
if buck > 0:
buckf = '+Inf'
else:
buckf = '-Inf'
else:
buckf = str(buck)
buffer.append(
f'{self._name}_bucket{{le="{buckf}",{fmt_label}}} {accum}'
)
buffer.append(f'{self._name}_count{{{fmt_label}}} {accum}')
buffer.append(f'{self._name}_sum{{{fmt_label}}} {values[0]}')
if self._metric_values:
buffer.append(f'# HELP {self._name}_created {desc}')
buffer.append(f'# TYPE {self._name}_created gauge')
for labels, value in self._metric_created.items():
fmt_label = ','.join(
f'{label}="{_format_label_val(label_val)}"'
for label, label_val in zip(self._labels, labels)
)
buffer.append(
f'{self._name}_created{{{fmt_label}}} {float(value)}'
)
@functools.lru_cache(maxsize=1024)
def _format_desc(desc: str) -> str:
return desc.replace('\\', r'\\').replace('\n', r'\n')
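LabeledHistogram.observe() finds the bucket with bisect.bisect_left, so a sample lands in the first bucket whose upper bound is greater than or equal to the value, and the trailing +Inf bound guarantees an index always exists; per-label state is created lazily on first observation (the real method also stamps a creation time per label set). A self-contained sketch of just that bucketing step, with illustrative bounds (the full DEFAULT_BUCKETS list is elided in this hunk):

    import bisect

    buckets = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
               1.0, 2.5, 5.0, float('inf')]            # illustrative bounds
    per_label = {}                                     # labels -> [sum, bucket counts]

    def observe(value, *labels):
        # Same shape as LabeledHistogram.observe() above.
        metric = per_label.setdefault(labels, [0.0, [0.0] * len(buckets)])
        metric[1][bisect.bisect_left(buckets, value)] += 1.0
        metric[0] += value

    observe(0.22, 'tenant-1')
    observe(2.0, 'tenant-1')
    print(per_label[('tenant-1',)])  # sum 2.22; the 0.25 and 2.5 buckets each hold 1.0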


@@ -32,52 +32,60 @@ current_compiler_processes = registry.new_gauge(
'Current number of active compiler processes.'
)
total_backend_connections = registry.new_counter(
total_backend_connections = registry.new_labeled_counter(
'backend_connections_total',
'Total number of backend connections established.'
'Total number of backend connections established.',
labels=('tenant',),
)
current_backend_connections = registry.new_gauge(
current_backend_connections = registry.new_labeled_gauge(
'backend_connections_current',
'Current number of active backend connections.'
'Current number of active backend connections.',
labels=('tenant',),
)
backend_connection_establishment_errors = registry.new_counter(
backend_connection_establishment_errors = registry.new_labeled_counter(
'backend_connection_establishment_errors_total',
'Number of times the server could not establish a backend connection.'
'Number of times the server could not establish a backend connection.',
labels=('tenant',),
)
backend_connection_establishment_latency = registry.new_histogram(
backend_connection_establishment_latency = registry.new_labeled_histogram(
'backend_connection_establishment_latency',
'Time it takes to establish a backend connection.',
unit=prom.Unit.SECONDS,
labels=('tenant',),
)
backend_connection_aborted = registry.new_labeled_counter(
'backend_connections_aborted_total',
'Number of aborted backend connections.',
labels=('pgcode',)
labels=('tenant', 'pgcode')
)
backend_query_duration = registry.new_histogram(
backend_query_duration = registry.new_labeled_histogram(
'backend_query_duration',
'Time it takes to run a query on a backend connection.',
unit=prom.Unit.SECONDS,
labels=('tenant',),
)
total_client_connections = registry.new_counter(
total_client_connections = registry.new_labeled_counter(
'client_connections_total',
'Total number of clients.'
'Total number of clients.',
labels=('tenant',),
)
current_client_connections = registry.new_gauge(
current_client_connections = registry.new_labeled_gauge(
'client_connections_current',
'Current number of active clients.'
'Current number of active clients.',
labels=('tenant',),
)
idle_client_connections = registry.new_counter(
idle_client_connections = registry.new_labeled_counter(
'client_connections_idle_total',
'Total number of forcefully closed idle client connections.'
'Total number of forcefully closed idle client connections.',
labels=('tenant',),
)
edgeql_query_compilations = registry.new_labeled_counter(
@@ -95,7 +103,7 @@ edgeql_query_compilation_duration = registry.new_histogram(
background_errors = registry.new_labeled_counter(
'background_errors_total',
'Number of unhandled errors in background server routines.',
labels=('source',)
labels=('tenant', 'source')
)
ha_events_total = registry.new_labeled_counter(
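With these declarations, every call site must now pass the label values positionally after the amount (counters and gauges) or the sample (histograms), which is what the remaining hunks in this commit do. An illustrative fragment, assuming this metrics module is imported as `metrics` and using a made-up instance name:

    tenant = "my_instance"  # hypothetical; real call sites use tenant.get_instance_name()

    metrics.total_backend_connections.inc(1.0, tenant)
    metrics.current_backend_connections.inc(1.0, tenant)
    metrics.current_backend_connections.dec(1.0, tenant)
    metrics.backend_connection_establishment_latency.observe(0.025, tenant)
    metrics.backend_connection_aborted.inc(1.0, tenant, '57P01')     # tenant, pgcode
    metrics.background_errors.inc(1.0, tenant, 'example_source')     # tenant, source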


@@ -185,4 +185,5 @@ cdef class PGConnection:
cdef _rewrite_sql_error_response(self, PGMessage action, WriteBuffer buf)
cdef inline str get_tenant_label(self)
cpdef set_stmt_cache_size(self, int maxsize)


@@ -741,6 +741,12 @@ cdef class PGConnection:
# serialization conflicts.
raise error
cdef inline str get_tenant_label(self):
if self.tenant is None:
return "system"
else:
return self.tenant.get_instance_name()
cdef bint before_prepare(
self,
bytes stmt_name,
@@ -894,7 +900,9 @@ cdef class PGConnection:
while self.waiting_for_sync:
await self.wait_for_sync()
finally:
metrics.backend_query_duration.observe(time.monotonic() - started_at)
metrics.backend_query_duration.observe(
time.monotonic() - started_at, self.get_tenant_label()
)
await self.after_command()
cdef send_query_unit_group(
@@ -1394,7 +1402,9 @@ cdef class PGConnection:
dbver,
)
finally:
metrics.backend_query_duration.observe(time.monotonic() - started_at)
metrics.backend_query_duration.observe(
time.monotonic() - started_at, self.get_tenant_label()
)
await self.after_command()
async def sql_fetch(
@@ -1569,7 +1579,9 @@ cdef class PGConnection:
try:
return await self._sql_execute(sql_string)
finally:
metrics.backend_query_duration.observe(time.monotonic() - started_at)
metrics.backend_query_duration.observe(
time.monotonic() - started_at, self.get_tenant_label()
)
await self.after_command()
async def sql_apply_state(
@@ -2820,7 +2832,9 @@ cdef class PGConnection:
self.aborted_with_error = er_cls(fields=fields)
pgcode = fields['C']
metrics.backend_connection_aborted.inc(1.0, pgcode)
metrics.backend_connection_aborted.inc(
1.0, self.get_tenant_label(), pgcode
)
if pgcode in POSTGRES_SHUTDOWN_ERR_CODES:
pgreason = POSTGRES_SHUTDOWN_ERR_CODES[pgcode]
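The per-tenant timing in these hunks always follows the same try/finally shape. A hypothetical helper showing that shape in plain Python (the real code inlines it at each call site, passing get_tenant_label() as the label value):

    import time

    def observe_duration(histogram, tenant_label, run):
        # Record the elapsed time, labeled by tenant, even if `run` raises.
        started_at = time.monotonic()
        try:
            return run()
        finally:
            histogram.observe(time.monotonic() - started_at, tenant_label)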


@@ -33,7 +33,7 @@ cdef class FrontendConnection(AbstractFrontendConnection):
cdef:
str _id
object server
object tenant
readonly object tenant
object loop
str dbname
str username


@@ -266,6 +266,12 @@ cdef class FrontendConnection(AbstractFrontendConnection):
cdef _main_task_stopped_normally(self):
pass
def get_tenant_label(self):
if self.tenant is None:
return "unknown"
else:
return self.tenant.get_instance_name()
def connection_made(self, transport):
if self.tenant is None:
self._transport = transport
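Both get_tenant_label() helpers reduce to the same rule with different fallbacks: backend (pgcon) connections report "system" when no tenant is attached, while frontend connections report "unknown". A hypothetical consolidated form:

    def tenant_label(tenant, fallback):
        # fallback is "system" for backend connections, "unknown" for frontend ones.
        return fallback if tenant is None else tenant.get_instance_name()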


@@ -272,11 +272,15 @@ class BaseServer:
def on_binary_client_connected(self, conn):
self._binary_conns[conn] = True
metrics.current_client_connections.inc()
metrics.current_client_connections.inc(
1.0, conn.get_tenant_label()
)
def on_binary_client_authed(self, conn):
self._report_connections(event='opened')
metrics.total_client_connections.inc()
metrics.total_client_connections.inc(
1.0, conn.get_tenant_label()
)
def on_binary_client_after_idling(self, conn):
try:
@@ -285,12 +289,16 @@ class BaseServer:
# Shouldn't happen, but just in case some weird async twist
# gets us here we don't want to crash the connection with
# this error.
metrics.background_errors.inc(1.0, 'client_after_idling')
metrics.background_errors.inc(
1.0, conn.get_tenant_label(), 'client_after_idling'
)
def on_binary_client_disconnected(self, conn):
self._binary_conns.pop(conn, None)
self._report_connections(event="closed")
metrics.current_client_connections.dec()
metrics.current_client_connections.dec(
1.0, conn.get_tenant_label()
)
self.maybe_auto_shutdown()
def maybe_auto_shutdown(self):
@@ -401,7 +409,9 @@ class BaseServer:
for conn in self._binary_conns:
try:
if conn.is_idle(expiry_time):
metrics.idle_client_connections.inc()
metrics.idle_client_connections.inc(
1.0, conn.get_tenant_label()
)
conn.close_for_idling()
elif conn.is_alive():
# We are sorting connections in
@@ -412,10 +422,14 @@ class BaseServer:
# connections.
break
except Exception:
metrics.background_errors.inc(1.0, 'close_for_idling')
metrics.background_errors.inc(
1.0, conn.get_tenant_label(), 'close_for_idling'
)
conn.abort()
except Exception:
metrics.background_errors.inc(1.0, 'idle_clients_collector')
metrics.background_errors.inc(
1.0, 'system', 'idle_clients_collector'
)
raise
def _get_backend_runtime_params(self) -> pgparams.BackendRuntimeParams:
@@ -1424,7 +1438,9 @@ class Server(BaseServer):
self._tenant.schedule_reported_config_if_needed(setting_name)
except Exception:
metrics.background_errors.inc(1.0, 'on_system_config_set')
metrics.background_errors.inc(
1.0, self._tenant.get_instance_name(), 'on_system_config_set'
)
raise
async def _on_system_config_reset(self, setting_name):
@@ -1453,7 +1469,9 @@ class Server(BaseServer):
self._tenant.schedule_reported_config_if_needed(setting_name)
except Exception:
metrics.background_errors.inc(1.0, 'on_system_config_reset')
metrics.background_errors.inc(
1.0, self._tenant.get_instance_name(), 'on_system_config_reset'
)
raise
async def _after_system_config_add(self, setting_name, value):
@@ -1461,7 +1479,11 @@ class Server(BaseServer):
if setting_name == 'auth':
self._tenant.populate_sys_auth()
except Exception:
metrics.background_errors.inc(1.0, 'after_system_config_add')
metrics.background_errors.inc(
1.0,
self._tenant.get_instance_name(),
'after_system_config_add',
)
raise
async def _after_system_config_rem(self, setting_name, value):
@@ -1469,7 +1491,11 @@ class Server(BaseServer):
if setting_name == 'auth':
self._tenant.populate_sys_auth()
except Exception:
metrics.background_errors.inc(1.0, 'after_system_config_rem')
metrics.background_errors.inc(
1.0,
self._tenant.get_instance_name(),
'after_system_config_rem',
)
raise
async def run_startup_script_and_exit(self):
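Each of these handlers follows the same shape: attempt the work, and on any unexpected exception bump background_errors with the instance name and a short source tag before re-raising. A hypothetical context-manager equivalent of the inline blocks (assuming the labeled metrics module shown earlier is in scope):

    import contextlib

    @contextlib.contextmanager
    def count_background_errors(tenant_label, source):
        # Count-and-re-raise wrapper around a background routine body.
        try:
            yield
        except Exception:
            metrics.background_errors.inc(1.0, tenant_label, source)
            raise

A handler body could then run under, e.g., `with count_background_errors(self._tenant.get_instance_name(), 'on_system_config_set'):` instead of repeating the try/except at every site.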


@@ -441,11 +441,13 @@ class Tenant(ha_base.ClusterProtocol):
if self._server.stmt_cache_size is not None:
rv.set_stmt_cache_size(self._server.stmt_cache_size)
except Exception:
metrics.backend_connection_establishment_errors.inc()
metrics.backend_connection_establishment_errors.inc(
1.0, self._instance_name
)
raise
finally:
metrics.backend_connection_establishment_latency.observe(
time.monotonic() - started_at
time.monotonic() - started_at, self._instance_name
)
if ha_serial == self._ha_master_serial:
rv.set_tenant(self)
@@ -453,15 +455,15 @@ class Tenant(ha_base.ClusterProtocol):
self._backend_adaptive_ha.on_pgcon_made(
dbname == defines.EDGEDB_SYSTEM_DB
)
metrics.total_backend_connections.inc()
metrics.current_backend_connections.inc()
metrics.total_backend_connections.inc(1.0, self._instance_name)
metrics.current_backend_connections.inc(1.0, self._instance_name)
return rv
else:
rv.terminate()
raise ConnectionError("connected to outdated Postgres master")
async def _pg_disconnect(self, conn: pgcon.PGConnection) -> None:
metrics.current_backend_connections.dec()
metrics.current_backend_connections.dec(1.0, self._instance_name)
conn.terminate()
@contextlib.asynccontextmanager
@@ -520,7 +522,9 @@ class Tenant(ha_base.ClusterProtocol):
self.on_sys_pgcon_failover_signal()
except Exception:
metrics.background_errors.inc(
1.0, "on_sys_pgcon_parameter_status_updated"
1.0,
self._instance_name,
"on_sys_pgcon_parameter_status_updated"
)
raise
@@ -539,7 +543,9 @@ class Tenant(ha_base.ClusterProtocol):
self.on_switch_over()
# Else, the HA backend should take care of calling on_switch_over()
except Exception:
metrics.background_errors.inc(1.0, "on_sys_pgcon_failover_signal")
metrics.background_errors.inc(
1.0, self._instance_name, "on_sys_pgcon_failover_signal"
)
raise
def on_sys_pgcon_connection_lost(self, exc: Exception | None) -> None:
@@ -566,7 +572,9 @@ class Tenant(ha_base.ClusterProtocol):
)
self.on_pgcon_broken(True)
except Exception:
metrics.background_errors.inc(1.0, "on_sys_pgcon_connection_lost")
metrics.background_errors.inc(
1.0, self._instance_name, "on_sys_pgcon_connection_lost"
)
raise
async def _reconnect_sys_pgcon(self) -> None:
@@ -630,7 +638,9 @@ class Tenant(ha_base.ClusterProtocol):
if self._backend_adaptive_ha:
self._backend_adaptive_ha.on_pgcon_broken(is_sys_pgcon)
except Exception:
metrics.background_errors.inc(1.0, "on_pgcon_broken")
metrics.background_errors.inc(
1.0, self._instance_name, "on_pgcon_broken"
)
raise
def on_pgcon_lost(self) -> None:
@@ -638,7 +648,8 @@ class Tenant(ha_base.ClusterProtocol):
if self._backend_adaptive_ha:
self._backend_adaptive_ha.on_pgcon_lost()
except Exception:
metrics.background_errors.inc(1.0, "on_pgcon_lost")
metrics.background_errors.inc(
1.0, self._instance_name, "on_pgcon_lost")
raise
def set_pg_unavailable_msg(self, msg: str | None) -> None:
@@ -680,7 +691,9 @@ class Tenant(ha_base.ClusterProtocol):
try:
self._pg_pool.release(dbname, conn, discard=discard)
except Exception:
metrics.background_errors.inc(1.0, "release_pgcon")
metrics.background_errors.inc(
1.0, self._instance_name, "release_pgcon"
)
raise
def allow_database_connections(self, dbname: str) -> None:
@@ -921,7 +934,9 @@ class Tenant(ha_base.ClusterProtocol):
+ data
)
except Exception:
metrics.background_errors.inc(1.0, "load_reported_config")
metrics.background_errors.inc(
1.0, self._instance_name, "load_reported_config"
)
raise
async def _load_sys_config(
@@ -1129,7 +1144,9 @@ class Tenant(ha_base.ClusterProtocol):
self._dbindex.unregister_db(dbname)
self._block_new_connections.discard(dbname)
except Exception:
metrics.background_errors.inc(1.0, "on_after_drop_db")
metrics.background_errors.inc(
1.0, self._instance_name, "on_after_drop_db"
)
raise
async def cancel_pgcon_operation(self, con: pgcon.PGConnection) -> bool:
@@ -1180,7 +1197,9 @@ class Tenant(ha_base.ClusterProtocol):
async with self.use_sys_pgcon() as con:
await con.signal_sysevent(event, **kwargs)
except Exception:
metrics.background_errors.inc(1.0, "signal_sysevent")
metrics.background_errors.inc(
1.0, self._instance_name, "signal_sysevent"
)
raise
def on_remote_database_quarantine(self, dbname: str) -> None:
@@ -1194,7 +1213,9 @@ class Tenant(ha_base.ClusterProtocol):
try:
await self._pg_pool.prune_inactive_connections(dbname)
except Exception:
metrics.background_errors.inc(1.0, "remote_db_quarantine")
metrics.background_errors.inc(
1.0, self._instance_name, "remote_db_quarantine"
)
raise
self.create_task(task(), interruptable=True)
@@ -1209,7 +1230,9 @@ class Tenant(ha_base.ClusterProtocol):
try:
await self.introspect_db(dbname)
except Exception:
metrics.background_errors.inc(1.0, "on_remote_ddl")
metrics.background_errors.inc(
1.0, self._instance_name, "on_remote_ddl"
)
raise
self.create_task(task(), interruptable=True)
@@ -1250,7 +1273,9 @@ class Tenant(ha_base.ClusterProtocol):
await self.introspect_db(dbname)
except Exception:
metrics.background_errors.inc(
1.0, "on_remote_database_config_change"
1.0,
self._instance_name,
"on_remote_database_config_change",
)
raise
@@ -1268,7 +1293,7 @@ class Tenant(ha_base.ClusterProtocol):
await self.introspect_db(dbname)
except Exception:
metrics.background_errors.inc(
1.0, "on_local_database_config_change"
1.0, self._instance_name, "on_local_database_config_change"
)
raise
@@ -1288,7 +1313,7 @@ class Tenant(ha_base.ClusterProtocol):
self._server.reinit_idle_gc_collector()
except Exception:
metrics.background_errors.inc(
1.0, "on_remote_system_config_change"
1.0, self._instance_name, "on_remote_system_config_change"
)
raise
@@ -1302,7 +1327,9 @@ class Tenant(ha_base.ClusterProtocol):
try:
await self._reintrospect_global_schema()
except Exception:
metrics.background_errors.inc(1.0, "on_global_schema_change")
metrics.background_errors.inc(
1.0, self._instance_name, "on_global_schema_change"
)
raise
self.create_task(task(), interruptable=True)
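Taken together, the tenant-side changes report the whole backend connection lifecycle under the instance name. A condensed, hypothetical sketch of that flow (placeholder names; the real code also handles HA serial checks, adaptive HA and pool bookkeeping, and the metrics module is assumed in scope):

    import time

    async def connect_backend(instance_name, establish):
        started_at = time.monotonic()
        try:
            conn = await establish()
        except Exception:
            metrics.backend_connection_establishment_errors.inc(1.0, instance_name)
            raise
        finally:
            metrics.backend_connection_establishment_latency.observe(
                time.monotonic() - started_at, instance_name
            )
        metrics.total_backend_connections.inc(1.0, instance_name)
        metrics.current_backend_connections.inc(1.0, instance_name)
        return conn

    async def disconnect_backend(instance_name, conn):
        metrics.current_backend_connections.dec(1.0, instance_name)
        conn.terminate()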


@@ -358,3 +358,59 @@ class TestPrometheusClient(unittest.TestCase):
pmc_r = run_pmc()
emc_r = run_emc()
self.assertEqual(pmc_r, emc_r)
def test_prometheus_08(self):
def run_pmc():
registry = PMC.Registry()
test_hist = PMC.Histogram(
'test_hist', 'A test info',
labelnames=['tenant'], registry=registry)
r0 = PMC.generate(registry)
test_hist.labels('1').observe(0.22)
test_hist.labels('2').observe(0.44)
test_hist.labels('1').observe(0.66)
test_hist.labels('2').observe(0.43)
test_hist.labels('1').observe(2.0)
r1 = PMC.generate(registry)
test_hist.labels('2').observe(-1)
test_hist.labels('1').observe(0.0001)
test_hist.labels('2').observe(0.43)
r2 = PMC.generate(registry)
return [r0, r1, r2]
def run_emc():
r = EP.Registry()
test_hist = r.new_labeled_histogram(
'test_hist', 'A test info', labels=('tenant',)
)
r0 = r.generate()
test_hist.observe(0.22, '1')
test_hist.observe(0.44, '2')
test_hist.observe(0.66, '1')
test_hist.observe(0.43, '2')
test_hist.observe(2.0, '1')
r1 = r.generate()
test_hist.observe(-1, '2')
test_hist.observe(0.0001, '1')
test_hist.observe(0.43, '2')
r2 = r.generate()
return [r0, r1, r2]
pmc_r = run_pmc()
emc_r = run_emc()
self.assertEqual(pmc_r, emc_r)
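For reference, the text both clients are expected to produce for a labeled histogram looks roughly like this for a single observation of 0.22 under tenant '1' (abridged; derived from the format strings in the LabeledHistogram._generate() hunk above, with most bucket lines omitted and the timestamp left as a placeholder):

    # HELP test_hist A test info
    # TYPE test_hist histogram
    ...
    test_hist_bucket{le="0.25",tenant="1"} 1.0
    ...
    test_hist_bucket{le="+Inf",tenant="1"} 1.0
    test_hist_count{tenant="1"} 1.0
    test_hist_sum{tenant="1"} 0.22
    # HELP test_hist_created A test info
    # TYPE test_hist_created gauge
    test_hist_created{tenant="1"} <unix timestamp>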


@@ -1469,10 +1469,10 @@ class TestSeparateCluster(tb.TestCase):
*(con.aclose() for con in active_cons)
)
self.assertIn(
f'\nedgedb_server_client_connections_idle_total ' +
f'{float(len(idle_cons))}\n',
metrics
self.assertRegex(
metrics,
r'\nedgedb_server_client_connections_idle_total\{.*\} ' +
f'{float(len(idle_cons))}\\n',
)
@unittest.skipIf(
@@ -1808,10 +1808,10 @@ class TestSeparateCluster(tb.TestCase):
data = sd.fetch_metrics()
# Postgres: ERROR_IDLE_IN_TRANSACTION_TIMEOUT=25P03
self.assertIn(
'\nedgedb_server_backend_connections_aborted_total' +
'{pgcode="25P03"} 1.0\n',
data
self.assertRegex(
data,
r'\nedgedb_server_backend_connections_aborted_total' +
r'\{.*pgcode="25P03"\} 1.0\n',
)
async def test_server_config_query_timeout(self):
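The two assertions above switch from assertIn to assertRegex because these metric lines now carry a label set; a matching line in the scraped output would look something like the following (instance name made up for illustration):

    edgedb_server_client_connections_idle_total{tenant="my_instance"} 2.0
    edgedb_server_backend_connections_aborted_total{tenant="my_instance",pgcode="25P03"} 1.0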