generator.explores.explore

Generic explore type.

  1"""Generic explore type."""
  2
  3from __future__ import annotations
  4
  5from dataclasses import dataclass, field
  6from pathlib import Path
  7from typing import Any, Dict, List, Optional, Tuple
  8
  9import lkml
 10
 11from ..views.lookml_utils import escape_filter_expr, slug_to_title
 12
 13
 14@dataclass
 15class Explore:
 16    """A generic explore."""
 17
 18    name: str
 19    views: Dict[str, str]
 20    views_path: Optional[Path] = None
 21    defn: Optional[Dict[str, str]] = None
 22    type: str = field(init=False)
 23
 24    def to_dict(self) -> dict:
 25        """Explore instance represented as a dict."""
 26        return {self.name: {"type": self.type, "views": self.views}}
 27
 28    def to_lookml(
 29        self, v1_name: Optional[str], hidden: Optional[bool]
 30    ) -> List[Dict[str, Any]]:
 31        """
 32        Generate LookML for this explore.
 33
 34        Any generation done in dependent explore's
 35        `_to_lookml` takes precedence over these fields.
 36        """
 37        base_lookml = {}
 38        if hidden:
 39            base_lookml["hidden"] = "yes"
 40        base_view_name = next(
 41            (
 42                view_name
 43                for view_type, view_name in self.views.items()
 44                if view_type == "base_view"
 45            )
 46        )
 47        for view_type, view in self.views.items():
 48            # We look at our dependent views to see if they have a
 49            # "submission" field. Dependent views are any that are:
 50            # - base_view
 51            # - extended_view*
 52            #
 53            # We do not want to look at joined views. Those should be
 54            # labeled as:
 55            # - join*
 56            #
 57            # If they have a submission field, we filter on the date.
 58            # This allows for filter queries to succeed.
 59            if "join" in view_type:
 60                continue
 61            if time_partitioning_group := self.get_view_time_partitioning_group(view):
 62                base_lookml["sql_always_where"] = (
 63                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
 64                )
 65
 66        # We only update the first returned explore
 67        new_lookml = self._to_lookml(v1_name)
 68        base_lookml.update(new_lookml[0])
 69        new_lookml[0] = base_lookml
 70
 71        return new_lookml
 72
 73    def _to_lookml(
 74        self,
 75        v1_name: Optional[str],
 76    ) -> List[Dict[str, Any]]:
 77        raise NotImplementedError("Only implemented in subclasses")
 78
 79    def get_dependent_views(self) -> List[str]:
 80        """Get views this explore is dependent on."""
 81        dependent_views = []
 82        for _type, views in self.views.items():
 83            if _type.startswith("extended"):
 84                continue
 85            elif _type.startswith("joined"):
 86                dependent_views += [view for view in views]
 87            else:
 88                dependent_views.append(views)
 89        return dependent_views
 90
 91    @staticmethod
 92    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
 93        """Get an instance of an explore from a namespace definition."""
 94        raise NotImplementedError("Only implemented in subclasses")
 95
 96    def get_view_lookml(self, view: str) -> dict:
 97        """Get the LookML for a view."""
 98        if self.views_path is not None:
 99            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
