generator.explores.explore

Generic explore type.

  1"""Generic explore type."""
  2
  3from __future__ import annotations
  4
  5from dataclasses import dataclass, field
  6from pathlib import Path
  7from typing import Any, Dict, List, Optional, Tuple
  8
  9import lkml
 10
 11from ..views.lookml_utils import escape_filter_expr, slug_to_title
 12
 13
 14@dataclass
 15class Explore:
 16    """A generic explore."""
 17
 18    name: str
 19    views: Dict[str, str]
 20    views_path: Optional[Path] = None
 21    defn: Optional[Dict[str, str]] = None
 22    type: str = field(init=False)
 23
 24    def to_dict(self) -> dict:
 25        """Explore instance represented as a dict."""
 26        return {self.name: {"type": self.type, "views": self.views}}
 27
 28    def to_lookml(
 29        self, v1_name: Optional[str], hidden: Optional[bool]
 30    ) -> List[Dict[str, Any]]:
 31        """
 32        Generate LookML for this explore.
 33
 34        Any generation done in dependent explore's
 35        `_to_lookml` takes precedence over these fields.
 36        """
 37        base_lookml = {}
 38        if hidden:
 39            base_lookml["hidden"] = "yes"
 40        base_view_name = next(
 41            (
 42                view_name
 43                for view_type, view_name in self.views.items()
 44                if view_type == "base_view"
 45            )
 46        )
 47        for view_type, view in self.views.items():
 48            # We look at our dependent views to see if they have a
 49            # "submission" field. Dependent views are any that are:
 50            # - base_view
 51            # - extended_view*
 52            #
 53            # We do not want to look at joined views. Those should be
 54            # labeled as:
 55            # - join*
 56            #
 57            # If they have a submission field, we filter on the date.
 58            # This allows for filter queries to succeed.
 59            if "join" in view_type:
 60                continue
 61            if time_partitioning_group := self.get_view_time_partitioning_group(view):
 62                base_lookml["sql_always_where"] = (
 63                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
 64                )
 65
 66        # We only update the first returned explore
 67        new_lookml = self._to_lookml(v1_name)
 68        base_lookml.update(new_lookml[0])
 69        new_lookml[0] = base_lookml
 70
 71        return new_lookml
 72
 73    def _to_lookml(
 74        self,
 75        v1_name: Optional[str],
 76    ) -> List[Dict[str, Any]]:
 77        raise NotImplementedError("Only implemented in subclasses")
 78
 79    def get_dependent_views(self) -> List[str]:
 80        """Get views this explore is dependent on."""
 81        dependent_views = []
 82        for _type, views in self.views.items():
 83            if _type.startswith("extended"):
 84                continue
 85            elif _type.startswith("joined"):
 86                dependent_views += [view for view in views]
 87            else:
 88                dependent_views.append(views)
 89        return dependent_views
 90
 91    @staticmethod
 92    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
 93        """Get an instance of an explore from a namespace definition."""
 94        raise NotImplementedError("Only implemented in subclasses")
 95
 96    def get_view_lookml(self, view: str) -> dict:
 97        """Get the LookML for a view."""
 98        if self.views_path is not None:
 99            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
