generator.explores.explore

Generic explore type.

  1"""Generic explore type."""
  2
  3from __future__ import annotations
  4
  5from dataclasses import dataclass, field
  6from pathlib import Path
  7from typing import Any, Dict, List, Optional, Tuple
  8
  9import lkml
 10
 11from ..views.lookml_utils import escape_filter_expr, slug_to_title
 12
 13
 14@dataclass
 15class Explore:
 16    """A generic explore."""
 17
 18    name: str
 19    views: Dict[str, str]
 20    views_path: Optional[Path] = None
 21    defn: Optional[Dict[str, str]] = None
 22    type: str = field(init=False)
 23
 24    def to_dict(self) -> dict:
 25        """Explore instance represented as a dict."""
 26        return {self.name: {"type": self.type, "views": self.views}}
 27
 28    def to_lookml(self, v1_name: Optional[str]) -> List[Dict[str, Any]]:
 29        """
 30        Generate LookML for this explore.
 31
 32        Any generation done in dependent explore's
 33        `_to_lookml` takes precedence over these fields.
 34        """
 35        base_lookml = {}
 36        base_view_name = next(
 37            (
 38                view_name
 39                for view_type, view_name in self.views.items()
 40                if view_type == "base_view"
 41            )
 42        )
 43        for view_type, view in self.views.items():
 44            # We look at our dependent views to see if they have a
 45            # "submission" field. Dependent views are any that are:
 46            # - base_view
 47            # - extended_view*
 48            #
 49            # We do not want to look at joined views. Those should be
 50            # labeled as:
 51            # - join*
 52            #
 53            # If they have a submission field, we filter on the date.
 54            # This allows for filter queries to succeed.
 55            if "join" in view_type:
 56                continue
 57            if time_partitioning_group := self.get_view_time_partitioning_group(view):
 58                base_lookml["sql_always_where"] = (
 59                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
 60                )
 61
 62        # We only update the first returned explore
 63        new_lookml = self._to_lookml(v1_name)
 64        base_lookml.update(new_lookml[0])
 65        new_lookml[0] = base_lookml
 66
 67        return new_lookml
 68
 69    def _to_lookml(
 70        self,
 71        v1_name: Optional[str],
 72    ) -> List[Dict[str, Any]]:
 73        raise NotImplementedError("Only implemented in subclasses")
 74
 75    def get_dependent_views(self) -> List[str]:
 76        """Get views this explore is dependent on."""
 77        dependent_views = []
 78        for _type, views in self.views.items():
 79            if _type.startswith("extended"):
 80                continue
 81            elif _type.startswith("joined"):
 82                dependent_views += [view for view in views]
 83            else:
 84                dependent_views.append(views)
 85        return dependent_views
 86
 87    @staticmethod
 88    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
 89        """Get an instance of an explore from a namespace definition."""
 90        raise NotImplementedError("Only implemented in subclasses")
 91
 92    def get_view_lookml(self, view: str) -> dict:
 93        """Get the LookML for a view."""
 94        if self.views_path is not None:
 95            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
 96        raise Exception("Missing view path for get_view_lookml")
 97
 98    def get_unnested_fields_joins_lookml(
 99        self,
100    ) -> list:
101        """Get the LookML for joining unnested fields."""
102        views_lookml = self.get_view_lookml(self.views["base_view"])
103        views: List[str] = [view["name"] for view in views_lookml["views"]]
104        parent_base_name = views_lookml["views"][0]["name"]
105
106        extended_views: List[str] = []
107        if "extended_view" in self.views:
108            # check for extended views
109            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
110            extended_views = [view["name"] for view in extended_views_lookml["views"]]
111
112            views_lookml.update(extended_views_lookml)
113            views += extended_views
114
115        joins = []
116        for view in views_lookml["views"][1:]:
117            view_name = view["name"]
118            # get repeated, nested fields that exist as separate views in lookml
119            base_name, metric = self._get_base_name_and_metric(
120                view_name=view_name, views=views
121            )
122            metric_name = view_name
123            metric_label = slug_to_title(metric_name)
124
125            if view_name in extended_views:
126                # names of extended views are overriden by the name of the view that is extending them
127                metric_label = slug_to_title(
128                    metric_name.replace(base_name, parent_base_name)
129                )
130                base_name = parent_base_name
131
132            joins.append(
133                {
134                    "name": view_name,
135                    "view_label": metric_label,
136                    "relationship": "one_to_many",
137                    "sql": (
138                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
139                    ),
140                }
141            )
142
143        return joins
144
145    def _get_default_channel(self, view: str) -> Optional[str]:
146        channel_params = [
147            param
148            for _view_defn in self.get_view_lookml(view)["views"]
149            for param in _view_defn.get("filters", [])
150            if _view_defn["name"] == view and param["name"] == "channel"
151        ]
152
153        if channel_params:
154            allowed_values = channel_params[0]["suggestions"]
155            default_value = allowed_values[0]
156            return escape_filter_expr(default_value)
157        return None
158
159    def _get_base_name_and_metric(
160        self, view_name: str, views: List[str]
161    ) -> Tuple[str, str]:
162        """
163        Get base view and metric names.
164
165        Returns the the name of the base view and the metric based on the
166        passed `view_name` and existing views.
167
168        The names are resolved in a backwards fashion to account for
169        repeated nested fields that might contain other nested fields.
170        For example:
171
172        view: sync {
173            [...]
174            dimension: payload__events {
175                sql: ${TABLE}.payload.events ;;
176            }
177        }
178
179        view: sync__payload__events {
180            [...]
181            dimension: f5_ {
182                sql: ${TABLE}.f5_ ;;
183            }
184        }
185
186        view: sync__payload__events__f5_ {
187            [...]
188        }
189
190        For these nested views to get translated to the following joins, the names
191        need to be resolved backwards:
192
193        join: sync__payload__events {
194            relationship: one_to_many
195            sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;;
196        }
197
198        join: sync__payload__events__f5_ {
199            relationship: one_to_many
200            sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;;
201        }
202        """
203        split = view_name.split("__")
204        for index in range(len(split) - 1, 0, -1):
205            base_view = "__".join(split[:index])
206            metric = "__".join(split[index:])
207            if base_view in views:
208                return (base_view, metric)
209        raise Exception(f"Cannot get base name and metric from view {view_name}")
210
211    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
212        """Determine whether a this view has this dimension."""
213        for _view_defn in self.get_view_lookml(view)["views"]:
214            if _view_defn["name"] != view:
215                continue
216            for dim in _view_defn.get("dimensions", []):
217                if dim["name"] == dimension_name:
218                    return True
219        return False
220
221    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
222        """Get time partitiong dimension group for this view.
223
224        Return the name of the first dimension group tagged "time_partitioning_field",
225        and fall back to "submission" if available.
226        """
227        has_submission = False
228        for _view_defn in self.get_view_lookml(view)["views"]:
229            if not _view_defn["name"] == view:
230                continue
231            for dim in _view_defn.get("dimension_groups", []):
232                if "time_partitioning_field" in dim.get("tags", []):
233                    return dim["name"]
234                elif dim["name"] == "submission":
235                    has_submission = True
236        if has_submission:
237            return "submission"
238        return None
239
240    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
241        """Get required filters for this view."""
242        filters = []
243        view = self.views[view_name]
244
245        # Add a default filter on channel, if it's present in the view
246        default_channel = self._get_default_channel(view)
247        if default_channel is not None:
248            filters.append({"channel": default_channel})
249
250        # Add submission filter, if present in the view
251        if time_partitioning_group := self.get_view_time_partitioning_group(view):
252            filters.append({f"{time_partitioning_group}_date": "28 days"})
253
254        return filters
255
256    def __eq__(self, other) -> bool:
257        """Check for equality with other View."""
258
259        def comparable_dict(d):
260            return tuple(sorted(d.items()))
261
262        if isinstance(other, Explore):
263            return (
264                self.name == other.name
265                and comparable_dict(self.views) == comparable_dict(other.views)
266                and self.type == other.type
267            )
268        return False
@dataclass
class Explore:
 15@dataclass
 16class Explore:
 17    """A generic explore."""
 18
 19    name: str
 20    views: Dict[str, str]
 21    views_path: Optional[Path] = None
 22    defn: Optional[Dict[str, str]] = None
 23    type: str = field(init=False)
 24
 25    def to_dict(self) -> dict:
 26        """Explore instance represented as a dict."""
 27        return {self.name: {"type": self.type, "views": self.views}}
 28
 29    def to_lookml(self, v1_name: Optional[str]) -> List[Dict[str, Any]]:
 30        """
 31        Generate LookML for this explore.
 32
 33        Any generation done in dependent explore's
 34        `_to_lookml` takes precedence over these fields.
 35        """
 36        base_lookml = {}
 37        base_view_name = next(
 38            (
 39                view_name
 40                for view_type, view_name in self.views.items()
 41                if view_type == "base_view"
 42            )
 43        )
 44        for view_type, view in self.views.items():
 45            # We look at our dependent views to see if they have a
 46            # "submission" field. Dependent views are any that are:
 47            # - base_view
 48            # - extended_view*
 49            #
 50            # We do not want to look at joined views. Those should be
 51            # labeled as:
 52            # - join*
 53            #
 54            # If they have a submission field, we filter on the date.
 55            # This allows for filter queries to succeed.
 56            if "join" in view_type:
 57                continue
 58            if time_partitioning_group := self.get_view_time_partitioning_group(view):
 59                base_lookml["sql_always_where"] = (
 60                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
 61                )
 62
 63        # We only update the first returned explore
 64        new_lookml = self._to_lookml(v1_name)
 65        base_lookml.update(new_lookml[0])
 66        new_lookml[0] = base_lookml
 67
 68        return new_lookml
 69
 70    def _to_lookml(
 71        self,
 72        v1_name: Optional[str],
 73    ) -> List[Dict[str, Any]]:
 74        raise NotImplementedError("Only implemented in subclasses")
 75
 76    def get_dependent_views(self) -> List[str]:
 77        """Get views this explore is dependent on."""
 78        dependent_views = []
 79        for _type, views in self.views.items():
 80            if _type.startswith("extended"):
 81                continue
 82            elif _type.startswith("joined"):
 83                dependent_views += [view for view in views]
 84            else:
 85                dependent_views.append(views)
 86        return dependent_views
 87
 88    @staticmethod
 89    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
 90        """Get an instance of an explore from a namespace definition."""
 91        raise NotImplementedError("Only implemented in subclasses")
 92
 93    def get_view_lookml(self, view: str) -> dict:
 94        """Get the LookML for a view."""
 95        if self.views_path is not None:
 96            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
 97        raise Exception("Missing view path for get_view_lookml")
 98
 99    def get_unnested_fields_joins_lookml(
100        self,
101    ) -> list:
102        """Get the LookML for joining unnested fields."""
103        views_lookml = self.get_view_lookml(self.views["base_view"])
104        views: List[str] = [view["name"] for view in views_lookml["views"]]
105        parent_base_name = views_lookml["views"][0]["name"]
106
107        extended_views: List[str] = []
108        if "extended_view" in self.views:
109            # check for extended views
110            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
111            extended_views = [view["name"] for view in extended_views_lookml["views"]]
112
113            views_lookml.update(extended_views_lookml)
114            views += extended_views
115
116        joins = []
117        for view in views_lookml["views"][1:]:
118            view_name = view["name"]
119            # get repeated, nested fields that exist as separate views in lookml
120            base_name, metric = self._get_base_name_and_metric(
121                view_name=view_name, views=views
122            )
123            metric_name = view_name
124            metric_label = slug_to_title(metric_name)
125
126            if view_name in extended_views:
127                # names of extended views are overriden by the name of the view that is extending them
128                metric_label = slug_to_title(
129                    metric_name.replace(base_name, parent_base_name)
130                )
131                base_name = parent_base_name
132
133            joins.append(
134                {
135                    "name": view_name,
136                    "view_label": metric_label,
137                    "relationship": "one_to_many",
138                    "sql": (
139                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
140                    ),
141                }
142            )
143
144        return joins
145
146    def _get_default_channel(self, view: str) -> Optional[str]:
147        channel_params = [
148            param
149            for _view_defn in self.get_view_lookml(view)["views"]
150            for param in _view_defn.get("filters", [])
151            if _view_defn["name"] == view and param["name"] == "channel"
152        ]
153
154        if channel_params:
155            allowed_values = channel_params[0]["suggestions"]
156            default_value = allowed_values[0]
157            return escape_filter_expr(default_value)
158        return None
159
160    def _get_base_name_and_metric(
161        self, view_name: str, views: List[str]
162    ) -> Tuple[str, str]:
163        """
164        Get base view and metric names.
165
166        Returns the the name of the base view and the metric based on the
167        passed `view_name` and existing views.
168
169        The names are resolved in a backwards fashion to account for
170        repeated nested fields that might contain other nested fields.
171        For example:
172
173        view: sync {
174            [...]
175            dimension: payload__events {
176                sql: ${TABLE}.payload.events ;;
177            }
178        }
179
180        view: sync__payload__events {
181            [...]
182            dimension: f5_ {
183                sql: ${TABLE}.f5_ ;;
184            }
185        }
186
187        view: sync__payload__events__f5_ {
188            [...]
189        }
190
191        For these nested views to get translated to the following joins, the names
192        need to be resolved backwards:
193
194        join: sync__payload__events {
195            relationship: one_to_many
196            sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;;
197        }
198
199        join: sync__payload__events__f5_ {
200            relationship: one_to_many
201            sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;;
202        }
203        """
204        split = view_name.split("__")
205        for index in range(len(split) - 1, 0, -1):
206            base_view = "__".join(split[:index])
207            metric = "__".join(split[index:])
208            if base_view in views:
209                return (base_view, metric)
210        raise Exception(f"Cannot get base name and metric from view {view_name}")
211
212    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
213        """Determine whether a this view has this dimension."""
214        for _view_defn in self.get_view_lookml(view)["views"]:
215            if _view_defn["name"] != view:
216                continue
217            for dim in _view_defn.get("dimensions", []):
218                if dim["name"] == dimension_name:
219                    return True
220        return False
221
222    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
223        """Get time partitiong dimension group for this view.
224
225        Return the name of the first dimension group tagged "time_partitioning_field",
226        and fall back to "submission" if available.
227        """
228        has_submission = False
229        for _view_defn in self.get_view_lookml(view)["views"]:
230            if not _view_defn["name"] == view:
231                continue
232            for dim in _view_defn.get("dimension_groups", []):
233                if "time_partitioning_field" in dim.get("tags", []):
234                    return dim["name"]
235                elif dim["name"] == "submission":
236                    has_submission = True
237        if has_submission:
238            return "submission"
239        return None
240
241    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
242        """Get required filters for this view."""
243        filters = []
244        view = self.views[view_name]
245
246        # Add a default filter on channel, if it's present in the view
247        default_channel = self._get_default_channel(view)
248        if default_channel is not None:
249            filters.append({"channel": default_channel})
250
251        # Add submission filter, if present in the view
252        if time_partitioning_group := self.get_view_time_partitioning_group(view):
253            filters.append({f"{time_partitioning_group}_date": "28 days"})
254
255        return filters
256
257    def __eq__(self, other) -> bool:
258        """Check for equality with other View."""
259
260        def comparable_dict(d):
261            return tuple(sorted(d.items()))
262
263        if isinstance(other, Explore):
264            return (
265                self.name == other.name
266                and comparable_dict(self.views) == comparable_dict(other.views)
267                and self.type == other.type
268            )
269        return False

