Module glean.metrics.timing_distribution

Classes

class TimingDistributionMetricType (common_metric_data: glean._uniffi.glean.CommonMetricData,
time_unit: glean._uniffi.glean.TimeUnit)
Expand source code
class TimingDistributionMetricType:
    """
    Developer-facing API for recording timing distribution metrics.

    Instances of this class are generated automatically by
    `glean.load_metrics`, so that developers can record values for metrics
    that were previously registered in the metrics.yaml file.
    """

    def __init__(
        self,
        common_metric_data: CommonMetricData,
        time_unit: TimeUnit,
    ):
        # Every public method below delegates to this underlying metric object.
        inner = TimingDistributionMetric(common_metric_data, time_unit)
        self._inner = inner

    def start(self) -> TimerId:
        """
        Begin tracking time for this metric.
        Several timers may be running at the same time.

        Returns:
            timer_id: The object to associate with this timing.
        """
        timer_id = self._inner.start()
        return timer_id

    def stop_and_accumulate(self, timer_id: TimerId) -> None:
        """
        Stop tracking time for this metric and the given timer id, adding a
        count to the corresponding bucket in the timing distribution.
        This will record an error if no `start` was called.

        Args:
            timer_id: The timer id associated with this timing. This allows for
                concurrent timing of events associated with different ids to
                the same timing distribution metric.
        """
        metric = self._inner
        metric.stop_and_accumulate(timer_id)

    def cancel(self, timer_id: TimerId) -> None:
        """
        Abort a previous `start` call. No error is recorded if no `start` was
        called.

        Args:
            timer_id: The timer id associated with this timing. This allows for
                concurrent timing of events associated with different ids to
                the same timing distribution metric.
        """
        metric = self._inner
        metric.cancel(timer_id)

    def accumulate_samples(self, samples: list) -> None:
        """
        Accumulate the provided samples in the metric.

        Args:
            samples: The list holding the samples to be recorded by the metric.
        """
        metric = self._inner
        metric.accumulate_samples(samples)

    def accumulate_single_sample(self, sample: int) -> None:
        """
        Accumulate a single sample and append it to the metric.

        Args:
            sample: The single sample to be recorded by the metric.
        """
        metric = self._inner
        metric.accumulate_single_sample(sample)

    class _TimingDistributionContextManager:
        """
        Context manager that records a timing. It is what `measure` returns.
        """

        def __init__(self, timing_distribution: "TimingDistributionMetricType"):
            self._timing_distribution = timing_distribution

        def __enter__(self) -> None:
            # Keep the timer id so that `__exit__` can stop or cancel it.
            metric = self._timing_distribution
            self._timer_id = metric.start()

        def __exit__(self, type, value, tb) -> None:
            metric = self._timing_distribution
            if tb is not None:
                # A traceback was passed in, so the `with` body raised:
                # discard the timing instead of recording it.
                metric.cancel(self._timer_id)
                return
            metric.stop_and_accumulate(self._timer_id)

    def measure(self) -> "_TimingDistributionContextManager":
        """
        Return a context manager that measures how long the body of a `with`
        statement takes to execute.

        If the contents of the `with` statement raise an exception, the timing
        is not recorded.

        Usage:
            with metrics.perf.timer.measure():
                # ... do something that takes time ...
        """
        manager_cls = self._TimingDistributionContextManager
        return manager_cls(self)

    def test_get_value(self, ping_name: Optional[str] = None) -> Optional[DistributionData]:
        """
        Return the stored value, for testing purposes only.

        Args:
            ping_name (str): (default: first value in send_in_pings) The name
                of the ping to retrieve the metric for.

        Returns:
            value (DistributionData): value of the stored metric.
        """
        stored = self._inner.test_get_value(ping_name)
        return stored

    def test_get_num_recorded_errors(self, error_type: ErrorType) -> int:
        """
        Return the number of errors recorded for the given metric.

        Args:
            error_type (ErrorType): The type of error recorded.

        Returns:
            num_errors (int): The number of errors recorded for the metric for
                the given error type.
        """
        num_errors = self._inner.test_get_num_recorded_errors(error_type)
        return num_errors

This implements the developer facing API for recording timing distribution metrics.

Instances of this class type are automatically generated by load_metrics(), allowing developers to record values that were previously registered in the metrics.yaml file.

Methods

def accumulate_samples(self, samples: list) ‑> None
Expand source code
def accumulate_samples(self, samples: list) -> None:
    """
    Accumulate the provided samples in the metric.

    Args:
        samples: The list holding the samples to be recorded by the metric.
    """
    metric = self._inner
    metric.accumulate_samples(samples)

