Module glean.metrics.timing_distribution

Expand source code
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


from typing import List, Optional


from .. import _ffi
from .._dispatcher import Dispatcher
from ..testing import ErrorType
from .. import _util


from .distribution_data import DistributionData
from .lifetime import Lifetime
from .timeunit import TimeUnit


class TimingDistributionMetricType:
    """
    This implements the developer facing API for recording timing distribution
    metrics.

    Instances of this class type are automatically generated by
    `glean.load_metrics`, allowing developers to record values that were
    previously registered in the metrics.yaml file.
    """

    # Largest unsigned 64-bit value. It is sent to the Rust side in place of
    # a negative timer id, which CFFI would not let us pass.
    _INVALID_TIMER_ID = 0xFFFFFFFFFFFFFFFF

    def __init__(
        self,
        disabled: bool,
        category: str,
        lifetime: Lifetime,
        name: str,
        send_in_pings: List[str],
        time_unit: TimeUnit,
    ):
        """
        Create the metric and register it with the Rust side through the FFI.

        Args:
            disabled: When True, the recording methods return early.
            category: The category of the metric.
            lifetime: The lifetime of the metric.
            name: The name of the metric.
            send_in_pings: The names of the pings the metric is sent in. The
                first entry is the default ping for the `test_*` methods.
            time_unit: The time unit of the metric.
        """
        self._disabled = disabled
        self._send_in_pings = send_in_pings

        self._handle = _ffi.lib.glean_new_timing_distribution_metric(
            _ffi.ffi_encode_string(category),
            _ffi.ffi_encode_string(name),
            _ffi.ffi_encode_vec_string(send_in_pings),
            len(send_in_pings),
            lifetime.value,
            disabled,
            time_unit.value,
        )

    def __del__(self):
        # `_handle` is missing if `__init__` raised before assigning it, so
        # fall back to 0, which is treated as "nothing to destroy".
        if getattr(self, "_handle", 0) != 0:
            _ffi.lib.glean_destroy_timing_distribution_metric(self._handle)

    @classmethod
    def _corrected_timer_id(cls, timer_id: int) -> int:
        """
        Map a timer id onto a value that can be handed to the FFI.

        CFFI prevents us from passing a negative value for the timer id.
        However, to be consistent with the other platforms, we should still
        pass an invalid value to the Rust side so it can record an error
        using the Glean error reporting system.

        Args:
            timer_id: The timer id supplied by the caller.

        Returns:
            `timer_id` unchanged when it is non-negative, otherwise the
            largest unsigned 64-bit value.
        """
        if timer_id < 0:
            return cls._INVALID_TIMER_ID
        return timer_id

    def _resolve_ping_name(self, ping_name: Optional[str]) -> str:
        """
        Return `ping_name`, defaulting to the first entry of `send_in_pings`.

        Raises:
            IndexError: If `ping_name` is None and `send_in_pings` is empty.
        """
        if ping_name is None:
            return self._send_in_pings[0]
        return ping_name

    def start(self) -> Optional[int]:
        """
        Start tracking time for the provided metric.
        Multiple timers can run simultaneously.

        Returns:
            timer_id: The object to associate with this timing, or None if
                the metric is disabled.
        """
        if self._disabled:
            return None

        # Even though the Rust code for `start` runs synchronously, the Rust
        # code for `stop_and_accumulate` is dispatched and runs
        # asynchronously, and we need to use the same clock for start and
        # stop. Therefore we take the time on the Python side, both here and
        # in `stop_and_accumulate`.
        start_time = _util.time_ns()

        # No dispatcher, we need the return value
        return _ffi.lib.glean_timing_distribution_set_start(self._handle, start_time)

    def stop_and_accumulate(self, timer_id: Optional[int]) -> None:
        """
        Stop tracking time for the provided metric and associated timer id. Add a
        count to the corresponding bucket in the timing distribution.
        This will record an error if no `start` was called.

        Args:
            timer_id: The timer id associated with this timing. This allows for
                concurrent timing of events associated with different ids to
                the same timing distribution metric.
        """
        # `start` may have returned None.
        # Accepting that means users of this API don't need to do a None check.
        if self._disabled or timer_id is None:
            return

        # The Rust code runs async and might be delayed. We need the time as
        # precisely as possible. We also need the same clock for start and stop
        # (`start` takes the time on the Python side).
        stop_time = _util.time_ns()

        @Dispatcher.launch
        def stop_and_accumulate():
            _ffi.lib.glean_timing_distribution_set_stop_and_accumulate(
                self._handle, self._corrected_timer_id(timer_id), stop_time
            )

    def cancel(self, timer_id: Optional[int]) -> None:
        """
        Abort a previous `start` call. No error is recorded if no `start` was called.

        Args:
            timer_id: The timer id associated with this timing. This allows for
                concurrent timing of events associated with different ids to
                the same timing distribution metric.
        """
        # `start` may have returned None.
        # Accepting that means users of this API don't need to do a None check.
        if self._disabled or timer_id is None:
            return

        @Dispatcher.launch
        def cancel():
            _ffi.lib.glean_timing_distribution_cancel(
                self._handle, self._corrected_timer_id(timer_id)
            )

    class _TimingDistributionContextManager:
        """
        A context manager for recording timings. Used by the `measure` method.
        """

        def __init__(self, timing_distribution: "TimingDistributionMetricType"):
            self._timing_distribution = timing_distribution

        def __enter__(self) -> None:
            # The timer id is None when the metric is disabled; both
            # `stop_and_accumulate` and `cancel` accept that.
            self._timer_id = self._timing_distribution.start()

        def __exit__(self, type, value, tb) -> None:
            # No traceback means the `with` body finished without raising.
            if tb is None:
                self._timing_distribution.stop_and_accumulate(self._timer_id)
            else:
                self._timing_distribution.cancel(self._timer_id)

    def measure(self) -> "_TimingDistributionContextManager":
        """
        Provides a context manager for measuring the time it takes to execute
        snippets of code in a `with` statement.

        If the contents of the `with` statement raise an exception, the timing
        is not recorded.

        Usage:
            with metrics.perf.timer.measure():
                # ... do something that takes time ...
        """
        return self._TimingDistributionContextManager(self)

    def test_has_value(self, ping_name: Optional[str] = None) -> bool:
        """
        Tests whether a value is stored for the metric for testing purposes
        only.

        Args:
            ping_name (str): (default: first value in send_in_pings) The name
                of the ping to retrieve the metric for.

        Returns:
            has_value (bool): True if the metric value exists.
        """
        ping_name = self._resolve_ping_name(ping_name)

        return bool(
            _ffi.lib.glean_timing_distribution_test_has_value(
                self._handle, _ffi.ffi_encode_string(ping_name)
            )
        )

    def test_get_value(self, ping_name: Optional[str] = None) -> DistributionData:
        """
        Returns the stored value for testing purposes only.

        Args:
            ping_name (str): (default: first value in send_in_pings) The name
                of the ping to retrieve the metric for.

        Returns:
            value (DistributionData): value of the stored metric.

        Raises:
            ValueError: If no value is stored for the metric in the ping.
        """
        ping_name = self._resolve_ping_name(ping_name)

        if not self.test_has_value(ping_name):
            raise ValueError("metric has no value")

        return DistributionData.from_json_string(
            _ffi.ffi_decode_string(
                _ffi.lib.glean_timing_distribution_test_get_value_as_json_string(
                    self._handle, _ffi.ffi_encode_string(ping_name)
                )
            )
        )

    def test_get_num_recorded_errors(
        self, error_type: ErrorType, ping_name: Optional[str] = None
    ) -> int:
        """
        Returns the number of errors recorded for the given metric.

        Args:
            error_type (ErrorType): The type of error recorded.
            ping_name (str): (default: first value in send_in_pings) The name
                of the ping to retrieve the metric for.

        Returns:
            num_errors (int): The number of errors recorded for the metric for
                the given error type.
        """
        ping_name = self._resolve_ping_name(ping_name)

        return _ffi.lib.glean_timing_distribution_test_get_num_recorded_errors(
            self._handle,
            error_type.value,
            _ffi.ffi_encode_string(ping_name),
        )


