generator.explores.explore

Generic explore type.

  1"""Generic explore type."""
  2
  3from __future__ import annotations
  4
  5from dataclasses import dataclass, field
  6from pathlib import Path
  7from typing import Any, Dict, List, Optional, Tuple
  8
  9import lkml
 10
 11from ..views.lookml_utils import escape_filter_expr, slug_to_title
 12
 13
 14@dataclass
 15class Explore:
 16    """A generic explore."""
 17
 18    name: str
 19    views: Dict[str, str]
 20    views_path: Optional[Path] = None
 21    defn: Optional[Dict[str, str]] = None
 22    type: str = field(init=False)
 23
 24    def to_dict(self) -> dict:
 25        """Explore instance represented as a dict."""
 26        return {self.name: {"type": self.type, "views": self.views}}
 27
 28    def to_lookml(
 29        self, v1_name: Optional[str], hidden: Optional[bool]
 30    ) -> List[Dict[str, Any]]:
 31        """
 32        Generate LookML for this explore.
 33
 34        Any generation done in dependent explore's
 35        `_to_lookml` takes precedence over these fields.
 36        """
 37        base_lookml = {}
 38        if hidden:
 39            base_lookml["hidden"] = "yes"
 40        base_view_name = next(
 41            (
 42                view_name
 43                for view_type, view_name in self.views.items()
 44                if view_type == "base_view"
 45            )
 46        )
 47        for view_type, view in self.views.items():
 48            # We look at our dependent views to see if they have a
 49            # "submission" field. Dependent views are any that are:
 50            # - base_view
 51            # - extended_view*
 52            #
 53            # We do not want to look at joined views. Those should be
 54            # labeled as:
 55            # - join*
 56            #
 57            # If they have a submission field, we filter on the date.
 58            # This allows for filter queries to succeed.
 59            if "join" in view_type:
 60                continue
 61            if time_partitioning_group := self.get_view_time_partitioning_group(view):
 62                base_lookml["sql_always_where"] = (
 63                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
 64                )
 65
 66        # We only update the first returned explore
 67        new_lookml = self._to_lookml(v1_name)
 68        base_lookml.update(new_lookml[0])
 69        new_lookml[0] = base_lookml
 70
 71        return new_lookml
 72
 73    def _to_lookml(
 74        self,
 75        v1_name: Optional[str],
 76    ) -> List[Dict[str, Any]]:
 77        raise NotImplementedError("Only implemented in subclasses")
 78
 79    def get_dependent_views(self) -> List[str]:
 80        """Get views this explore is dependent on."""
 81        dependent_views = []
 82        for _type, views in self.views.items():
 83            if _type.startswith("extended"):
 84                continue
 85            elif _type.startswith("joined"):
 86                dependent_views += [view for view in views]
 87            else:
 88                dependent_views.append(views)
 89        return dependent_views
 90
 91    @staticmethod
 92    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
 93        """Get an instance of an explore from a namespace definition."""
 94        raise NotImplementedError("Only implemented in subclasses")
 95
 96    def get_view_lookml(self, view: str) -> dict:
 97        """Get the LookML for a view."""
 98        if self.views_path is not None:
 99            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
