generator.explores.explore

Generic explore type.

  1"""Generic explore type."""
  2
  3from __future__ import annotations
  4
  5from dataclasses import dataclass, field
  6from pathlib import Path
  7from typing import Any, Dict, List, Optional, Tuple
  8
  9import lkml
 10
 11from ..views.lookml_utils import escape_filter_expr, slug_to_title
 12
 13
 14@dataclass
 15class Explore:
 16    """A generic explore."""
 17
 18    name: str
 19    views: Dict[str, str]
 20    views_path: Optional[Path] = None
 21    defn: Optional[Dict[str, str]] = None
 22    type: str = field(init=False)
 23
 24    def to_dict(self) -> dict:
 25        """Explore instance represented as a dict."""
 26        return {self.name: {"type": self.type, "views": self.views}}
 27
 28    def to_lookml(
 29        self, v1_name: Optional[str], hidden: Optional[bool]
 30    ) -> List[Dict[str, Any]]:
 31        """
 32        Generate LookML for this explore.
 33
 34        Any generation done in dependent explore's
 35        `_to_lookml` takes precedence over these fields.
 36        """
 37        base_lookml = {}
 38        if hidden:
 39            base_lookml["hidden"] = "yes"
 40        base_view_name = next(
 41            (
 42                view_name
 43                for view_type, view_name in self.views.items()
 44                if view_type == "base_view"
 45            )
 46        )
 47        for view_type, view in self.views.items():
 48            # We look at our dependent views to see if they have a
 49            # "submission" field. Dependent views are any that are:
 50            # - base_view
 51            # - extended_view*
 52            #
 53            # We do not want to look at joined views. Those should be
 54            # labeled as:
 55            # - join*
 56            #
 57            # If they have a submission field, we filter on the date.
 58            # This allows for filter queries to succeed.
 59            if "join" in view_type:
 60                continue
 61            if time_partitioning_group := self.get_view_time_partitioning_group(view):
 62                base_lookml["sql_always_where"] = (
 63                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
 64                )
 65
 66        # We only update the first returned explore
 67        new_lookml = self._to_lookml(v1_name)
 68        base_lookml.update(new_lookml[0])
 69        new_lookml[0] = base_lookml
 70
 71        return new_lookml
 72
 73    def _to_lookml(
 74        self,
 75        v1_name: Optional[str],
 76    ) -> List[Dict[str, Any]]:
 77        raise NotImplementedError("Only implemented in subclasses")
 78
 79    def get_dependent_views(self) -> List[str]:
 80        """Get views this explore is dependent on."""
 81        dependent_views = []
 82        for _type, views in self.views.items():
 83            if _type.startswith("extended"):
 84                continue
 85            elif _type.startswith("joined"):
 86                dependent_views += [view for view in views]
 87            else:
 88                dependent_views.append(views)
 89        return dependent_views
 90
 91    @staticmethod
 92    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
 93        """Get an instance of an explore from a namespace definition."""
 94        raise NotImplementedError("Only implemented in subclasses")
 95
 96    def get_view_lookml(self, view: str) -> dict:
 97        """Get the LookML for a view."""
 98        if self.views_path is not None:
 99            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