__all__ = ["TimingDistributionMetricType"]

Classes

class TimingDistributionMetricType (disabled: bool, category: str, lifetime: Lifetime, name: str, send_in_pings: List[str], time_unit: TimeUnit)

This implements the developer facing API for recording timing distribution metrics.

Instances of this class type are automatically generated by load_metrics(), allowing developers to record values that were previously registered in the metrics.yaml file.

Expand source code
class TimingDistributionMetricType:
    """
    This implements the developer facing API for recording timing distribution
    metrics.

    Instances of this class type are automatically generated by
    `glean.load_metrics`, allowing developers to record values that were
    previously registered in the metrics.yaml file.
    """

    # Largest unsigned 64-bit value. It is sent to the Rust side in place of
    # a negative timer id, which CFFI would not let us pass.
    _INVALID_TIMER_ID = 0xFFFFFFFFFFFFFFFF

    def __init__(
        self,
        disabled: bool,
        category: str,
        lifetime: Lifetime,
        name: str,
        send_in_pings: List[str],
        time_unit: TimeUnit,
    ):
        """
        Create the metric and register it with the Rust side through the FFI.

        Args:
            disabled: When True, the recording methods return early.
            category: The category of the metric.
            lifetime: The lifetime of the metric.
            name: The name of the metric.
            send_in_pings: The names of the pings the metric is sent in. The
                first entry is the default ping for the `test_*` methods.
            time_unit: The time unit of the metric.
        """
        self._disabled = disabled
        self._send_in_pings = send_in_pings

        self._handle = _ffi.lib.glean_new_timing_distribution_metric(
            _ffi.ffi_encode_string(category),
            _ffi.ffi_encode_string(name),
            _ffi.ffi_encode_vec_string(send_in_pings),
            len(send_in_pings),
            lifetime.value,
            disabled,
            time_unit.value,
        )

    def __del__(self):
        # `_handle` is missing if `__init__` raised before assigning it, so
        # fall back to 0, which is treated as "nothing to destroy".
        if getattr(self, "_handle", 0) != 0:
            _ffi.lib.glean_destroy_timing_distribution_metric(self._handle)

    @classmethod
    def _corrected_timer_id(cls, timer_id: int) -> int:
        """
        Map a timer id onto a value that can be handed to the FFI.

        CFFI prevents us from passing a negative value for the timer id.
        However, to be consistent with the other platforms, we should still
        pass an invalid value to the Rust side so it can record an error
        using the Glean error reporting system.

        Args:
            timer_id: The timer id supplied by the caller.

        Returns:
            `timer_id` unchanged when it is non-negative, otherwise the
            largest unsigned 64-bit value.
        """
        if timer_id < 0:
            return cls._INVALID_TIMER_ID
        return timer_id

    def _resolve_ping_name(self, ping_name: Optional[str]) -> str:
        """
        Return `ping_name`, defaulting to the first entry of `send_in_pings`.

        Raises:
            IndexError: If `ping_name` is None and `send_in_pings` is empty.
        """
        if ping_name is None:
            return self._send_in_pings[0]
        return ping_name

    def start(self) -> Optional[int]:
        """
        Start tracking time for the provided metric.
        Multiple timers can run simultaneously.

        Returns:
            timer_id: The object to associate with this timing, or None if
                the metric is disabled.
        """
        if self._disabled:
            return None

        # Even though the Rust code for `start` runs synchronously, the Rust
        # code for `stop_and_accumulate` is dispatched and runs
        # asynchronously, and we need to use the same clock for start and
        # stop. Therefore we take the time on the Python side, both here and
        # in `stop_and_accumulate`.
        start_time = _util.time_ns()

        # No dispatcher, we need the return value
        return _ffi.lib.glean_timing_distribution_set_start(self._handle, start_time)

    def stop_and_accumulate(self, timer_id: Optional[int]) -> None:
        """
        Stop tracking time for the provided metric and associated timer id. Add a
        count to the corresponding bucket in the timing distribution.
        This will record an error if no `start` was called.

        Args:
            timer_id: The timer id associated with this timing. This allows for
                concurrent timing of events associated with different ids to
                the same timing distribution metric.
        """
        # `start` may have returned None.
        # Accepting that means users of this API don't need to do a None check.
        if self._disabled or timer_id is None:
            return

        # The Rust code runs async and might be delayed. We need the time as
        # precisely as possible. We also need the same clock for start and stop
        # (`start` takes the time on the Python side).
        stop_time = _util.time_ns()

        @Dispatcher.launch
        def stop_and_accumulate():
            _ffi.lib.glean_timing_distribution_set_stop_and_accumulate(
                self._handle, self._corrected_timer_id(timer_id), stop_time
            )

    def cancel(self, timer_id: Optional[int]) -> None:
        """
        Abort a previous `start` call. No error is recorded if no `start` was called.

        Args:
            timer_id: The timer id associated with this timing. This allows for
                concurrent timing of events associated with different ids to
                the same timing distribution metric.
        """
        # `start` may have returned None.
        # Accepting that means users of this API don't need to do a None check.
        if self._disabled or timer_id is None:
            return

        @Dispatcher.launch
        def cancel():
            _ffi.lib.glean_timing_distribution_cancel(
                self._handle, self._corrected_timer_id(timer_id)
            )

    class _TimingDistributionContextManager:
        """
        A context manager for recording timings. Used by the `measure` method.
        """

        def __init__(self, timing_distribution: "TimingDistributionMetricType"):
            self._timing_distribution = timing_distribution

        def __enter__(self) -> None:
            # The timer id is None when the metric is disabled; both
            # `stop_and_accumulate` and `cancel` accept that.
            self._timer_id = self._timing_distribution.start()

        def __exit__(self, type, value, tb) -> None:
            # No traceback means the `with` body finished without raising.
            if tb is None:
                self._timing_distribution.stop_and_accumulate(self._timer_id)
            else:
                self._timing_distribution.cancel(self._timer_id)

    def measure(self) -> "_TimingDistributionContextManager":
        """
        Provides a context manager for measuring the time it takes to execute
        snippets of code in a `with` statement.

        If the contents of the `with` statement raise an exception, the timing
        is not recorded.

        Usage:
            with metrics.perf.timer.measure():
                # ... do something that takes time ...
        """
        return self._TimingDistributionContextManager(self)

    def test_has_value(self, ping_name: Optional[str] = None) -> bool:
        """
        Tests whether a value is stored for the metric for testing purposes
        only.

        Args:
            ping_name (str): (default: first value in send_in_pings) The name
                of the ping to retrieve the metric for.

        Returns:
            has_value (bool): True if the metric value exists.
        """
        ping_name = self._resolve_ping_name(ping_name)

        return bool(
            _ffi.lib.glean_timing_distribution_test_has_value(
                self._handle, _ffi.ffi_encode_string(ping_name)
            )
        )

    def test_get_value(self, ping_name: Optional[str] = None) -> DistributionData:
        """
        Returns the stored value for testing purposes only.

        Args:
            ping_name (str): (default: first value in send_in_pings) The name
                of the ping to retrieve the metric for.

        Returns:
            value (DistributionData): value of the stored metric.

        Raises:
            ValueError: If no value is stored for the metric in the ping.
        """
        ping_name = self._resolve_ping_name(ping_name)

        if not self.test_has_value(ping_name):
            raise ValueError("metric has no value")

        return DistributionData.from_json_string(
            _ffi.ffi_decode_string(
                _ffi.lib.glean_timing_distribution_test_get_value_as_json_string(
                    self._handle, _ffi.ffi_encode_string(ping_name)
                )
            )
        )

    def test_get_num_recorded_errors(
        self, error_type: ErrorType, ping_name: Optional[str] = None
    ) -> int:
        """
        Returns the number of errors recorded for the given metric.

        Args:
            error_type (ErrorType): The type of error recorded.
            ping_name (str): (default: first value in send_in_pings) The name
                of the ping to retrieve the metric for.

        Returns:
            num_errors (int): The number of errors recorded for the metric for
                the given error type.
        """
        ping_name = self._resolve_ping_name(ping_name)

        return _ffi.lib.glean_timing_distribution_test_get_num_recorded_errors(
            self._handle,
            error_type.value,
            _ffi.ffi_encode_string(ping_name),
        )