Accumulates the provided samples in the metric.

Args

samples
The list holding the samples to be recorded by the metric.
def accumulate_single_sample(self, sample: int) ‑> None
Expand source code
def accumulate_single_sample(self, sample: int) -> None:
    """
    Accumulate a single sample and append it to the metric.

    Args:
        sample: The single sample to be recorded by the metric.
    """
    metric = self._inner
    metric.accumulate_single_sample(sample)

Accumulates a single sample and appends it to the metric.

Args

sample
The single sample to be recorded by the metric.
def cancel(self, timer_id: glean._uniffi.glean.TimerId) ‑> None
Expand source code
def cancel(self, timer_id: TimerId) -> None:
    """
    Abort a previous `start` call. No error is recorded if no `start` was
    called.

    Args:
        timer_id: The timer id associated with this timing. This allows for
            concurrent timing of events associated with different ids to
            the same timing distribution metric.
    """
    metric = self._inner
    metric.cancel(timer_id)

Abort a previous start call. No error is recorded if no start was called.

Args

timer_id
The timer id associated with this timing. This allows for concurrent timing of events associated with different ids to the same timing distribution metric.
def measure(self) ‑> _TimingDistributionContextManager
Expand source code
def measure(self) -> "_TimingDistributionContextManager":
    """
    Return a context manager that measures how long the body of a `with`
    statement takes to execute.

    If the contents of the `with` statement raise an exception, the timing
    is not recorded.

    Usage:
        with metrics.perf.timer.measure():
            # ... do something that takes time ...
    """
    manager_cls = self._TimingDistributionContextManager
    return manager_cls(self)

Provides a context manager for measuring the time it takes to execute snippets of code in a with statement.

If the contents of the with statement raise an exception, the timing is not recorded.

Usage

with metrics.perf.timer.measure():
    # ... do something that takes time ...

def start(self) ‑> glean._uniffi.glean.TimerId
Expand source code
def start(self) -> TimerId:
    """
    Begin tracking time for this metric.
    Several timers may be running at the same time.

    Returns:
        timer_id: The object to associate with this timing.
    """
    timer_id = self._inner.start()
    return timer_id

Start tracking time for the provided metric. Multiple timers can run simultaneously.

Returns

timer_id
The object to associate with this timing.
def stop_and_accumulate(self, timer_id: glean._uniffi.glean.TimerId) ‑> None
Expand source code
def stop_and_accumulate(self, timer_id: TimerId) -> None:
    """
    Stop tracking time for this metric and the given timer id, adding a
    count to the corresponding bucket in the timing distribution.
    This will record an error if no `start` was called.

    Args:
        timer_id: The timer id associated with this timing. This allows for
            concurrent timing of events associated with different ids to
            the same timing distribution metric.
    """
    metric = self._inner
    metric.stop_and_accumulate(timer_id)

Stop tracking time for the provided metric and associated timer id. Add a count to the corresponding bucket in the timing distribution. This will record an error if no start was called.

Args

timer_id
The timer id associated with this timing. This allows for concurrent timing of events associated with different ids to the same timing distribution metric.
def test_get_num_recorded_errors(self, error_type: glean._uniffi.glean.ErrorType) ‑> int
Expand source code
def test_get_num_recorded_errors(self, error_type: ErrorType) -> int:
    """
    Return the number of errors recorded for the given metric.

    Args:
        error_type (ErrorType): The type of error recorded.

    Returns:
        num_errors (int): The number of errors recorded for the metric for
            the given error type.
    """
    num_errors = self._inner.test_get_num_recorded_errors(error_type)
    return num_errors

Returns the number of errors recorded for the given metric.

Args

error_type : ErrorType
The type of error recorded.
ping_name : str
(default: first value in send_in_pings) The name of the ping to retrieve the metric for.

Returns

num_errors (int): The number of errors recorded for the metric for the given error type.

def test_get_value(self, ping_name: str | None = None) ‑> glean._uniffi.glean.DistributionData | None
Expand source code
def test_get_value(self, ping_name: Optional[str] = None) -> Optional[DistributionData]:
    """
    Return the stored value, for testing purposes only.

    Args:
        ping_name (str): (default: first value in send_in_pings) The name
            of the ping to retrieve the metric for.

    Returns:
        value (DistributionData): value of the stored metric.
    """
    stored = self._inner.test_get_value(ping_name)
    return stored

Returns the stored value for testing purposes only.

Args

ping_name : str
(default: first value in send_in_pings) The name of the ping to retrieve the metric for.

Returns

value (DistributionData): value of the stored metric.