Module glean.metrics.timing_distribution
Expand source code
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from typing import Optional
from .._uniffi import CommonMetricData
from .._uniffi import DistributionData
from .._uniffi import TimeUnit
from .._uniffi import TimerId
from .._uniffi import TimingDistributionMetric
from ..testing import ErrorType
class TimingDistributionMetricType:
    """
    This implements the developer facing API for recording timing distribution
    metrics.

    Instances of this class type are automatically generated by
    `glean.load_metrics`, allowing developers to record values that were
    previously registered in the metrics.yaml file.
    """

    def __init__(
        self,
        common_metric_data: CommonMetricData,
        time_unit: TimeUnit,
    ):
        """
        Create the metric by wrapping a `TimingDistributionMetric`.

        Args:
            common_metric_data (CommonMetricData): Passed through unchanged to
                the wrapped metric.
            time_unit (TimeUnit): Passed through unchanged to the wrapped
                metric.
        """
        # Every public method below delegates to this wrapped object.
        self._inner = TimingDistributionMetric(common_metric_data, time_unit)

    def start(self) -> TimerId:
        """
        Start tracking time for the provided metric.
        Multiple timers can run simultaneously.

        Returns:
            timer_id: The object to associate with this timing.
        """
        return self._inner.start()

    def stop_and_accumulate(self, timer_id: TimerId) -> None:
        """
        Stop tracking time for the provided metric and associated timer id. Add a
        count to the corresponding bucket in the timing distribution.
        This will record an error if no `start` was called.

        Args:
            timer_id: The timer id associated with this timing. This allows for
                concurrent timing of events associated with different ids to
                the same timing distribution metric.
        """
        self._inner.stop_and_accumulate(timer_id)

    def cancel(self, timer_id: TimerId) -> None:
        """
        Abort a previous `start` call. No error is recorded if no `start` was called.

        Args:
            timer_id: The timer id associated with this timing. This allows for
                concurrent timing of events associated with different ids to
                the same timing distribution metric.
        """
        self._inner.cancel(timer_id)

    class _TimingDistributionContextManager:
        """
        A context manager for recording timings. Used by the `measure` method.
        """

        def __init__(self, timing_distribution: "TimingDistributionMetricType"):
            self._timing_distribution = timing_distribution

        def __enter__(self) -> None:
            # The timer id is kept privately so `__exit__` can finish or cancel
            # the same timer; nothing is bound by `with ... as name`.
            self._timer_id = self._timing_distribution.start()

        def __exit__(self, exc_type, exc_value, traceback) -> None:
            # The with statement passes None for all three arguments when the
            # body finished without raising.
            if exc_type is None:
                self._timing_distribution.stop_and_accumulate(self._timer_id)
            else:
                # The body raised: drop the timing. Returning None lets the
                # exception propagate to the caller.
                self._timing_distribution.cancel(self._timer_id)

    def measure(self) -> "_TimingDistributionContextManager":
        """
        Provides a context manager for measuring the time it takes to execute
        snippets of code in a `with` statement.

        If the contents of the `with` statement raise an exception, the timing
        is not recorded.

        Usage:
            with metrics.perf.timer.measure():
                # ... do something that takes time ...
        """
        return self._TimingDistributionContextManager(self)

    def test_get_value(
        self, ping_name: Optional[str] = None
    ) -> Optional[DistributionData]:
        """
        Returns the stored value for testing purposes only.

        Args:
            ping_name (Optional[str]): (default: first value in send_in_pings)
                The name of the ping to retrieve the metric for.

        Returns:
            value (Optional[DistributionData]): value of the stored metric.
        """
        return self._inner.test_get_value(ping_name)

    def test_get_num_recorded_errors(self, error_type: ErrorType) -> int:
        """
        Returns the number of errors recorded for the given metric.

        Args:
            error_type (ErrorType): The type of error recorded.

        Returns:
            num_errors (int): The number of errors recorded for the metric for
                the given error type.
        """
        return self._inner.test_get_num_recorded_errors(error_type)