100        raise Exception("Missing view path for get_view_lookml")
101
102    def get_unnested_fields_joins_lookml(
103        self,
104    ) -> list:
105        """Get the LookML for joining unnested fields."""
106        views_lookml = self.get_view_lookml(self.views["base_view"])
107        views: List[str] = [view["name"] for view in views_lookml["views"]]
108        parent_base_name = views_lookml["views"][0]["name"]
109
110        extended_views: List[str] = []
111        if "extended_view" in self.views:
112            # check for extended views
113            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
114            extended_views = [view["name"] for view in extended_views_lookml["views"]]
115
116            views_lookml.update(extended_views_lookml)
117            views += extended_views
118
119        joins = []
120        for view in views_lookml["views"][1:]:
121            view_name = view["name"]
122            # get repeated, nested fields that exist as separate views in lookml
123            base_name, metric = self._get_base_name_and_metric(
124                view_name=view_name, views=views
125            )
126            metric_name = view_name
127            metric_label = slug_to_title(metric_name)
128
129            if view_name in extended_views:
130                # names of extended views are overriden by the name of the view that is extending them
131                metric_label = slug_to_title(
132                    metric_name.replace(base_name, parent_base_name)
133                )
134                base_name = parent_base_name
135
136            joins.append(
137                {
138                    "name": view_name,
139                    "view_label": metric_label,
140                    "relationship": "one_to_many",
141                    "sql": (
142                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
143                    ),
144                }
145            )
146
147        return joins
148
149    def _get_default_channel(self, view: str) -> Optional[str]:
150        channel_params = [
151            param
152            for _view_defn in self.get_view_lookml(view)["views"]
153            for param in _view_defn.get("filters", [])
154            if _view_defn["name"] == view and param["name"] == "channel"
155        ]
156
157        if channel_params:
158            allowed_values = channel_params[0]["suggestions"]
159            default_value = allowed_values[0]
160            return escape_filter_expr(default_value)
161        return None
162
163    def _get_base_name_and_metric(
164        self, view_name: str, views: List[str]
165    ) -> Tuple[str, str]:
166        """
167        Get base view and metric names.
168
169        Returns the the name of the base view and the metric based on the
170        passed `view_name` and existing views.
171
172        The names are resolved in a backwards fashion to account for
173        repeated nested fields that might contain other nested fields.
174        For example:
175
176        view: sync {
177            [...]
178            dimension: payload__events {
179                sql: ${TABLE}.payload.events ;;
180            }
181        }
182
183        view: sync__payload__events {
184            [...]
185            dimension: f5_ {
186                sql: ${TABLE}.f5_ ;;
187            }
188        }
189
190        view: sync__payload__events__f5_ {
191            [...]
192        }
193
194        For these nested views to get translated to the following joins, the names
195        need to be resolved backwards:
196
197        join: sync__payload__events {
198            relationship: one_to_many
199            sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;;
200        }
201
202        join: sync__payload__events__f5_ {
203            relationship: one_to_many
204            sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;;
205        }
206        """
207        split = view_name.split("__")
208        for index in range(len(split) - 1, 0, -1):
209            base_view = "__".join(split[:index])
210            metric = "__".join(split[index:])
211            if base_view in views:
212                return (base_view, metric)
213        raise Exception(f"Cannot get base name and metric from view {view_name}")
214
215    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
216        """Determine whether a this view has this dimension."""
217        for _view_defn in self.get_view_lookml(view)["views"]:
218            if _view_defn["name"] != view:
219                continue
220            for dim in _view_defn.get("dimensions", []):
221                if dim["name"] == dimension_name:
222                    return True
223        return False
224
225    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
226        """Get time partitiong dimension group for this view.
227
228        Return the name of the first dimension group tagged "time_partitioning_field",
229        and fall back to "submission" if available.
230        """
231        has_submission = False
232        for _view_defn in self.get_view_lookml(view)["views"]:
233            if not _view_defn["name"] == view:
234                continue
235            for dim in _view_defn.get("dimension_groups", []):
236                if "time_partitioning_field" in dim.get("tags", []):
237                    return dim["name"]
238                elif dim["name"] == "submission":
239                    has_submission = True
240        if has_submission:
241            return "submission"
242        return None
243
244    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
245        """Get required filters for this view."""
246        filters = []
247        view = self.views[view_name]
248
249        # Add a default filter on channel, if it's present in the view
250        default_channel = self._get_default_channel(view)
251        if default_channel is not None:
252            filters.append({"channel": default_channel})
253
254        # Add submission filter, if present in the view
255        if time_partitioning_group := self.get_view_time_partitioning_group(view):
256            filters.append({f"{time_partitioning_group}_date": "28 days"})
257
258        return filters
259
260    def __eq__(self, other) -> bool:
261        """Check for equality with other View."""
262
263        def comparable_dict(d):
264            return tuple(sorted(d.items()))
265
266        if isinstance(other, Explore):
267            return (
268                self.name == other.name
269                and comparable_dict(self.views) == comparable_dict(other.views)
270                and self.type == other.type
271            )
272        return False
@dataclass
class Explore:
 15@dataclass
 16class Explore:
 17    """A generic explore."""
 18
 19    name: str
 20    views: Dict[str, str]
 21    views_path: Optional[Path] = None
 22    defn: Optional[Dict[str, str]] = None
 23    type: str = field(init=False)
 24
 25    def to_dict(self) -> dict:
 26        """Explore instance represented as a dict."""
 27        return {self.name: {"type": self.type, "views": self.views}}
 28
 29    def to_lookml(
 30        self, v1_name: Optional[str], hidden: Optional[bool]
 31    ) -> List[Dict[str, Any]]:
 32        """
 33        Generate LookML for this explore.
 34
 35        Any generation done in dependent explore's
 36        `_to_lookml` takes precedence over these fields.
 37        """
 38        base_lookml = {}
 39        if hidden:
 40            base_lookml["hidden"] = "yes"
 41        base_view_name = next(
 42            (
 43                view_name
 44                for view_type, view_name in self.views.items()
 45                if view_type == "base_view"
 46            )
 47        )
 48        for view_type, view in self.views.items():
 49            # We look at our dependent views to see if they have a
 50            # "submission" field. Dependent views are any that are:
 51            # - base_view
 52            # - extended_view*
 53            #
 54            # We do not want to look at joined views. Those should be
 55            # labeled as:
 56            # - join*
 57            #
 58            # If they have a submission field, we filter on the date.
 59            # This allows for filter queries to succeed.
 60            if "join" in view_type:
 61                continue
 62            if time_partitioning_group := self.get_view_time_partitioning_group(view):
 63                base_lookml["sql_always_where"] = (
 64                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
 65                )
 66
 67        # We only update the first returned explore
 68        new_lookml = self._to_lookml(v1_name)
 69        base_lookml.update(new_lookml[0])
 70        new_lookml[0] = base_lookml
 71
 72        return new_lookml
 73
 74    def _to_lookml(
 75        self,
 76        v1_name: Optional[str],
 77    ) -> List[Dict[str, Any]]:
 78        raise NotImplementedError("Only implemented in subclasses")
 79
 80    def get_dependent_views(self) -> List[str]:
 81        """Get views this explore is dependent on."""
 82        dependent_views = []
 83        for _type, views in self.views.items():
 84            if _type.startswith("extended"):
 85                continue
 86            elif _type.startswith("joined"):
 87                dependent_views += [view for view in views]
 88            else:
 89                dependent_views.append(views)
 90        return dependent_views
 91
 92    @staticmethod
 93    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
 94        """Get an instance of an explore from a namespace definition."""
 95        raise NotImplementedError("Only implemented in subclasses")
 96
 97    def get_view_lookml(self, view: str) -> dict:
 98        """Get the LookML for a view."""
 99        if self.views_path is not None:
100            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
101        raise Exception("Missing view path for get_view_lookml")
102
103    def get_unnested_fields_joins_lookml(
104        self,
105    ) -> list:
106        """Get the LookML for joining unnested fields."""
107        views_lookml = self.get_view_lookml(self.views["base_view"])
108        views: List[str] = [view["name"] for view in views_lookml["views"]]
109        parent_base_name = views_lookml["views"][0]["name"]
110
111        extended_views: List[str] = []
112        if "extended_view" in self.views:
113            # check for extended views
114            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
115            extended_views = [view["name"] for view in extended_views_lookml["views"]]
116
117            views_lookml.update(extended_views_lookml)
118            views += extended_views
119
120        joins = []
121        for view in views_lookml["views"][1:]:
122            view_name = view["name"]
123            # get repeated, nested fields that exist as separate views in lookml
124            base_name, metric = self._get_base_name_and_metric(
125                view_name=view_name, views=views
126            )
127            metric_name = view_name
128            metric_label = slug_to_title(metric_name)
129
130            if view_name in extended_views:
131                # names of extended views are overriden by the name of the view that is extending them
132                metric_label = slug_to_title(
133                    metric_name.replace(base_name, parent_base_name)
134                )
135                base_name = parent_base_name
136
137            joins.append(
138                {
139                    "name": view_name,
140                    "view_label": metric_label,
141                    "relationship": "one_to_many",
142                    "sql": (
143                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
144                    ),
145                }
146            )
147
148        return joins
149
150    def _get_default_channel(self, view: str) -> Optional[str]:
151        channel_params = [
152            param
153            for _view_defn in self.get_view_lookml(view)["views"]
154            for param in _view_defn.get("filters", [])
155            if _view_defn["name"] == view and param["name"] == "channel"
156        ]
157
158        if channel_params:
159            allowed_values = channel_params[0]["suggestions"]
160            default_value = allowed_values[0]
161            return escape_filter_expr(default_value)
162        return None
163
164    def _get_base_name_and_metric(
165        self, view_name: str, views: List[str]
166    ) -> Tuple[str, str]:
167        """
168        Get base view and metric names.
169
170        Returns the the name of the base view and the metric based on the
171        passed `view_name` and existing views.
172
173        The names are resolved in a backwards fashion to account for
174        repeated nested fields that might contain other nested fields.
175        For example:
176
177        view: sync {
178            [...]
179            dimension: payload__events {
180                sql: ${TABLE}.payload.events ;;
181            }
182        }
183
184        view: sync__payload__events {
185            [...]
186            dimension: f5_ {
187                sql: ${TABLE}.f5_ ;;
188            }
189        }
190
191        view: sync__payload__events__f5_ {
192            [...]
193        }
194
195        For these nested views to get translated to the following joins, the names
196        need to be resolved backwards:
197
198        join: sync__payload__events {
199            relationship: one_to_many
200            sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;;
201        }
202
203        join: sync__payload__events__f5_ {
204            relationship: one_to_many
205            sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;;
206        }
207        """
208        split = view_name.split("__")
209        for index in range(len(split) - 1, 0, -1):
210            base_view = "__".join(split[:index])
211            metric = "__".join(split[index:])
212            if base_view in views:
213                return (base_view, metric)
214        raise Exception(f"Cannot get base name and metric from view {view_name}")
215
216    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
217        """Determine whether a this view has this dimension."""
218        for _view_defn in self.get_view_lookml(view)["views"]:
219            if _view_defn["name"] != view:
220                continue
221            for dim in _view_defn.get("dimensions", []):
222                if dim["name"] == dimension_name:
223                    return True
224        return False
225
226    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
227        """Get time partitiong dimension group for this view.
228
229        Return the name of the first dimension group tagged "time_partitioning_field",
230        and fall back to "submission" if available.
231        """
232        has_submission = False
233        for _view_defn in self.get_view_lookml(view)["views"]:
234            if not _view_defn["name"] == view:
235                continue
236            for dim in _view_defn.get("dimension_groups", []):
237                if "time_partitioning_field" in dim.get("tags", []):
238                    return dim["name"]
239                elif dim["name"] == "submission":
240                    has_submission = True
241        if has_submission:
242            return "submission"
243        return None
244
245    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
246        """Get required filters for this view."""
247        filters = []
248        view = self.views[view_name]
249
250        # Add a default filter on channel, if it's present in the view
251        default_channel = self._get_default_channel(view)
252        if default_channel is not None:
253            filters.append({"channel": default_channel})
254
255        # Add submission filter, if present in the view
256        if time_partitioning_group := self.get_view_time_partitioning_group(view):
257            filters.append({f"{time_partitioning_group}_date": "28 days"})
258
259        return filters
260
261    def __eq__(self, other) -> bool:
262        """Check for equality with other View."""
263
264        def comparable_dict(d):
265            return tuple(sorted(d.items()))
266
267        if isinstance(other, Explore):
268            return (
269                self.name == other.name
270                and comparable_dict(self.views) == comparable_dict(other.views)
271                and self.type == other.type
272            )
273        return False