A generic explore.

Explore( name: str, views: Dict[str, str], views_path: Optional[pathlib.Path] = None, defn: Optional[Dict[str, str]] = None)
name: str
views: Dict[str, str]
views_path: Optional[pathlib.Path] = None
defn: Optional[Dict[str, str]] = None
type: str
def to_dict(self) -> dict:
25    def to_dict(self) -> dict:
26        """Explore instance represented as a dict."""
27        return {self.name: {"type": self.type, "views": self.views}}

Explore instance represented as a dict.

def to_lookml(self, v1_name: Optional[str]) -> List[Dict[str, Any]]:
29    def to_lookml(self, v1_name: Optional[str]) -> List[Dict[str, Any]]:
30        """
31        Generate LookML for this explore.
32
33        Any generation done in dependent explore's
34        `_to_lookml` takes precedence over these fields.
35        """
36        base_lookml = {}
37        base_view_name = next(
38            (
39                view_name
40                for view_type, view_name in self.views.items()
41                if view_type == "base_view"
42            )
43        )
44        for view_type, view in self.views.items():
45            # We look at our dependent views to see if they have a
46            # "submission" field. Dependent views are any that are:
47            # - base_view
48            # - extended_view*
49            #
50            # We do not want to look at joined views. Those should be
51            # labeled as:
52            # - join*
53            #
54            # If they have a submission field, we filter on the date.
55            # This allows for filter queries to succeed.
56            if "join" in view_type:
57                continue
58            if time_partitioning_group := self.get_view_time_partitioning_group(view):
59                base_lookml["sql_always_where"] = (
60                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
61                )
62
63        # We only update the first returned explore
64        new_lookml = self._to_lookml(v1_name)
65        base_lookml.update(new_lookml[0])
66        new_lookml[0] = base_lookml
67
68        return new_lookml