100
101        raise Exception("Missing view path for get_view_lookml")
102
103    def get_datagroup(self) -> Optional[str]:
104        """
105        Return the name of the associated datagroup.
106
107        Return `None` if there is no datagroup for this explore.
108        """
109        if self.views_path and (self.views_path.parent / "datagroups").exists():
110            datagroups_path = self.views_path.parent / "datagroups"
111            datagroup_file = (
112                datagroups_path
113                / f'{self.views["base_view"]}_last_updated.datagroup.lkml'
114            )
115            if datagroup_file.exists():
116                return f'{self.views["base_view"]}_last_updated'
117        return None
118
119    def get_unnested_fields_joins_lookml(
120        self,
121    ) -> list:
122        """Get the LookML for joining unnested fields."""
123        views_lookml = self.get_view_lookml(self.views["base_view"])
124        views: List[str] = [view["name"] for view in views_lookml["views"]]
125        parent_base_name = views_lookml["views"][0]["name"]
126
127        extended_views: List[str] = []
128        if "extended_view" in self.views:
129            # check for extended views
130            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
131            extended_views = [view["name"] for view in extended_views_lookml["views"]]
132
133            views_lookml.update(extended_views_lookml)
134            views += extended_views
135
136        joins = []
137        for view in views_lookml["views"][1:]:
138            view_name = view["name"]
139            # get repeated, nested fields that exist as separate views in lookml
140            base_name, metric = self._get_base_name_and_metric(
141                view_name=view_name, views=views
142            )
143            metric_name = view_name
144            metric_label = slug_to_title(metric_name)
145
146            if view_name in extended_views:
147                # names of extended views are overriden by the name of the view that is extending them
148                metric_label = slug_to_title(
149                    metric_name.replace(base_name, parent_base_name)
150                )
151                base_name = parent_base_name
152
153            joins.append(
154                {
155                    "name": view_name,
156                    "view_label": metric_label,
157                    "relationship": "one_to_many",
158                    "sql": (
159                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
160                    ),
161                }
162            )
163
164        return joins
165
166    def _get_default_channel(self, view: str) -> Optional[str]:
167        channel_params = [
168            param
169            for _view_defn in self.get_view_lookml(view)["views"]
170            for param in _view_defn.get("filters", [])
171            if _view_defn["name"] == view and param["name"] == "channel"
172        ]
173
174        if channel_params:
175            allowed_values = channel_params[0]["suggestions"]
176            default_value = allowed_values[0]
177            return escape_filter_expr(default_value)
178        return None
179
180    def _get_base_name_and_metric(
181        self, view_name: str, views: List[str]
182    ) -> Tuple[str, str]:
183        """
184        Get base view and metric names.
185
186        Returns the the name of the base view and the metric based on the
187        passed `view_name` and existing views.
188
189        The names are resolved in a backwards fashion to account for
190        repeated nested fields that might contain other nested fields.
191        For example:
192
193        view: sync {
194            [...]
195            dimension: payload__events {
196                sql: ${TABLE}.payload.events ;;
197            }
198        }
199
200        view: sync__payload__events {
201            [...]
202            dimension: f5_ {
203                sql: ${TABLE}.f5_ ;;
204            }
205        }
206
207        view: sync__payload__events__f5_ {
208            [...]
209        }
210
211        For these nested views to get translated to the following joins, the names
212        need to be resolved backwards:
213
214        join: sync__payload__events {
215            relationship: one_to_many
216            sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;;
217        }
218
219        join: sync__payload__events__f5_ {
220            relationship: one_to_many
221            sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;;
222        }
223        """
224        split = view_name.split("__")
225        for index in range(len(split) - 1, 0, -1):
226            base_view = "__".join(split[:index])
227            metric = "__".join(split[index:])
228            if base_view in views:
229                return (base_view, metric)
230        raise Exception(f"Cannot get base name and metric from view {view_name}")
231
232    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
233        """Determine whether a this view has this dimension."""
234        for _view_defn in self.get_view_lookml(view)["views"]:
235            if _view_defn["name"] != view:
236                continue
237            for dim in _view_defn.get("dimensions", []):
238                if dim["name"] == dimension_name:
239                    return True
240        return False
241
242    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
243        """Get time partitiong dimension group for this view.
244
245        Return the name of the first dimension group tagged "time_partitioning_field",
246        and fall back to "submission" if available.
247        """
248        has_submission = False
249        for _view_defn in self.get_view_lookml(view)["views"]:
250            if not _view_defn["name"] == view:
251                continue
252            for dim in _view_defn.get("dimension_groups", []):
253                if "time_partitioning_field" in dim.get("tags", []):
254                    return dim["name"]
255                elif dim["name"] == "submission":
256                    has_submission = True
257        if has_submission:
258            return "submission"
259        return None
260
261    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
262        """Get required filters for this view."""
263        filters = []
264        view = self.views[view_name]
265
266        # Add a default filter on channel, if it's present in the view
267        default_channel = self._get_default_channel(view)
268        if default_channel is not None:
269            filters.append({"channel": default_channel})
270
271        # Add submission filter, if present in the view
272        if time_partitioning_group := self.get_view_time_partitioning_group(view):
273            filters.append({f"{time_partitioning_group}_date": "28 days"})
274
275        return filters
276
277    def __eq__(self, other) -> bool:
278        """Check for equality with other View."""
279
280        def comparable_dict(d):
281            return tuple(sorted(d.items()))
282
283        if isinstance(other, Explore):
284            return (
285                self.name == other.name
286                and comparable_dict(self.views) == comparable_dict(other.views)
287                and self.type == other.type
288            )
289        return False
@dataclass
class Explore:
 15@dataclass
 16class Explore:
 17    """A generic explore."""
 18
 19    name: str
 20    views: Dict[str, str]
 21    views_path: Optional[Path] = None
 22    defn: Optional[Dict[str, str]] = None
 23    type: str = field(init=False)
 24
 25    def to_dict(self) -> dict:
 26        """Explore instance represented as a dict."""
 27        return {self.name: {"type": self.type, "views": self.views}}
 28
 29    def to_lookml(
 30        self, v1_name: Optional[str], hidden: Optional[bool]
 31    ) -> List[Dict[str, Any]]:
 32        """
 33        Generate LookML for this explore.
 34
 35        Any generation done in dependent explore's
 36        `_to_lookml` takes precedence over these fields.
 37        """
 38        base_lookml = {}
 39        if hidden:
 40            base_lookml["hidden"] = "yes"
 41        base_view_name = next(
 42            (
 43                view_name
 44                for view_type, view_name in self.views.items()
 45                if view_type == "base_view"
 46            )
 47        )
 48        for view_type, view in self.views.items():
 49            # We look at our dependent views to see if they have a
 50            # "submission" field. Dependent views are any that are:
 51            # - base_view
 52            # - extended_view*
 53            #
 54            # We do not want to look at joined views. Those should be
 55            # labeled as:
 56            # - join*
 57            #
 58            # If they have a submission field, we filter on the date.
 59            # This allows for filter queries to succeed.
 60            if "join" in view_type:
 61                continue
 62            if time_partitioning_group := self.get_view_time_partitioning_group(view):
 63                base_lookml["sql_always_where"] = (
 64                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
 65                )
 66
 67        # We only update the first returned explore
 68        new_lookml = self._to_lookml(v1_name)
 69        base_lookml.update(new_lookml[0])
 70        new_lookml[0] = base_lookml
 71
 72        return new_lookml
 73
 74    def _to_lookml(
 75        self,
 76        v1_name: Optional[str],
 77    ) -> List[Dict[str, Any]]:
 78        raise NotImplementedError("Only implemented in subclasses")
 79
 80    def get_dependent_views(self) -> List[str]:
 81        """Get views this explore is dependent on."""
 82        dependent_views = []
 83        for _type, views in self.views.items():
 84            if _type.startswith("extended"):
 85                continue
 86            elif _type.startswith("joined"):
 87                dependent_views += [view for view in views]
 88            else:
 89                dependent_views.append(views)
 90        return dependent_views
 91
 92    @staticmethod
 93    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
 94        """Get an instance of an explore from a namespace definition."""
 95        raise NotImplementedError("Only implemented in subclasses")
 96
 97    def get_view_lookml(self, view: str) -> dict:
 98        """Get the LookML for a view."""
 99        if self.views_path is not None:
100            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
101
102        raise Exception("Missing view path for get_view_lookml")
103
104    def get_datagroup(self) -> Optional[str]:
105        """
106        Return the name of the associated datagroup.
107
108        Return `None` if there is no datagroup for this explore.
109        """
110        if self.views_path and (self.views_path.parent / "datagroups").exists():
111            datagroups_path = self.views_path.parent / "datagroups"
112            datagroup_file = (
113                datagroups_path
114                / f'{self.views["base_view"]}_last_updated.datagroup.lkml'
115            )
116            if datagroup_file.exists():
117                return f'{self.views["base_view"]}_last_updated'
118        return None
119
120    def get_unnested_fields_joins_lookml(
121        self,
122    ) -> list:
123        """Get the LookML for joining unnested fields."""
124        views_lookml = self.get_view_lookml(self.views["base_view"])
125        views: List[str] = [view["name"] for view in views_lookml["views"]]
126        parent_base_name = views_lookml["views"][0]["name"]
127
128        extended_views: List[str] = []
129        if "extended_view" in self.views:
130            # check for extended views
131            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
132            extended_views = [view["name"] for view in extended_views_lookml["views"]]
133
134            views_lookml.update(extended_views_lookml)
135            views += extended_views
136
137        joins = []
138        for view in views_lookml["views"][1:]:
139            view_name = view["name"]
140            # get repeated, nested fields that exist as separate views in lookml
141            base_name, metric = self._get_base_name_and_metric(
142                view_name=view_name, views=views
143            )
144            metric_name = view_name
145            metric_label = slug_to_title(metric_name)
146
147            if view_name in extended_views:
148                # names of extended views are overriden by the name of the view that is extending them
149                metric_label = slug_to_title(
150                    metric_name.replace(base_name, parent_base_name)
151                )
152                base_name = parent_base_name
153
154            joins.append(
155                {
156                    "name": view_name,
157                    "view_label": metric_label,
158                    "relationship": "one_to_many",
159                    "sql": (
160                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
161                    ),
162                }
163            )
164
165        return joins
166
167    def _get_default_channel(self, view: str) -> Optional[str]:
168        channel_params = [
169            param
170            for _view_defn in self.get_view_lookml(view)["views"]
171            for param in _view_defn.get("filters", [])
172            if _view_defn["name"] == view and param["name"] == "channel"
173        ]
174
175        if channel_params:
176            allowed_values = channel_params[0]["suggestions"]
177            default_value = allowed_values[0]
178            return escape_filter_expr(default_value)
179        return None
180
181    def _get_base_name_and_metric(
182        self, view_name: str, views: List[str]
183    ) -> Tuple[str, str]:
184        """
185        Get base view and metric names.
186
187        Returns the the name of the base view and the metric based on the
188        passed `view_name` and existing views.
189
190        The names are resolved in a backwards fashion to account for
191        repeated nested fields that might contain other nested fields.
192        For example:
193
194        view: sync {
195            [...]
196            dimension: payload__events {
197                sql: ${TABLE}.payload.events ;;
198            }
199        }
200
201        view: sync__payload__events {
202            [...]
203            dimension: f5_ {
204                sql: ${TABLE}.f5_ ;;
205            }
206        }
207
208        view: sync__payload__events__f5_ {
209            [...]
210        }
211
212        For these nested views to get translated to the following joins, the names
213        need to be resolved backwards:
214
215        join: sync__payload__events {
216            relationship: one_to_many
217            sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;;
218        }
219
220        join: sync__payload__events__f5_ {
221            relationship: one_to_many
222            sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;;
223        }
224        """
225        split = view_name.split("__")
226        for index in range(len(split) - 1, 0, -1):
227            base_view = "__".join(split[:index])
228            metric = "__".join(split[index:])
229            if base_view in views:
230                return (base_view, metric)
231        raise Exception(f"Cannot get base name and metric from view {view_name}")
232
233    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
234        """Determine whether a this view has this dimension."""
235        for _view_defn in self.get_view_lookml(view)["views"]:
236            if _view_defn["name"] != view:
237                continue
238            for dim in _view_defn.get("dimensions", []):
239                if dim["name"] == dimension_name:
240                    return True
241        return False
242
243    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
244        """Get time partitiong dimension group for this view.
245
246        Return the name of the first dimension group tagged "time_partitioning_field",
247        and fall back to "submission" if available.
248        """
249        has_submission = False
250        for _view_defn in self.get_view_lookml(view)["views"]:
251            if not _view_defn["name"] == view:
252                continue
253            for dim in _view_defn.get("dimension_groups", []):
254                if "time_partitioning_field" in dim.get("tags", []):
255                    return dim["name"]
256                elif dim["name"] == "submission":
257                    has_submission = True
258        if has_submission:
259            return "submission"
260        return None
261
262    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
263        """Get required filters for this view."""
264        filters = []
265        view = self.views[view_name]
266
267        # Add a default filter on channel, if it's present in the view
268        default_channel = self._get_default_channel(view)
269        if default_channel is not None:
270            filters.append({"channel": default_channel})
271
272        # Add submission filter, if present in the view
273        if time_partitioning_group := self.get_view_time_partitioning_group(view):
274            filters.append({f"{time_partitioning_group}_date": "28 days"})
275
276        return filters
277
278    def __eq__(self, other) -> bool:
279        """Check for equality with other View."""
280
281        def comparable_dict(d):
282            return tuple(sorted(d.items()))
283
284        if isinstance(other, Explore):
285            return (
286                self.name == other.name
287                and comparable_dict(self.views) == comparable_dict(other.views)
288                and self.type == other.type
289            )
290        return False