Methods

def cancel(self, timer_id: Optional[int]) ‑> None

Abort a previous start call. No error is recorded if no start was called.

Args

timer_id
The timer id associated with this timing. This allows for concurrent timing of events associated with different ids to the same timing distribution metric.
Expand source code
def cancel(self, timer_id: Optional[int]) -> None:
    """
    Discard a timing previously begun with `start`. No error is recorded if
    no `start` was called.

    Args:
        timer_id: The id identifying the timing to abort, so that several
            timings on the same timing distribution metric can run
            concurrently. `None` is accepted and ignored, which spares
            callers a None check on the value returned by `start`.
    """
    enabled = not self._disabled
    if not (enabled and timer_id is not None):
        return

    @Dispatcher.launch
    def cancel():
        # A negative id cannot be passed through CFFI to
        # `glean_timing_distribution_cancel`. To stay consistent with the
        # other platforms, an invalid value (the largest unsigned 64-bit
        # integer) is sent instead so that the Rust side can report the
        # problem through the Glean error reporting system.
        ffi_timer_id = 0xFFFFFFFFFFFFFFFF if timer_id < 0 else timer_id
        _ffi.lib.glean_timing_distribution_cancel(self._handle, ffi_timer_id)
def measure(self) ‑> _TimingDistributionContextManager