# Names exported by a star-import of this module; only the metric type is listed.
__all__ = ["TimingDistributionMetricType"]
Classes
class TimingDistributionMetricType (common_metric_data: glean._uniffi.glean.CommonMetricData, time_unit: glean._uniffi.glean.TimeUnit)
-
This implements the developer-facing API for recording timing distribution metrics.
Instances of this class type are automatically generated by `load_metrics()`, allowing developers to record values that were previously registered in the metrics.yaml file.
Expand source code
class TimingDistributionMetricType:
    """
    This implements the developer facing API for recording timing distribution
    metrics.

    Instances of this class type are automatically generated by
    `glean.load_metrics`, allowing developers to record values that were
    previously registered in the metrics.yaml file.
    """

    def __init__(
        self,
        common_metric_data: CommonMetricData,
        time_unit: TimeUnit,
    ):
        """
        Create the metric by wrapping a `TimingDistributionMetric`.

        Args:
            common_metric_data (CommonMetricData): Passed through unchanged to
                the wrapped metric.
            time_unit (TimeUnit): Passed through unchanged to the wrapped
                metric.
        """
        # Every public method below delegates to this wrapped object.
        self._inner = TimingDistributionMetric(common_metric_data, time_unit)

    def start(self) -> TimerId:
        """
        Start tracking time for the provided metric.
        Multiple timers can run simultaneously.

        Returns:
            timer_id: The object to associate with this timing.
        """
        return self._inner.start()

    def stop_and_accumulate(self, timer_id: TimerId) -> None:
        """
        Stop tracking time for the provided metric and associated timer id. Add a
        count to the corresponding bucket in the timing distribution.
        This will record an error if no `start` was called.

        Args:
            timer_id: The timer id associated with this timing. This allows for
                concurrent timing of events associated with different ids to
                the same timing distribution metric.
        """
        self._inner.stop_and_accumulate(timer_id)

    def cancel(self, timer_id: TimerId) -> None:
        """
        Abort a previous `start` call. No error is recorded if no `start` was called.

        Args:
            timer_id: The timer id associated with this timing. This allows for
                concurrent timing of events associated with different ids to
                the same timing distribution metric.
        """
        self._inner.cancel(timer_id)

    class _TimingDistributionContextManager:
        """
        A context manager for recording timings. Used by the `measure` method.
        """

        def __init__(self, timing_distribution: "TimingDistributionMetricType"):
            self._timing_distribution = timing_distribution

        def __enter__(self) -> None:
            # The timer id is kept privately so `__exit__` can finish or cancel
            # the same timer; nothing is bound by `with ... as name`.
            self._timer_id = self._timing_distribution.start()

        def __exit__(self, exc_type, exc_value, traceback) -> None:
            # The with statement passes None for all three arguments when the
            # body finished without raising.
            if exc_type is None:
                self._timing_distribution.stop_and_accumulate(self._timer_id)
            else:
                # The body raised: drop the timing. Returning None lets the
                # exception propagate to the caller.
                self._timing_distribution.cancel(self._timer_id)

    def measure(self) -> "_TimingDistributionContextManager":
        """
        Provides a context manager for measuring the time it takes to execute
        snippets of code in a `with` statement.

        If the contents of the `with` statement raise an exception, the timing
        is not recorded.

        Usage:
            with metrics.perf.timer.measure():
                # ... do something that takes time ...
        """
        return self._TimingDistributionContextManager(self)

    def test_get_value(
        self, ping_name: Optional[str] = None
    ) -> Optional[DistributionData]:
        """
        Returns the stored value for testing purposes only.

        Args:
            ping_name (Optional[str]): (default: first value in send_in_pings)
                The name of the ping to retrieve the metric for.

        Returns:
            value (Optional[DistributionData]): value of the stored metric.
        """
        return self._inner.test_get_value(ping_name)

    def test_get_num_recorded_errors(self, error_type: ErrorType) -> int:
        """
        Returns the number of errors recorded for the given metric.

        Args:
            error_type (ErrorType): The type of error recorded.

        Returns:
            num_errors (int): The number of errors recorded for the metric for
                the given error type.
        """
        return self._inner.test_get_num_recorded_errors(error_type)
