Module glean.metrics.timing_distribution

Expand source code
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


from typing import Optional


from .._uniffi import CommonMetricData
from .._uniffi import DistributionData
from .._uniffi import TimeUnit
from .._uniffi import TimerId
from .._uniffi import TimingDistributionMetric
from ..testing import ErrorType


class TimingDistributionMetricType:
    """
    This implements the developer facing API for recording timing distribution
    metrics.

    Instances of this class type are automatically generated by
    `glean.load_metrics`, allowing developers to record values that were
    previously registered in the metrics.yaml file.
    """

    def __init__(
        self,
        common_metric_data: CommonMetricData,
        time_unit: TimeUnit,
    ):
        self._inner = TimingDistributionMetric(common_metric_data, time_unit)

    def start(self) -> TimerId:
        """
        Start tracking time for the provided metric.
        Multiple timers can run simultaneously.

        Returns:
            timer_id: The object to associate with this timing.
        """
        return self._inner.start()

    def stop_and_accumulate(self, timer_id: TimerId) -> None:
        """
        Stop tracking time for the provided metric and associated timer id. Add a
        count to the corresponding bucket in the timing distribution.
        This will record an error if no `start` was called.

        Args:
            timer_id: The timer id associated with this timing. This allows for
                concurrent timing of events associated with different ids to
                the same timespan metric.
        """
        self._inner.stop_and_accumulate(timer_id)

    def cancel(self, timer_id: TimerId) -> None:
        """
        Abort a previous `start` call. No error is recorded if no `start` was called.

        Args:
            timer_id: The timer id associated with this timing. This allows for
                concurrent timing of events associated with different ids to
                the same timing distribution metric.
        """
        self._inner.cancel(timer_id)

    def accumulate_samples(self, samples: list) -> None:
        """
        Accumulates the provided samples in the metric.

        Args:
            samples: The list holding the samples to be recorded by the metric.
        """
        self._inner.accumulate_samples(samples)

    def accumulate_single_sample(self, sample: int) -> None:
        """
        Accumulates a single sample and appends it to the metric.

        Args:
            sample: The single sample to be recorded by the metric.
        """
        self._inner.accumulate_single_sample(sample)

    class _TimingDistributionContextManager:
        """
        A context manager for recording timings. Used by the `measure` method.
        """

        def __init__(self, timing_distribution: "TimingDistributionMetricType"):
            self._timing_distribution = timing_distribution

        def __enter__(self) -> None:
            self._timer_id = self._timing_distribution.start()

        def __exit__(self, type, value, tb) -> None:
            if tb is None:
                self._timing_distribution.stop_and_accumulate(self._timer_id)
            else:
                self._timing_distribution.cancel(self._timer_id)

    def measure(self) -> "_TimingDistributionContextManager":
        """
        Provides a context manager for measuring the time it takes to execute
        snippets of code in a `with` statement.

        If the contents of the `with` statement raise an exception, the timing
        is not recorded.

        Usage:
            with metrics.perf.timer.measure():
                # ... do something that takes time ...
        """
        return self._TimingDistributionContextManager(self)

    def test_get_value(self, ping_name: Optional[str] = None) -> Optional[DistributionData]:
        """
        Returns the stored value for testing purposes only.

        Args:
            ping_name (str): (default: first value in send_in_pings) The name
                of the ping to retrieve the metric for.

        Returns:
            value (DistriubutionData): value of the stored metric.
        """
        return self._inner.test_get_value(ping_name)

    def test_get_num_recorded_errors(self, error_type: ErrorType) -> int:
        """
        Returns the number of errors recorded for the given metric.

        Args:
            error_type (ErrorType): The type of error recorded.
            ping_name (str): (default: first value in send_in_pings) The name
                of the ping to retrieve the metric for.

        Returns:
            num_errors (int): The number of errors recorded for the metric for
                the given error type.
        """
        return self._inner.test_get_num_recorded_errors(error_type)


# Only the metric type itself is exported by `from ... import *`.
__all__ = ["TimingDistributionMetricType"]

Classes

class TimingDistributionMetricType (common_metric_data: glean._uniffi.glean.CommonMetricData, time_unit: glean._uniffi.glean.TimeUnit)

This implements the developer facing API for recording timing distribution metrics.

Instances of this class type are automatically generated by load_metrics(), allowing developers to record values that were previously registered in the metrics.yaml file.