Provides a context manager for measuring the time it takes to execute snippets of code in a with statement.

If the contents of the with statement raise an exception, the timing is not recorded.

Usage

with metrics.perf.timer.measure(): # … do something that takes time …

Expand source code
def measure(self) -> "_TimingDistributionContextManager":
    """
    Build a context manager that times the body of a `with` statement.

    The timing is not recorded when the body of the `with` statement raises
    an exception.

    Usage:
        with metrics.perf.timer.measure():
            # ... do something that takes time ...
    """
    context_manager_cls = self._TimingDistributionContextManager
    return context_manager_cls(self)
def start(self) ‑> Optional[int]

Start tracking time for the provided metric. Multiple timers can run simultaneously.

Returns

timer_id
The object to associate with this timing.
Expand source code
def start(self) -> Optional[int]:
    """
    Begin a new timing for this metric. Several timings may be in flight at
    the same time.

    Returns:
        timer_id: The object identifying this timing, or None when the
            metric is disabled.
    """
    if not self._disabled:
        # The clock is read here, on the Python side, because the stop side
        # does the same: its Rust call runs asynchronously, and start and
        # stop have to be measured with the same clock.
        began_at = _util.time_ns()

        # Called directly rather than through the dispatcher, since the
        # return value is needed.
        timer_id = _ffi.lib.glean_timing_distribution_set_start(self._handle, began_at)
        return timer_id

    return None