A generic explore.

Explore( name: str, views: Dict[str, str], views_path: Optional[pathlib.Path] = None, defn: Optional[Dict[str, str]] = None)
name: str
views: Dict[str, str]
views_path: Optional[pathlib.Path] = None
defn: Optional[Dict[str, str]] = None
type: str
def to_dict(self) -> dict:
25    def to_dict(self) -> dict:
26        """Explore instance represented as a dict."""
27        return {self.name: {"type": self.type, "views": self.views}}

Explore instance represented as a dict.

def to_lookml( self, v1_name: Optional[str], hidden: Optional[bool]) -> List[Dict[str, Any]]:
29    def to_lookml(
30        self, v1_name: Optional[str], hidden: Optional[bool]
31    ) -> List[Dict[str, Any]]:
32        """
33        Generate LookML for this explore.
34
35        Any generation done in dependent explore's
36        `_to_lookml` takes precedence over these fields.
37        """
38        base_lookml = {}
39        if hidden:
40            base_lookml["hidden"] = "yes"
41        base_view_name = next(
42            (
43                view_name
44                for view_type, view_name in self.views.items()
45                if view_type == "base_view"
46            )
47        )
48        for view_type, view in self.views.items():
49            # We look at our dependent views to see if they have a
50            # "submission" field. Dependent views are any that are:
51            # - base_view
52            # - extended_view*
53            #
54            # We do not want to look at joined views. Those should be
55            # labeled as:
56            # - join*
57            #
58            # If they have a submission field, we filter on the date.
59            # This allows for filter queries to succeed.
60            if "join" in view_type:
61                continue
62            if time_partitioning_group := self.get_view_time_partitioning_group(view):
63                base_lookml["sql_always_where"] = (
64                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
65                )
66
67        # We only update the first returned explore
68        new_lookml = self._to_lookml(v1_name)
69        base_lookml.update(new_lookml[0])
70        new_lookml[0] = base_lookml
71
72        return new_lookml