100        raise Exception("Missing view path for get_view_lookml")
101
102    def get_datagroup(self) -> Optional[str]:
103        """
104        Return the name of the associated datagroup.
105
106        Return `None` if there is no datagroup for this explore.
107        """
108        if self.views_path and (self.views_path.parent / "datagroups").exists():
109            datagroups_path = self.views_path.parent / "datagroups"
110            datagroup_file = (
111                datagroups_path
112                / f'{self.views["base_view"]}_last_updated.datagroup.lkml'
113            )
114            if datagroup_file.exists():
115                return f'{self.views["base_view"]}_last_updated'
116        return None
117
118    def get_unnested_fields_joins_lookml(
119        self,
120    ) -> list:
121        """Get the LookML for joining unnested fields."""
122        views_lookml = self.get_view_lookml(self.views["base_view"])
123        views: List[str] = [view["name"] for view in views_lookml["views"]]
124        parent_base_name = views_lookml["views"][0]["name"]
125
126        extended_views: List[str] = []
127        if "extended_view" in self.views:
128            # check for extended views
129            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
130            extended_views = [view["name"] for view in extended_views_lookml["views"]]
131
132            views_lookml.update(extended_views_lookml)
133            views += extended_views
134
135        joins = []
136        for view in views_lookml["views"][1:]:
137            view_name = view["name"]
138            # get repeated, nested fields that exist as separate views in lookml
139            base_name, metric = self._get_base_name_and_metric(
140                view_name=view_name, views=views
141            )
142            metric_name = view_name
143            metric_label = slug_to_title(metric_name)
144
145            if view_name in extended_views:
146                # names of extended views are overriden by the name of the view that is extending them
147                metric_label = slug_to_title(
148                    metric_name.replace(base_name, parent_base_name)
149                )
150                base_name = parent_base_name
151
152            joins.append(
153                {
154                    "name": view_name,
155                    "view_label": metric_label,
156                    "relationship": "one_to_many",
157                    "sql": (
158                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
159                    ),
160                }
161            )
162
163        return joins
164
165    def _get_default_channel(self, view: str) -> Optional[str]:
166        channel_params = [
167            param
168            for _view_defn in self.get_view_lookml(view)["views"]
169            for param in _view_defn.get("filters", [])
170            if _view_defn["name"] == view and param["name"] == "channel"
171        ]
172
173        if channel_params:
174            allowed_values = channel_params[0]["suggestions"]
175            default_value = allowed_values[0]
176            return escape_filter_expr(default_value)
177        return None
178
179    def _get_base_name_and_metric(
180        self, view_name: str, views: List[str]
181    ) -> Tuple[str, str]:
182        """
183        Get base view and metric names.
184
185        Returns the the name of the base view and the metric based on the
186        passed `view_name` and existing views.
187
188        The names are resolved in a backwards fashion to account for
189        repeated nested fields that might contain other nested fields.
190        For example:
191
192        view: sync {
193            [...]
194            dimension: payload__events {
195                sql: ${TABLE}.payload.events ;;
196            }
197        }
198
199        view: sync__payload__events {
200            [...]
201            dimension: f5_ {
202                sql: ${TABLE}.f5_ ;;
203            }
204        }
205
206        view: sync__payload__events__f5_ {
207            [...]
208        }
209
210        For these nested views to get translated to the following joins, the names
211        need to be resolved backwards:
212
213        join: sync__payload__events {
214            relationship: one_to_many
215            sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;;
216        }
217
218        join: sync__payload__events__f5_ {
219            relationship: one_to_many
220            sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;;
221        }
222        """
223        split = view_name.split("__")
224        for index in range(len(split) - 1, 0, -1):
225            base_view = "__".join(split[:index])
226            metric = "__".join(split[index:])
227            if base_view in views:
228                return (base_view, metric)
229        raise Exception(f"Cannot get base name and metric from view {view_name}")
230
231    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
232        """Determine whether a this view has this dimension."""
233        for _view_defn in self.get_view_lookml(view)["views"]:
234            if _view_defn["name"] != view:
235                continue
236            for dim in _view_defn.get("dimensions", []):
237                if dim["name"] == dimension_name:
238                    return True
239        return False
240
241    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
242        """Get time partitiong dimension group for this view.
243
244        Return the name of the first dimension group tagged "time_partitioning_field",
245        and fall back to "submission" if available.
246        """
247        has_submission = False
248        for _view_defn in self.get_view_lookml(view)["views"]:
249            if not _view_defn["name"] == view:
250                continue
251            for dim in _view_defn.get("dimension_groups", []):
252                if "time_partitioning_field" in dim.get("tags", []):
253                    return dim["name"]
254                elif dim["name"] == "submission":
255                    has_submission = True
256        if has_submission:
257            return "submission"
258        return None
259
260    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
261        """Get required filters for this view."""
262        filters = []
263        view = self.views[view_name]
264
265        # Add a default filter on channel, if it's present in the view
266        default_channel = self._get_default_channel(view)
267        if default_channel is not None:
268            filters.append({"channel": default_channel})
269
270        # Add submission filter, if present in the view
271        if time_partitioning_group := self.get_view_time_partitioning_group(view):
272            filters.append({f"{time_partitioning_group}_date": "28 days"})
273
274        return filters
275
276    def __eq__(self, other) -> bool:
277        """Check for equality with other View."""
278
279        def comparable_dict(d):
280            return tuple(sorted(d.items()))
281
282        if isinstance(other, Explore):
283            return (
284                self.name == other.name
285                and comparable_dict(self.views) == comparable_dict(other.views)
286                and self.type == other.type
287            )
288        return False
@dataclass
class Explore:
 15@dataclass
 16class Explore:
 17    """A generic explore."""
 18
 19    name: str
 20    views: Dict[str, str]
 21    views_path: Optional[Path] = None
 22    defn: Optional[Dict[str, str]] = None
 23    type: str = field(init=False)
 24
 25    def to_dict(self) -> dict:
 26        """Explore instance represented as a dict."""
 27        return {self.name: {"type": self.type, "views": self.views}}
 28
 29    def to_lookml(
 30        self, v1_name: Optional[str], hidden: Optional[bool]
 31    ) -> List[Dict[str, Any]]:
 32        """
 33        Generate LookML for this explore.
 34
 35        Any generation done in dependent explore's
 36        `_to_lookml` takes precedence over these fields.
 37        """
 38        base_lookml = {}
 39        if hidden:
 40            base_lookml["hidden"] = "yes"
 41        base_view_name = next(
 42            (
 43                view_name
 44                for view_type, view_name in self.views.items()
 45                if view_type == "base_view"
 46            )
 47        )
 48        for view_type, view in self.views.items():
 49            # We look at our dependent views to see if they have a
 50            # "submission" field. Dependent views are any that are:
 51            # - base_view
 52            # - extended_view*
 53            #
 54            # We do not want to look at joined views. Those should be
 55            # labeled as:
 56            # - join*
 57            #
 58            # If they have a submission field, we filter on the date.
 59            # This allows for filter queries to succeed.
 60            if "join" in view_type:
 61                continue
 62            if time_partitioning_group := self.get_view_time_partitioning_group(view):
 63                base_lookml["sql_always_where"] = (
 64                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
 65                )
 66
 67        # We only update the first returned explore
 68        new_lookml = self._to_lookml(v1_name)
 69        base_lookml.update(new_lookml[0])
 70        new_lookml[0] = base_lookml
 71
 72        return new_lookml
 73
 74    def _to_lookml(
 75        self,
 76        v1_name: Optional[str],
 77    ) -> List[Dict[str, Any]]:
 78        raise NotImplementedError("Only implemented in subclasses")
 79
 80    def get_dependent_views(self) -> List[str]:
 81        """Get views this explore is dependent on."""
 82        dependent_views = []
 83        for _type, views in self.views.items():
 84            if _type.startswith("extended"):
 85                continue
 86            elif _type.startswith("joined"):
 87                dependent_views += [view for view in views]
 88            else:
 89                dependent_views.append(views)
 90        return dependent_views
 91
 92    @staticmethod
 93    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
 94        """Get an instance of an explore from a namespace definition."""
 95        raise NotImplementedError("Only implemented in subclasses")
 96
 97    def get_view_lookml(self, view: str) -> dict:
 98        """Get the LookML for a view."""
 99        if self.views_path is not None:
100            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
101        raise Exception("Missing view path for get_view_lookml")
102
103    def get_datagroup(self) -> Optional[str]:
104        """
105        Return the name of the associated datagroup.
106
107        Return `None` if there is no datagroup for this explore.
108        """
109        if self.views_path and (self.views_path.parent / "datagroups").exists():
110            datagroups_path = self.views_path.parent / "datagroups"
111            datagroup_file = (
112                datagroups_path
113                / f'{self.views["base_view"]}_last_updated.datagroup.lkml'
114            )
115            if datagroup_file.exists():
116                return f'{self.views["base_view"]}_last_updated'
117        return None
118
119    def get_unnested_fields_joins_lookml(
120        self,
121    ) -> list:
122        """Get the LookML for joining unnested fields."""
123        views_lookml = self.get_view_lookml(self.views["base_view"])
124        views: List[str] = [view["name"] for view in views_lookml["views"]]
125        parent_base_name = views_lookml["views"][0]["name"]
126
127        extended_views: List[str] = []
128        if "extended_view" in self.views:
129            # check for extended views
130            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
131            extended_views = [view["name"] for view in extended_views_lookml["views"]]
132
133            views_lookml.update(extended_views_lookml)
134            views += extended_views
135
136        joins = []
137        for view in views_lookml["views"][1:]:
138            view_name = view["name"]
139            # get repeated, nested fields that exist as separate views in lookml
140            base_name, metric = self._get_base_name_and_metric(
141                view_name=view_name, views=views
142            )
143            metric_name = view_name
144            metric_label = slug_to_title(metric_name)
145
146            if view_name in extended_views:
147                # names of extended views are overriden by the name of the view that is extending them
148                metric_label = slug_to_title(
149                    metric_name.replace(base_name, parent_base_name)
150                )
151                base_name = parent_base_name
152
153            joins.append(
154                {
155                    "name": view_name,
156                    "view_label": metric_label,
157                    "relationship": "one_to_many",
158                    "sql": (
159                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
160                    ),
161                }
162            )
163
164        return joins
165
166    def _get_default_channel(self, view: str) -> Optional[str]:
167        channel_params = [
168            param
169            for _view_defn in self.get_view_lookml(view)["views"]
170            for param in _view_defn.get("filters", [])
171            if _view_defn["name"] == view and param["name"] == "channel"
172        ]
173
174        if channel_params:
175            allowed_values = channel_params[0]["suggestions"]
176            default_value = allowed_values[0]
177            return escape_filter_expr(default_value)
178        return None
179
180    def _get_base_name_and_metric(
181        self, view_name: str, views: List[str]
182    ) -> Tuple[str, str]:
183        """
184        Get base view and metric names.
185
186        Returns the the name of the base view and the metric based on the
187        passed `view_name` and existing views.
188
189        The names are resolved in a backwards fashion to account for
190        repeated nested fields that might contain other nested fields.
191        For example:
192
193        view: sync {
194            [...]
195            dimension: payload__events {
196                sql: ${TABLE}.payload.events ;;
197            }
198        }
199
200        view: sync__payload__events {
201            [...]
202            dimension: f5_ {
203                sql: ${TABLE}.f5_ ;;
204            }
205        }
206
207        view: sync__payload__events__f5_ {
208            [...]
209        }
210
211        For these nested views to get translated to the following joins, the names
212        need to be resolved backwards:
213
214        join: sync__payload__events {
215            relationship: one_to_many
216            sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;;
217        }
218
219        join: sync__payload__events__f5_ {
220            relationship: one_to_many
221            sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;;
222        }
223        """
224        split = view_name.split("__")
225        for index in range(len(split) - 1, 0, -1):
226            base_view = "__".join(split[:index])
227            metric = "__".join(split[index:])
228            if base_view in views:
229                return (base_view, metric)
230        raise Exception(f"Cannot get base name and metric from view {view_name}")
231
232    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
233        """Determine whether a this view has this dimension."""
234        for _view_defn in self.get_view_lookml(view)["views"]:
235            if _view_defn["name"] != view:
236                continue
237            for dim in _view_defn.get("dimensions", []):
238                if dim["name"] == dimension_name:
239                    return True
240        return False
241
242    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
243        """Get time partitiong dimension group for this view.
244
245        Return the name of the first dimension group tagged "time_partitioning_field",
246        and fall back to "submission" if available.
247        """
248        has_submission = False
249        for _view_defn in self.get_view_lookml(view)["views"]:
250            if not _view_defn["name"] == view:
251                continue
252            for dim in _view_defn.get("dimension_groups", []):
253                if "time_partitioning_field" in dim.get("tags", []):
254                    return dim["name"]
255                elif dim["name"] == "submission":
256                    has_submission = True
257        if has_submission:
258            return "submission"
259        return None
260
261    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
262        """Get required filters for this view."""
263        filters = []
264        view = self.views[view_name]
265
266        # Add a default filter on channel, if it's present in the view
267        default_channel = self._get_default_channel(view)
268        if default_channel is not None:
269            filters.append({"channel": default_channel})
270
271        # Add submission filter, if present in the view
272        if time_partitioning_group := self.get_view_time_partitioning_group(view):
273            filters.append({f"{time_partitioning_group}_date": "28 days"})
274
275        return filters
276
277    def __eq__(self, other) -> bool:
278        """Check for equality with other View."""
279
280        def comparable_dict(d):
281            return tuple(sorted(d.items()))
282
283        if isinstance(other, Explore):
284            return (
285                self.name == other.name
286                and comparable_dict(self.views) == comparable_dict(other.views)
287                and self.type == other.type
288            )
289        return False