A generic explore.

Explore( name: str, views: Dict[str, str], views_path: Optional[pathlib.Path] = None, defn: Optional[Dict[str, str]] = None)
name: str
views: Dict[str, str]
views_path: Optional[pathlib.Path] = None
defn: Optional[Dict[str, str]] = None
type: str
def to_dict(self) -> dict:
25    def to_dict(self) -> dict:
26        """Explore instance represented as a dict."""
27        return {self.name: {"type": self.type, "views": self.views}}

Explore instance represented as a dict.

def to_lookml( self, v1_name: Optional[str], hidden: Optional[bool]) -> List[Dict[str, Any]]:
29    def to_lookml(
30        self, v1_name: Optional[str], hidden: Optional[bool]
31    ) -> List[Dict[str, Any]]:
32        """
33        Generate LookML for this explore.
34
35        Any generation done in dependent explore's
36        `_to_lookml` takes precedence over these fields.
37        """
38        base_lookml = {}
39        if hidden:
40            base_lookml["hidden"] = "yes"
41        base_view_name = next(
42            (
43                view_name
44                for view_type, view_name in self.views.items()
45                if view_type == "base_view"
46            )
47        )
48        for view_type, view in self.views.items():
49            # We look at our dependent views to see if they have a
50            # "submission" field. Dependent views are any that are:
51            # - base_view
52            # - extended_view*
53            #
54            # We do not want to look at joined views. Those should be
55            # labeled as:
56            # - join*
57            #
58            # If they have a submission field, we filter on the date.
59            # This allows for filter queries to succeed.
60            if "join" in view_type:
61                continue
62            if time_partitioning_group := self.get_view_time_partitioning_group(view):
63                base_lookml["sql_always_where"] = (
64                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
65                )
66
67        # We only update the first returned explore
68        new_lookml = self._to_lookml(v1_name)
69        base_lookml.update(new_lookml[0])
70        new_lookml[0] = base_lookml
71
72        return new_lookml