def stop_and_accumulate(self, timer_id: Optional[int]) ‑> None

Stop tracking time for the provided metric and associated timer id. Add a count to the corresponding bucket in the timing distribution. This will record an error if no start was called.

Args

timer_id
The timer id associated with this timing. This allows for concurrent timing of events associated with different ids to the same timing distribution metric.
Expand source code
def stop_and_accumulate(self, timer_id: Optional[int]) -> None:
    """
    Finish the timing identified by `timer_id` and add a count to the
    matching bucket of the timing distribution.
    This will record an error if no `start` was called.

    Args:
        timer_id: The id identifying the timing to stop, so that several
            timings on the same timing distribution metric can run
            concurrently. `None` is accepted and ignored, which spares
            callers a None check on the value returned by `start`.
    """
    enabled = not self._disabled
    if not (enabled and timer_id is not None):
        return

    # Read the clock right away: the Rust call below runs asynchronously and
    # may be delayed, and the reading must come from the same clock that
    # `start` used (which is also read on the Python side).
    stopped_at = _util.time_ns()

    @Dispatcher.launch
    def stop_and_accumulate():
        # A negative id cannot be passed through CFFI to
        # `glean_timing_distribution_set_stop_and_accumulate`. To stay
        # consistent with the other platforms, an invalid value (the largest
        # unsigned 64-bit integer) is sent instead so that the Rust side can
        # record an error using the Glean error reporting system.
        ffi_timer_id = 0xFFFFFFFFFFFFFFFF if timer_id < 0 else timer_id
        _ffi.lib.glean_timing_distribution_set_stop_and_accumulate(
            self._handle, ffi_timer_id, stopped_at
        )