A generic explore.

Explore( name: str, views: Dict[str, str], views_path: Optional[pathlib.Path] = None, defn: Optional[Dict[str, str]] = None)
name: str
views: Dict[str, str]
views_path: Optional[pathlib.Path] = None
defn: Optional[Dict[str, str]] = None
type: str
def to_dict(self) -> dict:
25    def to_dict(self) -> dict:
26        """Explore instance represented as a dict."""
27        return {self.name: {"type": self.type, "views": self.views}}

Explore instance represented as a dict.

def to_lookml( self, v1_name: Optional[str], hidden: Optional[bool]) -> List[Dict[str, Any]]:
29    def to_lookml(
30        self, v1_name: Optional[str], hidden: Optional[bool]
31    ) -> List[Dict[str, Any]]:
32        """
33        Generate LookML for this explore.
34
35        Any generation done in dependent explore's
36        `_to_lookml` takes precedence over these fields.
37        """
38        base_lookml = {}
39        if hidden:
40            base_lookml["hidden"] = "yes"
41        base_view_name = next(
42            (
43                view_name
44                for view_type, view_name in self.views.items()
45                if view_type == "base_view"
46            )
47        )
48        for view_type, view in self.views.items():
49            # We look at our dependent views to see if they have a
50            # "submission" field. Dependent views are any that are:
51            # - base_view
52            # - extended_view*
53            #
54            # We do not want to look at joined views. Those should be
55            # labeled as:
56            # - join*
57            #
58            # If they have a submission field, we filter on the date.
59            # This allows for filter queries to succeed.
60            if "join" in view_type:
61                continue
62            if time_partitioning_group := self.get_view_time_partitioning_group(view):
63                base_lookml["sql_always_where"] = (
64                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
65                )
66
67        # We only update the first returned explore
68        new_lookml = self._to_lookml(v1_name)
69        base_lookml.update(new_lookml[0])
70        new_lookml[0] = base_lookml
71
72        return new_lookml