Generate LookML for this explore.

Any generation done in dependent explore's _to_lookml takes precedence over these fields.

def get_dependent_views(self) -> List[str]:
80    def get_dependent_views(self) -> List[str]:
81        """Get views this explore is dependent on."""
82        dependent_views = []
83        for _type, views in self.views.items():
84            if _type.startswith("extended"):
85                continue
86            elif _type.startswith("joined"):
87                dependent_views += [view for view in views]
88            else:
89                dependent_views.append(views)
90        return dependent_views

Get views this explore is dependent on.

@staticmethod
def from_dict( name: str, defn: dict, views_path: pathlib.Path) -> Explore:
92    @staticmethod
93    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
94        """Get an instance of an explore from a namespace definition."""
95        raise NotImplementedError("Only implemented in subclasses")

Get an instance of an explore from a namespace definition.

def get_view_lookml(self, view: str) -> dict:
 97    def get_view_lookml(self, view: str) -> dict:
 98        """Get the LookML for a view."""
 99        if self.views_path is not None:
100            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
101
102        raise Exception("Missing view path for get_view_lookml")

Get the LookML for a view.

def get_datagroup(self) -> Optional[str]:
104    def get_datagroup(self) -> Optional[str]:
105        """
106        Return the name of the associated datagroup.
107
108        Return `None` if there is no datagroup for this explore.
109        """
110        if self.views_path and (self.views_path.parent / "datagroups").exists():
111            datagroups_path = self.views_path.parent / "datagroups"
112            datagroup_file = (
113                datagroups_path
114                / f'{self.views["base_view"]}_last_updated.datagroup.lkml'
115            )
116            if datagroup_file.exists():
117                return f'{self.views["base_view"]}_last_updated'
118        return None