Generate LookML for this explore.

Any generation done in dependent explore's _to_lookml takes precedence over these fields.

def get_dependent_views(self) -> List[str]:
76    def get_dependent_views(self) -> List[str]:
77        """Get views this explore is dependent on."""
78        dependent_views = []
79        for _type, views in self.views.items():
80            if _type.startswith("extended"):
81                continue
82            elif _type.startswith("joined"):
83                dependent_views += [view for view in views]
84            else:
85                dependent_views.append(views)
86        return dependent_views

Get views this explore is dependent on.

@staticmethod
def from_dict( name: str, defn: dict, views_path: pathlib.Path) -> Explore:
88    @staticmethod
89    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
90        """Get an instance of an explore from a namespace definition."""
91        raise NotImplementedError("Only implemented in subclasses")

Get an instance of an explore from a namespace definition.

def get_view_lookml(self, view: str) -> dict:
93    def get_view_lookml(self, view: str) -> dict:
94        """Get the LookML for a view."""
95        if self.views_path is not None:
96            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
97        raise Exception("Missing view path for get_view_lookml")

Get the LookML for a view.

def get_unnested_fields_joins_lookml(self) -> list:
 99    def get_unnested_fields_joins_lookml(
100        self,
101    ) -> list:
102        """Get the LookML for joining unnested fields."""
103        views_lookml = self.get_view_lookml(self.views["base_view"])
104        views: List[str] = [view["name"] for view in views_lookml["views"]]
105        parent_base_name = views_lookml["views"][0]["name"]
106
107        extended_views: List[str] = []
108        if "extended_view" in self.views:
109            # check for extended views
110            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
111            extended_views = [view["name"] for view in extended_views_lookml["views"]]
112
113            views_lookml.update(extended_views_lookml)
114            views += extended_views
115
116        joins = []
117        for view in views_lookml["views"][1:]:
118            view_name = view["name"]
119            # get repeated, nested fields that exist as separate views in lookml
120            base_name, metric = self._get_base_name_and_metric(
121                view_name=view_name, views=views
122            )
123            metric_name = view_name
124            metric_label = slug_to_title(metric_name)
125
126            if view_name in extended_views:
127                # names of extended views are overriden by the name of the view that is extending them
128                metric_label = slug_to_title(
129                    metric_name.replace(base_name, parent_base_name)
130                )
131                base_name = parent_base_name
132
133            joins.append(
134                {
135                    "name": view_name,
136                    "view_label": metric_label,
137                    "relationship": "one_to_many",
138                    "sql": (
139                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
140                    ),
141                }
142            )
143
144        return joins