Generate LookML for this explore.

Any generation done in dependent explore's _to_lookml takes precedence over these fields.

def get_dependent_views(self) -> List[str]:
80    def get_dependent_views(self) -> List[str]:
81        """Get views this explore is dependent on."""
82        dependent_views = []
83        for _type, views in self.views.items():
84            if _type.startswith("extended"):
85                continue
86            elif _type.startswith("joined"):
87                dependent_views += [view for view in views]
88            else:
89                dependent_views.append(views)
90        return dependent_views

Get views this explore is dependent on.

@staticmethod
def from_dict( name: str, defn: dict, views_path: pathlib.Path) -> Explore:
92    @staticmethod
93    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
94        """Get an instance of an explore from a namespace definition."""
95        raise NotImplementedError("Only implemented in subclasses")

Get an instance of an explore from a namespace definition.

def get_view_lookml(self, view: str) -> dict:
 97    def get_view_lookml(self, view: str) -> dict:
 98        """Get the LookML for a view."""
 99        if self.views_path is not None:
100            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
101        raise Exception("Missing view path for get_view_lookml")

Get the LookML for a view.

def get_datagroup(self) -> Optional[str]:
103    def get_datagroup(self) -> Optional[str]:
104        """
105        Return the name of the associated datagroup.
106
107        Return `None` if there is no datagroup for this explore.
108        """
109        if self.views_path and (self.views_path.parent / "datagroups").exists():
110            datagroups_path = self.views_path.parent / "datagroups"
111            datagroup_file = (
112                datagroups_path
113                / f'{self.views["base_view"]}_last_updated.datagroup.lkml'
114            )
115            if datagroup_file.exists():
116                return f'{self.views["base_view"]}_last_updated'
117        return None