100        raise Exception("Missing view path for get_view_lookml")
101
102    def get_datagroup(self) -> Optional[str]:
103        """
104        Return the name of the associated datagroup.
105
106        Return `None` if there is no datagroup for this explore.
107        """
108        return None
109
110    def get_unnested_fields_joins_lookml(
111        self,
112    ) -> list:
113        """Get the LookML for joining unnested fields."""
114        views_lookml = self.get_view_lookml(self.views["base_view"])
115        views: List[str] = [view["name"] for view in views_lookml["views"]]
116        parent_base_name = views_lookml["views"][0]["name"]
117
118        extended_views: List[str] = []
119        if "extended_view" in self.views:
120            # check for extended views
121            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
122            extended_views = [view["name"] for view in extended_views_lookml["views"]]
123
124            views_lookml.update(extended_views_lookml)
125            views += extended_views
126
127        joins = []
128        for view in views_lookml["views"][1:]:
129            view_name = view["name"]
130            # get repeated, nested fields that exist as separate views in lookml
131            base_name, metric = self._get_base_name_and_metric(
132                view_name=view_name, views=views
133            )
134            metric_name = view_name
135            metric_label = slug_to_title(metric_name)
136
137            if view_name in extended_views:
138                # names of extended views are overriden by the name of the view that is extending them
139                metric_label = slug_to_title(
140                    metric_name.replace(base_name, parent_base_name)
141                )
142                base_name = parent_base_name
143
144            joins.append(
145                {
146                    "name": view_name,
147                    "view_label": metric_label,
148                    "relationship": "one_to_many",
149                    "sql": (
150                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
151                    ),
152                }
153            )
154
155        return joins
156
157    def _get_default_channel(self, view: str) -> Optional[str]:
158        channel_params = [
159            param
160            for _view_defn in self.get_view_lookml(view)["views"]
161            for param in _view_defn.get("filters", [])
162            if _view_defn["name"] == view and param["name"] == "channel"
163        ]
164
165        if channel_params:
166            allowed_values = channel_params[0]["suggestions"]
167            default_value = allowed_values[0]
168            return escape_filter_expr(default_value)
169        return None
170
171    def _get_base_name_and_metric(
172        self, view_name: str, views: List[str]
173    ) -> Tuple[str, str]:
174        """
175        Get base view and metric names.
176
177        Returns the the name of the base view and the metric based on the
178        passed `view_name` and existing views.
179
180        The names are resolved in a backwards fashion to account for
181        repeated nested fields that might contain other nested fields.
182        For example:
183
184        view: sync {
185            [...]
186            dimension: payload__events {
187                sql: ${TABLE}.payload.events ;;
188            }
189        }
190
191        view: sync__payload__events {
192            [...]
193            dimension: f5_ {
194                sql: ${TABLE}.f5_ ;;
195            }
196        }
197
198        view: sync__payload__events__f5_ {
199            [...]
200        }
201
202        For these nested views to get translated to the following joins, the names
203        need to be resolved backwards:
204
205        join: sync__payload__events {
206            relationship: one_to_many
207            sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;;
208        }
209
210        join: sync__payload__events__f5_ {
211            relationship: one_to_many
212            sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;;
213        }
214        """
215        split = view_name.split("__")
216        for index in range(len(split) - 1, 0, -1):
217            base_view = "__".join(split[:index])
218            metric = "__".join(split[index:])
219            if base_view in views:
220                return (base_view, metric)
221        raise Exception(f"Cannot get base name and metric from view {view_name}")
222
223    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
224        """Determine whether a this view has this dimension."""
225        for _view_defn in self.get_view_lookml(view)["views"]:
226            if _view_defn["name"] != view:
227                continue
228            for dim in _view_defn.get("dimensions", []):
229                if dim["name"] == dimension_name:
230                    return True
231        return False
232
233    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
234        """Get time partitiong dimension group for this view.
235
236        Return the name of the first dimension group tagged "time_partitioning_field",
237        and fall back to "submission" if available.
238        """
239        has_submission = False
240        for _view_defn in self.get_view_lookml(view)["views"]:
241            if not _view_defn["name"] == view:
242                continue
243            for dim in _view_defn.get("dimension_groups", []):
244                if "time_partitioning_field" in dim.get("tags", []):
245                    return dim["name"]
246                elif dim["name"] == "submission":
247                    has_submission = True
248        if has_submission:
249            return "submission"
250        return None
251
252    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
253        """Get required filters for this view."""
254        filters = []
255        view = self.views[view_name]
256
257        # Add a default filter on channel, if it's present in the view
258        default_channel = self._get_default_channel(view)
259        if default_channel is not None:
260            filters.append({"channel": default_channel})
261
262        # Add submission filter, if present in the view
263        if time_partitioning_group := self.get_view_time_partitioning_group(view):
264            filters.append({f"{time_partitioning_group}_date": "28 days"})
265
266        return filters
267
268    def __eq__(self, other) -> bool:
269        """Check for equality with other View."""
270
271        def comparable_dict(d):
272            return tuple(sorted(d.items()))
273
274        if isinstance(other, Explore):
275            return (
276                self.name == other.name
277                and comparable_dict(self.views) == comparable_dict(other.views)
278                and self.type == other.type
279            )
280        return False
@dataclass
class Explore:
 15@dataclass
 16class Explore:
 17    """A generic explore."""
 18
 19    name: str
 20    views: Dict[str, str]
 21    views_path: Optional[Path] = None
 22    defn: Optional[Dict[str, str]] = None
 23    type: str = field(init=False)
 24
 25    def to_dict(self) -> dict:
 26        """Explore instance represented as a dict."""
 27        return {self.name: {"type": self.type, "views": self.views}}
 28
 29    def to_lookml(
 30        self, v1_name: Optional[str], hidden: Optional[bool]
 31    ) -> List[Dict[str, Any]]:
 32        """
 33        Generate LookML for this explore.
 34
 35        Any generation done in dependent explore's
 36        `_to_lookml` takes precedence over these fields.
 37        """
 38        base_lookml = {}
 39        if hidden:
 40            base_lookml["hidden"] = "yes"
 41        base_view_name = next(
 42            (
 43                view_name
 44                for view_type, view_name in self.views.items()
 45                if view_type == "base_view"
 46            )
 47        )
 48        for view_type, view in self.views.items():
 49            # We look at our dependent views to see if they have a
 50            # "submission" field. Dependent views are any that are:
 51            # - base_view
 52            # - extended_view*
 53            #
 54            # We do not want to look at joined views. Those should be
 55            # labeled as:
 56            # - join*
 57            #
 58            # If they have a submission field, we filter on the date.
 59            # This allows for filter queries to succeed.
 60            if "join" in view_type:
 61                continue
 62            if time_partitioning_group := self.get_view_time_partitioning_group(view):
 63                base_lookml["sql_always_where"] = (
 64                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
 65                )
 66
 67        # We only update the first returned explore
 68        new_lookml = self._to_lookml(v1_name)
 69        base_lookml.update(new_lookml[0])
 70        new_lookml[0] = base_lookml
 71
 72        return new_lookml
 73
 74    def _to_lookml(
 75        self,
 76        v1_name: Optional[str],
 77    ) -> List[Dict[str, Any]]:
 78        raise NotImplementedError("Only implemented in subclasses")
 79
 80    def get_dependent_views(self) -> List[str]:
 81        """Get views this explore is dependent on."""
 82        dependent_views = []
 83        for _type, views in self.views.items():
 84            if _type.startswith("extended"):
 85                continue
 86            elif _type.startswith("joined"):
 87                dependent_views += [view for view in views]
 88            else:
 89                dependent_views.append(views)
 90        return dependent_views
 91
 92    @staticmethod
 93    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
 94        """Get an instance of an explore from a namespace definition."""
 95        raise NotImplementedError("Only implemented in subclasses")
 96
 97    def get_view_lookml(self, view: str) -> dict:
 98        """Get the LookML for a view."""
 99        if self.views_path is not None:
100            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
101        raise Exception("Missing view path for get_view_lookml")
102
103    def get_datagroup(self) -> Optional[str]:
104        """
105        Return the name of the associated datagroup.
106
107        Return `None` if there is no datagroup for this explore.
108        """
109        return None
110
111    def get_unnested_fields_joins_lookml(
112        self,
113    ) -> list:
114        """Get the LookML for joining unnested fields."""
115        views_lookml = self.get_view_lookml(self.views["base_view"])
116        views: List[str] = [view["name"] for view in views_lookml["views"]]
117        parent_base_name = views_lookml["views"][0]["name"]
118
119        extended_views: List[str] = []
120        if "extended_view" in self.views:
121            # check for extended views
122            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
123            extended_views = [view["name"] for view in extended_views_lookml["views"]]
124
125            views_lookml.update(extended_views_lookml)
126            views += extended_views
127
128        joins = []
129        for view in views_lookml["views"][1:]:
130            view_name = view["name"]
131            # get repeated, nested fields that exist as separate views in lookml
132            base_name, metric = self._get_base_name_and_metric(
133                view_name=view_name, views=views
134            )
135            metric_name = view_name
136            metric_label = slug_to_title(metric_name)
137
138            if view_name in extended_views:
139                # names of extended views are overriden by the name of the view that is extending them
140                metric_label = slug_to_title(
141                    metric_name.replace(base_name, parent_base_name)
142                )
143                base_name = parent_base_name
144
145            joins.append(
146                {
147                    "name": view_name,
148                    "view_label": metric_label,
149                    "relationship": "one_to_many",
150                    "sql": (
151                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
152                    ),
153                }
154            )
155
156        return joins
157
158    def _get_default_channel(self, view: str) -> Optional[str]:
159        channel_params = [
160            param
161            for _view_defn in self.get_view_lookml(view)["views"]
162            for param in _view_defn.get("filters", [])
163            if _view_defn["name"] == view and param["name"] == "channel"
164        ]
165
166        if channel_params:
167            allowed_values = channel_params[0]["suggestions"]
168            default_value = allowed_values[0]
169            return escape_filter_expr(default_value)
170        return None
171
172    def _get_base_name_and_metric(
173        self, view_name: str, views: List[str]
174    ) -> Tuple[str, str]:
175        """
176        Get base view and metric names.
177
178        Returns the the name of the base view and the metric based on the
179        passed `view_name` and existing views.
180
181        The names are resolved in a backwards fashion to account for
182        repeated nested fields that might contain other nested fields.
183        For example:
184
185        view: sync {
186            [...]
187            dimension: payload__events {
188                sql: ${TABLE}.payload.events ;;
189            }
190        }
191
192        view: sync__payload__events {
193            [...]
194            dimension: f5_ {
195                sql: ${TABLE}.f5_ ;;
196            }
197        }
198
199        view: sync__payload__events__f5_ {
200            [...]
201        }
202
203        For these nested views to get translated to the following joins, the names
204        need to be resolved backwards:
205
206        join: sync__payload__events {
207            relationship: one_to_many
208            sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;;
209        }
210
211        join: sync__payload__events__f5_ {
212            relationship: one_to_many
213            sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;;
214        }
215        """
216        split = view_name.split("__")
217        for index in range(len(split) - 1, 0, -1):
218            base_view = "__".join(split[:index])
219            metric = "__".join(split[index:])
220            if base_view in views:
221                return (base_view, metric)
222        raise Exception(f"Cannot get base name and metric from view {view_name}")
223
224    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
225        """Determine whether a this view has this dimension."""
226        for _view_defn in self.get_view_lookml(view)["views"]:
227            if _view_defn["name"] != view:
228                continue
229            for dim in _view_defn.get("dimensions", []):
230                if dim["name"] == dimension_name:
231                    return True
232        return False
233
234    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
235        """Get time partitiong dimension group for this view.
236
237        Return the name of the first dimension group tagged "time_partitioning_field",
238        and fall back to "submission" if available.
239        """
240        has_submission = False
241        for _view_defn in self.get_view_lookml(view)["views"]:
242            if not _view_defn["name"] == view:
243                continue
244            for dim in _view_defn.get("dimension_groups", []):
245                if "time_partitioning_field" in dim.get("tags", []):
246                    return dim["name"]
247                elif dim["name"] == "submission":
248                    has_submission = True
249        if has_submission:
250            return "submission"
251        return None
252
253    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
254        """Get required filters for this view."""
255        filters = []
256        view = self.views[view_name]
257
258        # Add a default filter on channel, if it's present in the view
259        default_channel = self._get_default_channel(view)
260        if default_channel is not None:
261            filters.append({"channel": default_channel})
262
263        # Add submission filter, if present in the view
264        if time_partitioning_group := self.get_view_time_partitioning_group(view):
265            filters.append({f"{time_partitioning_group}_date": "28 days"})
266
267        return filters
268
269    def __eq__(self, other) -> bool:
270        """Check for equality with other View."""
271
272        def comparable_dict(d):
273            return tuple(sorted(d.items()))
274
275        if isinstance(other, Explore):
276            return (
277                self.name == other.name
278                and comparable_dict(self.views) == comparable_dict(other.views)
279                and self.type == other.type
280            )
281        return False