def test_get_num_recorded_errors(self, error_type: ErrorType, ping_name: Optional[str] = None) ‑> int

Returns the number of errors recorded for the given metric.

Args

error_type : ErrorType
The type of error recorded.
ping_name : str
(default: first value in send_in_pings) The name of the ping to retrieve the metric for.

Returns

num_errors (int): The number of errors recorded for the metric for the given error type.

Expand source code
def test_get_num_recorded_errors(
    self, error_type: ErrorType, ping_name: Optional[str] = None
) -> int:
    """
    Report how many errors of one type were recorded for this metric.

    Args:
        error_type (ErrorType): The type of error to count.
        ping_name (str): (default: first value in send_in_pings) The name
            of the ping to retrieve the metric for.

    Returns:
        num_errors (int): The number of errors recorded for the metric for
            the given error type.
    """
    target_ping = self._send_in_pings[0] if ping_name is None else ping_name

    num_errors = _ffi.lib.glean_timing_distribution_test_get_num_recorded_errors(
        self._handle,
        error_type.value,
        _ffi.ffi_encode_string(target_ping),
    )
    return num_errors
def test_get_value(self, ping_name: Optional[str] = None) ‑> DistributionData

Returns the stored value for testing purposes only.

Args

ping_name : str
(default: first value in send_in_pings) The name of the ping to retrieve the metric for.

Returns

value (DistributionData): value of the stored metric.

Expand source code
def test_get_value(self, ping_name: Optional[str] = None) -> DistributionData:
    """
    Fetch the stored value of the metric. For testing purposes only.

    Args:
        ping_name (str): (default: first value in send_in_pings) The name
            of the ping to retrieve the metric for.

    Returns:
        value (DistributionData): value of the stored metric.

    Raises:
        ValueError: If no value is stored for the metric in the ping.
    """
    target_ping = self._send_in_pings[0] if ping_name is None else ping_name

    if self.test_has_value(target_ping):
        # The FFI returns the value as a JSON string, which is decoded and
        # then parsed into a DistributionData.
        json_ptr = _ffi.lib.glean_timing_distribution_test_get_value_as_json_string(
            self._handle, _ffi.ffi_encode_string(target_ping)
        )
        return DistributionData.from_json_string(_ffi.ffi_decode_string(json_ptr))

    raise ValueError("metric has no value")
def test_has_value(self, ping_name: Optional[str] = None) ‑> bool

Tests whether a value is stored for the metric for testing purposes only.

Args

ping_name : str
(default: first value in send_in_pings) The name of the ping to retrieve the metric for.

Returns

has_value (bool): True if the metric value exists.

Expand source code
def test_has_value(self, ping_name: Optional[str] = None) -> bool:
    """
    Check whether the metric currently holds a value. For testing purposes
    only.

    Args:
        ping_name (str): (default: first value in send_in_pings) The name
            of the ping to retrieve the metric for.

    Returns:
        has_value (bool): True if the metric value exists.
    """
    target_ping = self._send_in_pings[0] if ping_name is None else ping_name

    # The FFI result is coerced to a Python bool.
    stored = _ffi.lib.glean_timing_distribution_test_has_value(
        self._handle, _ffi.ffi_encode_string(target_ping)
    )
    return bool(stored)