Return the name of the associated datagroup.

Return None if there is no datagroup for this explore.

def get_unnested_fields_joins_lookml(self) -> list:
120    def get_unnested_fields_joins_lookml(
121        self,
122    ) -> list:
123        """Get the LookML for joining unnested fields."""
124        views_lookml = self.get_view_lookml(self.views["base_view"])
125        views: List[str] = [view["name"] for view in views_lookml["views"]]
126        parent_base_name = views_lookml["views"][0]["name"]
127
128        extended_views: List[str] = []
129        if "extended_view" in self.views:
130            # check for extended views
131            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
132            extended_views = [view["name"] for view in extended_views_lookml["views"]]
133
134            views_lookml.update(extended_views_lookml)
135            views += extended_views
136
137        joins = []
138        for view in views_lookml["views"][1:]:
139            view_name = view["name"]
140            # get repeated, nested fields that exist as separate views in lookml
141            base_name, metric = self._get_base_name_and_metric(
142                view_name=view_name, views=views
143            )
144            metric_name = view_name
145            metric_label = slug_to_title(metric_name)
146
147            if view_name in extended_views:
148                # names of extended views are overriden by the name of the view that is extending them
149                metric_label = slug_to_title(
150                    metric_name.replace(base_name, parent_base_name)
151                )
152                base_name = parent_base_name
153
154            joins.append(
155                {
156                    "name": view_name,
157                    "view_label": metric_label,
158                    "relationship": "one_to_many",
159                    "sql": (
160                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
161                    ),
162                }
163            )
164
165        return joins

Get the LookML for joining unnested fields.

def has_view_dimension(self, view: str, dimension_name: str) -> bool:
233    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
234        """Determine whether a this view has this dimension."""
235        for _view_defn in self.get_view_lookml(view)["views"]:
236            if _view_defn["name"] != view:
237                continue
238            for dim in _view_defn.get("dimensions", []):
239                if dim["name"] == dimension_name:
240                    return True
241        return False

Determine whether this view has this dimension.

def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
243    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
244        """Get time partitiong dimension group for this view.
245
246        Return the name of the first dimension group tagged "time_partitioning_field",
247        and fall back to "submission" if available.
248        """
249        has_submission = False
250        for _view_defn in self.get_view_lookml(view)["views"]:
251            if not _view_defn["name"] == view:
252                continue
253            for dim in _view_defn.get("dimension_groups", []):
254                if "time_partitioning_field" in dim.get("tags", []):
255                    return dim["name"]
256                elif dim["name"] == "submission":
257                    has_submission = True
258        if has_submission:
259            return "submission"
260        return None

Get the time partitioning dimension group for this view.

Return the name of the first dimension group tagged "time_partitioning_field", and fall back to "submission" if available.

def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
262    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
263        """Get required filters for this view."""
264        filters = []
265        view = self.views[view_name]
266
267        # Add a default filter on channel, if it's present in the view
268        default_channel = self._get_default_channel(view)
269        if default_channel is not None:
270            filters.append({"channel": default_channel})
271
272        # Add submission filter, if present in the view
273        if time_partitioning_group := self.get_view_time_partitioning_group(view):
274            filters.append({f"{time_partitioning_group}_date": "28 days"})
275
276        return filters

Get required filters for this view.