Generate LookML for this explore.

Any generation done in dependent explore's _to_lookml takes precedence over these fields.

def get_dependent_views(self) -> List[str]:
80    def get_dependent_views(self) -> List[str]:
81        """Get views this explore is dependent on."""
82        dependent_views = []
83        for _type, views in self.views.items():
84            if _type.startswith("extended"):
85                continue
86            elif _type.startswith("joined"):
87                dependent_views += [view for view in views]
88            else:
89                dependent_views.append(views)
90        return dependent_views

Get views this explore is dependent on.

@staticmethod
def from_dict( name: str, defn: dict, views_path: pathlib.Path) -> Explore:
92    @staticmethod
93    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
94        """Get an instance of an explore from a namespace definition."""
95        raise NotImplementedError("Only implemented in subclasses")

Get an instance of an explore from a namespace definition.

def get_view_lookml(self, view: str) -> dict:
 97    def get_view_lookml(self, view: str) -> dict:
 98        """Get the LookML for a view."""
 99        if self.views_path is not None:
100            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
101        raise Exception("Missing view path for get_view_lookml")

Get the LookML for a view.

def get_unnested_fields_joins_lookml(self) -> list:
103    def get_unnested_fields_joins_lookml(
104        self,
105    ) -> list:
106        """Get the LookML for joining unnested fields."""
107        views_lookml = self.get_view_lookml(self.views["base_view"])
108        views: List[str] = [view["name"] for view in views_lookml["views"]]
109        parent_base_name = views_lookml["views"][0]["name"]
110
111        extended_views: List[str] = []
112        if "extended_view" in self.views:
113            # check for extended views
114            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
115            extended_views = [view["name"] for view in extended_views_lookml["views"]]
116
117            views_lookml.update(extended_views_lookml)
118            views += extended_views
119
120        joins = []
121        for view in views_lookml["views"][1:]:
122            view_name = view["name"]
123            # get repeated, nested fields that exist as separate views in lookml
124            base_name, metric = self._get_base_name_and_metric(
125                view_name=view_name, views=views
126            )
127            metric_name = view_name
128            metric_label = slug_to_title(metric_name)
129
130            if view_name in extended_views:
131                # names of extended views are overriden by the name of the view that is extending them
132                metric_label = slug_to_title(
133                    metric_name.replace(base_name, parent_base_name)
134                )
135                base_name = parent_base_name
136
137            joins.append(
138                {
139                    "name": view_name,
140                    "view_label": metric_label,
141                    "relationship": "one_to_many",
142                    "sql": (
143                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
144                    ),
145                }
146            )
147
148        return joins

Get the LookML for joining unnested fields.

def has_view_dimension(self, view: str, dimension_name: str) -> bool:
216    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
217        """Determine whether a this view has this dimension."""
218        for _view_defn in self.get_view_lookml(view)["views"]:
219            if _view_defn["name"] != view:
220                continue
221            for dim in _view_defn.get("dimensions", []):
222                if dim["name"] == dimension_name:
223                    return True
224        return False

Determine whether this view has this dimension.

def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
226    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
227        """Get time partitiong dimension group for this view.
228
229        Return the name of the first dimension group tagged "time_partitioning_field",
230        and fall back to "submission" if available.
231        """
232        has_submission = False
233        for _view_defn in self.get_view_lookml(view)["views"]:
234            if not _view_defn["name"] == view:
235                continue
236            for dim in _view_defn.get("dimension_groups", []):
237                if "time_partitioning_field" in dim.get("tags", []):
238                    return dim["name"]
239                elif dim["name"] == "submission":
240                    has_submission = True
241        if has_submission:
242            return "submission"
243        return None

Get time partitioning dimension group for this view.

Return the name of the first dimension group tagged "time_partitioning_field", and fall back to "submission" if available.

def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
245    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
246        """Get required filters for this view."""
247        filters = []
248        view = self.views[view_name]
249
250        # Add a default filter on channel, if it's present in the view
251        default_channel = self._get_default_channel(view)
252        if default_channel is not None:
253            filters.append({"channel": default_channel})
254
255        # Add submission filter, if present in the view
256        if time_partitioning_group := self.get_view_time_partitioning_group(view):
257            filters.append({f"{time_partitioning_group}_date": "28 days"})
258
259        return filters

Get required filters for this view.