A generic explore.

Explore( name: str, views: Dict[str, str], views_path: Optional[pathlib.Path] = None, defn: Optional[Dict[str, str]] = None)
name: str
views: Dict[str, str]
views_path: Optional[pathlib.Path] = None
defn: Optional[Dict[str, str]] = None
type: str
def to_dict(self) -> dict:
25    def to_dict(self) -> dict:
26        """Explore instance represented as a dict."""
27        return {self.name: {"type": self.type, "views": self.views}}

Explore instance represented as a dict.

def to_lookml( self, v1_name: Optional[str], hidden: Optional[bool]) -> List[Dict[str, Any]]:
29    def to_lookml(
30        self, v1_name: Optional[str], hidden: Optional[bool]
31    ) -> List[Dict[str, Any]]:
32        """
33        Generate LookML for this explore.
34
35        Any generation done in dependent explore's
36        `_to_lookml` takes precedence over these fields.
37        """
38        base_lookml = {}
39        if hidden:
40            base_lookml["hidden"] = "yes"
41        base_view_name = next(
42            (
43                view_name
44                for view_type, view_name in self.views.items()
45                if view_type == "base_view"
46            )
47        )
48        for view_type, view in self.views.items():
49            # We look at our dependent views to see if they have a
50            # "submission" field. Dependent views are any that are:
51            # - base_view
52            # - extended_view*
53            #
54            # We do not want to look at joined views. Those should be
55            # labeled as:
56            # - join*
57            #
58            # If they have a submission field, we filter on the date.
59            # This allows for filter queries to succeed.
60            if "join" in view_type:
61                continue
62            if time_partitioning_group := self.get_view_time_partitioning_group(view):
63                base_lookml["sql_always_where"] = (
64                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
65                )
66
67        # We only update the first returned explore
68        new_lookml = self._to_lookml(v1_name)
69        base_lookml.update(new_lookml[0])
70        new_lookml[0] = base_lookml
71
72        return new_lookml