Return the name of the associated datagroup.

Return None if there is no datagroup for this explore.

def get_unnested_fields_joins_lookml(self) -> list:
119    def get_unnested_fields_joins_lookml(
120        self,
121    ) -> list:
122        """Get the LookML for joining unnested fields."""
123        views_lookml = self.get_view_lookml(self.views["base_view"])
124        views: List[str] = [view["name"] for view in views_lookml["views"]]
125        parent_base_name = views_lookml["views"][0]["name"]
126
127        extended_views: List[str] = []
128        if "extended_view" in self.views:
129            # check for extended views
130            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
131            extended_views = [view["name"] for view in extended_views_lookml["views"]]
132
133            views_lookml.update(extended_views_lookml)
134            views += extended_views
135
136        joins = []
137        for view in views_lookml["views"][1:]:
138            view_name = view["name"]
139            # get repeated, nested fields that exist as separate views in lookml
140            base_name, metric = self._get_base_name_and_metric(
141                view_name=view_name, views=views
142            )
143            metric_name = view_name
144            metric_label = slug_to_title(metric_name)
145
146            if view_name in extended_views:
147                # names of extended views are overriden by the name of the view that is extending them
148                metric_label = slug_to_title(
149                    metric_name.replace(base_name, parent_base_name)
150                )
151                base_name = parent_base_name
152
153            joins.append(
154                {
155                    "name": view_name,
156                    "view_label": metric_label,
157                    "relationship": "one_to_many",
158                    "sql": (
159                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
160                    ),
161                }
162            )
163
164        return joins

Get the LookML for joining unnested fields.

def has_view_dimension(self, view: str, dimension_name: str) -> bool:
232    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
233        """Determine whether a this view has this dimension."""
234        for _view_defn in self.get_view_lookml(view)["views"]:
235            if _view_defn["name"] != view:
236                continue
237            for dim in _view_defn.get("dimensions", []):
238                if dim["name"] == dimension_name:
239                    return True
240        return False

Determine whether a this view has this dimension.

def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
242    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
243        """Get time partitiong dimension group for this view.
244
245        Return the name of the first dimension group tagged "time_partitioning_field",
246        and fall back to "submission" if available.
247        """
248        has_submission = False
249        for _view_defn in self.get_view_lookml(view)["views"]:
250            if not _view_defn["name"] == view:
251                continue
252            for dim in _view_defn.get("dimension_groups", []):
253                if "time_partitioning_field" in dim.get("tags", []):
254                    return dim["name"]
255                elif dim["name"] == "submission":
256                    has_submission = True
257        if has_submission:
258            return "submission"
259        return None

Get time partitiong dimension group for this view.

Return the name of the first dimension group tagged "time_partitioning_field", and fall back to "submission" if available.

def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
261    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
262        """Get required filters for this view."""
263        filters = []
264        view = self.views[view_name]
265
266        # Add a default filter on channel, if it's present in the view
267        default_channel = self._get_default_channel(view)
268        if default_channel is not None:
269            filters.append({"channel": default_channel})
270
271        # Add submission filter, if present in the view
272        if time_partitioning_group := self.get_view_time_partitioning_group(view):
273            filters.append({f"{time_partitioning_group}_date": "28 days"})
274
275        return filters

Get required filters for this view.