generator.metrics_utils

Utils for working with metric-hub.

 1"""Utils for working with metric-hub."""
 2
 3from typing import List, Optional
 4
 5from metric_config_parser.config import ConfigCollection
 6from metric_config_parser.metric import MetricDefinition
 7
 8METRIC_HUB_REPO = "https://github.com/mozilla/metric-hub"
 9LOOKER_METRIC_HUB_REPO = "https://github.com/mozilla/metric-hub/tree/main/looker"
10
11
12class _MetricsConfigLoader:
13    """Loads metric config files from an external repository."""
14
15    config_collection: Optional[ConfigCollection] = None
16    repos: List[str] = [METRIC_HUB_REPO, LOOKER_METRIC_HUB_REPO]
17
18    @property
19    def configs(self) -> ConfigCollection:
20        configs = getattr(self, "_configs", None)
21        if configs:
22            return configs
23
24        if self.config_collection is None:
25            self.config_collection = ConfigCollection.from_github_repos(self.repos)
26        self._configs = self.config_collection
27        return self._configs
28
29    def update_repos(self, repos: List[str]):
30        """Change the repos to load configs from."""
31        self.repos = repos
32        self.config_collection = None
33
34    def metrics_of_data_source(
35        self, data_source: str, namespace: str
36    ) -> List[MetricDefinition]:
37        """Get the metric definitions that use a specific data source."""
38        metrics = []
39        for definition in self.configs.definitions:
40            if definition.platform == namespace:
41                for _, metric_definition in definition.spec.metrics.definitions.items():
42                    if (
43                        metric_definition.data_source
44                        and metric_definition.data_source.name == data_source
45                    ):
46                        metrics.append(metric_definition)
47
48        return metrics
49
50    def data_sources_of_namespace(self, namespace: str) -> List[str]:
51        """
52        Get the data source slugs in the specified namespace.
53
54        Filter out data sources that are unused.
55        """
56        data_sources = []
57        for definition in self.configs.definitions:
58            for data_source_slug in definition.spec.data_sources.definitions.keys():
59                if (
60                    definition.platform == namespace
61                    and len(
62                        MetricsConfigLoader.metrics_of_data_source(
63                            data_source_slug, definition.platform
64                        )
65                    )
66                    > 0
67                ):
68                    data_sources.append(data_source_slug)
69
70        return data_sources
71
72
73MetricsConfigLoader = _MetricsConfigLoader()
METRIC_HUB_REPO = 'https://github.com/mozilla/metric-hub'
LOOKER_METRIC_HUB_REPO = 'https://github.com/mozilla/metric-hub/tree/main/looker'
MetricsConfigLoader = <generator.metrics_utils._MetricsConfigLoader object>