Expand source code
class TimingDistributionMetricType:
    """
    This implements the developer facing API for recording timing distribution
    metrics.

    Instances of this class type are automatically generated by
    `glean.load_metrics`, allowing developers to record values that were
    previously registered in the metrics.yaml file.
    """

    def __init__(
        self,
        common_metric_data: CommonMetricData,
        time_unit: TimeUnit,
    ):
        """
        Create the underlying timing distribution metric.

        Args:
            common_metric_data: Passed through unchanged to the wrapped
                `TimingDistributionMetric`.
            time_unit: Passed through unchanged to the wrapped
                `TimingDistributionMetric`.
        """
        # Every public method below delegates to this wrapped object.
        self._inner = TimingDistributionMetric(common_metric_data, time_unit)

    def start(self) -> TimerId:
        """
        Start tracking time for the provided metric.
        Multiple timers can run simultaneously.

        Returns:
            timer_id: The object to associate with this timing.
        """
        return self._inner.start()

    def stop_and_accumulate(self, timer_id: TimerId) -> None:
        """
        Stop tracking time for the provided metric and associated timer id. Add a
        count to the corresponding bucket in the timing distribution.
        This will record an error if no `start` was called.

        Args:
            timer_id: The timer id associated with this timing. This allows for
                concurrent timing of events associated with different ids to
                the same timing distribution metric.
        """
        self._inner.stop_and_accumulate(timer_id)

    def cancel(self, timer_id: TimerId) -> None:
        """
        Abort a previous `start` call. No error is recorded if no `start` was called.

        Args:
            timer_id: The timer id associated with this timing. This allows for
                concurrent timing of events associated with different ids to
                the same timing distribution metric.
        """
        self._inner.cancel(timer_id)

    def accumulate_samples(self, samples: list) -> None:
        """
        Accumulates the provided samples in the metric.

        Args:
            samples: The list holding the samples to be recorded by the metric.
        """
        self._inner.accumulate_samples(samples)

    def accumulate_single_sample(self, sample: int) -> None:
        """
        Accumulates a single sample and appends it to the metric.

        Args:
            sample: The single sample to be recorded by the metric.
        """
        self._inner.accumulate_single_sample(sample)

    class _TimingDistributionContextManager:
        """
        A context manager for recording timings. Used by the `measure` method.
        """

        def __init__(self, timing_distribution: "TimingDistributionMetricType"):
            """
            Args:
                timing_distribution: The metric whose `start`,
                    `stop_and_accumulate` and `cancel` methods are driven by
                    this context manager.
            """
            self._timing_distribution = timing_distribution

        def __enter__(self) -> None:
            """
            Start a timer on the metric and remember its id for `__exit__`.

            Nothing is returned, so `with ... as x` binds `x` to `None`.
            """
            self._timer_id = self._timing_distribution.start()

        def __exit__(self, exc_type, exc_value, tb) -> None:
            """
            Finish the timing started in `__enter__`.

            The timing is accumulated when no traceback was passed in (the
            `with` body finished normally) and cancelled otherwise. The method
            returns `None`, so an exception raised in the body keeps
            propagating.
            """
            if tb is None:
                self._timing_distribution.stop_and_accumulate(self._timer_id)
            else:
                self._timing_distribution.cancel(self._timer_id)

    # The return annotation is fully qualified so that it can be resolved from
    # module scope (e.g. by `typing.get_type_hints`); the bare nested-class
    # name is only visible inside the class body.
    def measure(self) -> "TimingDistributionMetricType._TimingDistributionContextManager":
        """
        Provides a context manager for measuring the time it takes to execute
        snippets of code in a `with` statement.

        If the contents of the `with` statement raise an exception, the timing
        is not recorded.

        Usage:
            with metrics.perf.timer.measure():
                # ... do something that takes time ...
        """
        return self._TimingDistributionContextManager(self)

    def test_get_value(self, ping_name: Optional[str] = None) -> Optional[DistributionData]:
        """
        Returns the stored value for testing purposes only.

        Args:
            ping_name (str): (default: first value in send_in_pings) The name
                of the ping to retrieve the metric for.

        Returns:
            value (DistributionData): value of the stored metric. May be
                `None`, as the return annotation indicates.
        """
        return self._inner.test_get_value(ping_name)

    def test_get_num_recorded_errors(self, error_type: ErrorType) -> int:
        """
        Returns the number of errors recorded for the given metric.

        Args:
            error_type (ErrorType): The type of error recorded.

        Returns:
            num_errors (int): The number of errors recorded for the metric for
                the given error type.
        """
        return self._inner.test_get_num_recorded_errors(error_type)

Methods

def accumulate_samples(self, samples: list) ‑> None

Accumulates the provided samples in the metric.

Args

samples
The list holding the samples to be recorded by the metric.
Expand source code
def accumulate_samples(self, samples: list) -> None:
    """
    Record every sample in `samples` on this metric.

    Args:
        samples: The list of samples handed to the underlying metric.
    """
    record_all = self._inner.accumulate_samples
    record_all(samples)