Generate LookML for this explore.

Any generation done in dependent explore's _to_lookml takes precedence over these fields.

def get_dependent_views(self) -> List[str]:
80    def get_dependent_views(self) -> List[str]:
81        """Get views this explore is dependent on."""
82        dependent_views = []
83        for _type, views in self.views.items():
84            if _type.startswith("extended"):
85                continue
86            elif _type.startswith("joined"):
87                dependent_views += [view for view in views]
88            else:
89                dependent_views.append(views)
90        return dependent_views

Get views this explore is dependent on.

@staticmethod
def from_dict( name: str, defn: dict, views_path: pathlib.Path) -> Explore:
92    @staticmethod
93    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
94        """Get an instance of an explore from a namespace definition."""
95        raise NotImplementedError("Only implemented in subclasses")

Get an instance of an explore from a namespace definition.

def get_view_lookml(self, view: str) -> dict:
 97    def get_view_lookml(self, view: str) -> dict:
 98        """Get the LookML for a view."""
 99        if self.views_path is not None:
100            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
101        raise Exception("Missing view path for get_view_lookml")

Get the LookML for a view.

def get_datagroup(self) -> Optional[str]:
103    def get_datagroup(self) -> Optional[str]:
104        """
105        Return the name of the associated datagroup.
106
107        Return `None` if there is no datagroup for this explore.
108        """
109        return None

Return the name of the associated datagroup.

Return None if there is no datagroup for this explore.

def get_unnested_fields_joins_lookml(self) -> list:
111    def get_unnested_fields_joins_lookml(
112        self,
113    ) -> list:
114        """Get the LookML for joining unnested fields."""
115        views_lookml = self.get_view_lookml(self.views["base_view"])
116        views: List[str] = [view["name"] for view in views_lookml["views"]]
117        parent_base_name = views_lookml["views"][0]["name"]
118
119        extended_views: List[str] = []
120        if "extended_view" in self.views:
121            # check for extended views
122            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
123            extended_views = [view["name"] for view in extended_views_lookml["views"]]
124
125            views_lookml.update(extended_views_lookml)
126            views += extended_views
127
128        joins = []
129        for view in views_lookml["views"][1:]:
130            view_name = view["name"]
131            # get repeated, nested fields that exist as separate views in lookml
132            base_name, metric = self._get_base_name_and_metric(
133                view_name=view_name, views=views
134            )
135            metric_name = view_name
136            metric_label = slug_to_title(metric_name)
137
138            if view_name in extended_views:
139                # names of extended views are overriden by the name of the view that is extending them
140                metric_label = slug_to_title(
141                    metric_name.replace(base_name, parent_base_name)
142                )
143                base_name = parent_base_name
144
145            joins.append(
146                {
147                    "name": view_name,
148                    "view_label": metric_label,
149                    "relationship": "one_to_many",
150                    "sql": (
151                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
152                    ),
153                }
154            )
155
156        return joins

Get the LookML for joining unnested fields.

def has_view_dimension(self, view: str, dimension_name: str) -> bool:
224    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
225        """Determine whether a this view has this dimension."""
226        for _view_defn in self.get_view_lookml(view)["views"]:
227            if _view_defn["name"] != view:
228                continue
229            for dim in _view_defn.get("dimensions", []):
230                if dim["name"] == dimension_name:
231                    return True
232        return False

Determine whether a this view has this dimension.

def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
234    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
235        """Get time partitiong dimension group for this view.
236
237        Return the name of the first dimension group tagged "time_partitioning_field",
238        and fall back to "submission" if available.
239        """
240        has_submission = False
241        for _view_defn in self.get_view_lookml(view)["views"]:
242            if not _view_defn["name"] == view:
243                continue
244            for dim in _view_defn.get("dimension_groups", []):
245                if "time_partitioning_field" in dim.get("tags", []):
246                    return dim["name"]
247                elif dim["name"] == "submission":
248                    has_submission = True
249        if has_submission:
250            return "submission"
251        return None

Get time partitiong dimension group for this view.

Return the name of the first dimension group tagged "time_partitioning_field", and fall back to "submission" if available.

def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
253    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
254        """Get required filters for this view."""
255        filters = []
256        view = self.views[view_name]
257
258        # Add a default filter on channel, if it's present in the view
259        default_channel = self._get_default_channel(view)
260        if default_channel is not None:
261            filters.append({"channel": default_channel})
262
263        # Add submission filter, if present in the view
264        if time_partitioning_group := self.get_view_time_partitioning_group(view):
265            filters.append({f"{time_partitioning_group}_date": "28 days"})
266
267        return filters

Get required filters for this view.