Get the LookML for joining unnested fields.

def has_view_dimension(self, view: str, dimension_name: str) -> bool:
212    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
213        """Determine whether a this view has this dimension."""
214        for _view_defn in self.get_view_lookml(view)["views"]:
215            if _view_defn["name"] != view:
216                continue
217            for dim in _view_defn.get("dimensions", []):
218                if dim["name"] == dimension_name:
219                    return True
220        return False

Determine whether a this view has this dimension.

def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
222    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
223        """Get time partitiong dimension group for this view.
224
225        Return the name of the first dimension group tagged "time_partitioning_field",
226        and fall back to "submission" if available.
227        """
228        has_submission = False
229        for _view_defn in self.get_view_lookml(view)["views"]:
230            if not _view_defn["name"] == view:
231                continue
232            for dim in _view_defn.get("dimension_groups", []):
233                if "time_partitioning_field" in dim.get("tags", []):
234                    return dim["name"]
235                elif dim["name"] == "submission":
236                    has_submission = True
237        if has_submission:
238            return "submission"
239        return None

Get time partitiong dimension group for this view.

Return the name of the first dimension group tagged "time_partitioning_field", and fall back to "submission" if available.

def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
241    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
242        """Get required filters for this view."""
243        filters = []
244        view = self.views[view_name]
245
246        # Add a default filter on channel, if it's present in the view
247        default_channel = self._get_default_channel(view)
248        if default_channel is not None:
249            filters.append({"channel": default_channel})
250
251        # Add submission filter, if present in the view
252        if time_partitioning_group := self.get_view_time_partitioning_group(view):
253            filters.append({f"{time_partitioning_group}_date": "28 days"})
254
255        return filters

Get required filters for this view.