Methods
def cancel(self, timer_id: glean._uniffi.glean.TimerId) ‑> None
-
Abort a previous `start` call. No error is recorded if no `start` was called.
Args
timer_id
- The timer id associated with this timing. This allows for concurrent timing of events associated with different ids to the same timing distribution metric.
Expand source code
def cancel(self, timer_id: TimerId) -> None:
    """
    Abort the timing begun by an earlier `start` call.

    No error is recorded if no `start` was called.

    Args:
        timer_id: The timer id associated with this timing. Distinct ids
            allow concurrent timings of different events against the same
            timing distribution metric.
    """
    wrapped_metric = self._inner
    wrapped_metric.cancel(timer_id)
def measure(self) ‑> _TimingDistributionContextManager
-
Provides a context manager for measuring the time it takes to execute snippets of code in a `with` statement.
If the contents of the `with` statement raise an exception, the timing is not recorded.
Usage
with metrics.perf.timer.measure():
    # ... do something that takes time ...
Expand source code
def measure(self) -> "_TimingDistributionContextManager":
    """
    Build a context manager that times the body of a `with` statement.

    The timing is not recorded when the body of the `with` statement raises
    an exception.

    Usage:
        with metrics.perf.timer.measure():
            # ... do something that takes time ...
    """
    manager_type = self._TimingDistributionContextManager
    return manager_type(self)
def start(self) ‑> glean._uniffi.glean.TimerId
-
Start tracking time for the provided metric. Multiple timers can run simultaneously.
Returns
timer_id
- The object to associate with this timing.
Expand source code
def start(self) -> TimerId:
    """
    Begin timing for this metric.

    Several timers may be running at the same time.

    Returns:
        timer_id: The object to associate with this timing.
    """
    timer_id = self._inner.start()
    return timer_id
def stop_and_accumulate(self, timer_id: glean._uniffi.glean.TimerId) ‑> None
-
Stop tracking time for the provided metric and associated timer id. Add a count to the corresponding bucket in the timing distribution. This will record an error if no `start` was called.
Args
timer_id
- The timer id associated with this timing. This allows for concurrent timing of events associated with different ids to the same timing distribution metric.
Expand source code
def stop_and_accumulate(self, timer_id: TimerId) -> None:
    """
    Stop tracking time for the provided metric and associated timer id. Add a
    count to the corresponding bucket in the timing distribution.
    This will record an error if no `start` was called.

    Args:
        timer_id: The timer id associated with this timing. This allows for
            concurrent timing of events associated with different ids to
            the same timing distribution metric.
    """
    self._inner.stop_and_accumulate(timer_id)
def test_get_num_recorded_errors(self, error_type: glean._uniffi.glean.ErrorType) ‑> int
-
Returns the number of errors recorded for the given metric.
Args
error_type : ErrorType
- The type of error recorded.
ping_name : str
- (default: first value in send_in_pings) The name of the ping to retrieve the metric for. Note: the method signature shown above does not accept a `ping_name` argument.
Returns
num_errors (int): The number of errors recorded for the metric for the given error type.
Expand source code
def test_get_num_recorded_errors(self, error_type: ErrorType) -> int: """ Returns the number of errors recorded for the given metric. Args: error_type (ErrorType): The type of error recorded. ping_name (str): (default: first value in send_in_pings) The name of the ping to retrieve the metric for. Returns: num_errors (int): The number of errors recorded for the metric for the given error type. """ return self._inner.test_get_num_recorded_errors(error_type)
def test_get_value(self, ping_name: Optional[str] = None) ‑> Optional[glean._uniffi.glean.DistributionData]
-
Returns the stored value for testing purposes only.
Args
ping_name : str
- (default: first value in send_in_pings) The name of the ping to retrieve the metric for.
Returns
value (DistributionData): value of the stored metric.
Expand source code
def test_get_value( self, ping_name: Optional[str] = None ) -> Optional[DistributionData]: """ Returns the stored value for testing purposes only. Args: ping_name (str): (default: first value in send_in_pings) The name of the ping to retrieve the metric for. Returns: value (DistriubutionData): value of the stored metric. """ return self._inner.test_get_value(ping_name)