def accumulate_single_sample(self, sample: int) ‑> None

Accumulates a single sample and appends it to the metric.

Args

sample
The single sample to be recorded by the metric.
Expand source code
def accumulate_single_sample(self, sample: int) -> None:
    """
    Record one sample on this metric.

    Args:
        sample: The single sample handed to the underlying metric.
    """
    record_one = self._inner.accumulate_single_sample
    record_one(sample)
def cancel(self, timer_id: glean._uniffi.glean.TimerId) ‑> None

Abort a previous start call. No error is recorded if no start was called.

Args

timer_id
The timer id associated with this timing. This allows for concurrent timing of events associated with different ids to the same timing distribution metric.
Expand source code
def cancel(self, timer_id: TimerId) -> None:
    """
    Abandon the timing begun by an earlier `start` call.

    No error is recorded if no `start` was called.

    Args:
        timer_id: The timer id associated with this timing. Distinct ids
            allow several events to be timed concurrently against the same
            timing distribution metric.
    """
    abort = self._inner.cancel
    abort(timer_id)
def measure(self) ‑> _TimingDistributionContextManager

Provides a context manager for measuring the time it takes to execute snippets of code in a with statement.

If the contents of the with statement raise an exception, the timing is not recorded.

Usage

with metrics.perf.timer.measure():
    # ... do something that takes time ...

Expand source code
# The return annotation is fully qualified so that it can be resolved from
# module scope (e.g. by `typing.get_type_hints`); the bare nested-class name
# is only visible inside the class body.
def measure(self) -> "TimingDistributionMetricType._TimingDistributionContextManager":
    """
    Provides a context manager for measuring the time it takes to execute
    snippets of code in a `with` statement.

    If the contents of the `with` statement raise an exception, the timing
    is not recorded.

    Usage:
        with metrics.perf.timer.measure():
            # ... do something that takes time ...
    """
    return self._TimingDistributionContextManager(self)
def start(self) ‑> glean._uniffi.glean.TimerId

Start tracking time for the provided metric. Multiple timers can run simultaneously.

Returns

timer_id
The object to associate with this timing.
Expand source code
def start(self) -> TimerId:
    """
    Begin timing for this metric.

    Multiple timers can run simultaneously.

    Returns:
        timer_id: The object to associate with this timing.
    """
    timer_id = self._inner.start()
    return timer_id
def stop_and_accumulate(self, timer_id: glean._uniffi.glean.TimerId) ‑> None

Stop tracking time for the provided metric and associated timer id. Add a count to the corresponding bucket in the timing distribution. This will record an error if no start was called.

Args

timer_id
The timer id associated with this timing. This allows for concurrent timing of events associated with different ids to the same timing distribution metric.
Expand source code
def stop_and_accumulate(self, timer_id: TimerId) -> None:
    """
    End the timing identified by `timer_id` and add a count to the
    corresponding bucket in the timing distribution.

    This will record an error if no `start` was called.

    Args:
        timer_id: The timer id associated with this timing. Distinct ids
            allow several events to be timed concurrently against the same
            timing distribution metric.
    """
    finish = self._inner.stop_and_accumulate
    finish(timer_id)
def test_get_num_recorded_errors(self, error_type: glean._uniffi.glean.ErrorType) ‑> int

Returns the number of errors recorded for the given metric.

Args

error_type : ErrorType
The type of error recorded.
ping_name : str
(default: first value in send_in_pings) The name of the ping to retrieve the metric for.

Returns

num_errors (int): The number of errors recorded for the metric for the given error type.

Expand source code
def test_get_num_recorded_errors(self, error_type: ErrorType) -> int:
    """
    Returns the number of errors recorded for the given metric.

    Args:
        error_type (ErrorType): The type of error recorded.
        ping_name (str): (default: first value in send_in_pings) The name
            of the ping to retrieve the metric for.

    Returns:
        num_errors (int): The number of errors recorded for the metric for
            the given error type.
    """
    return self._inner.test_get_num_recorded_errors(error_type)
def test_get_value(self, ping_name: Optional[str] = None) ‑> Optional[glean._uniffi.glean.DistributionData]

Returns the stored value for testing purposes only.

Args

ping_name : str
(default: first value in send_in_pings) The name of the ping to retrieve the metric for.

Returns

value (DistributionData): value of the stored metric.

Expand source code
def test_get_value(self, ping_name: Optional[str] = None) -> Optional[DistributionData]:
    """
    Returns the stored value for testing purposes only.

    Args:
        ping_name (str): (default: first value in send_in_pings) The name
            of the ping to retrieve the metric for.

    Returns:
        value (DistriubutionData): value of the stored metric.
    """
    return self._inner.test_get_value(ping_name)