generator.explores.explore
Generic explore type.
1"""Generic explore type.""" 2 3from __future__ import annotations 4 5from dataclasses import dataclass, field 6from pathlib import Path 7from typing import Any, Dict, List, Optional, Tuple 8 9import lkml 10 11from ..views.lookml_utils import escape_filter_expr, slug_to_title 12 13 14@dataclass 15class Explore: 16 """A generic explore.""" 17 18 name: str 19 views: Dict[str, str] 20 views_path: Optional[Path] = None 21 defn: Optional[Dict[str, str]] = None 22 type: str = field(init=False) 23 24 def to_dict(self) -> dict: 25 """Explore instance represented as a dict.""" 26 return {self.name: {"type": self.type, "views": self.views}} 27 28 def to_lookml( 29 self, v1_name: Optional[str], hidden: Optional[bool] 30 ) -> List[Dict[str, Any]]: 31 """ 32 Generate LookML for this explore. 33 34 Any generation done in dependent explore's 35 `_to_lookml` takes precedence over these fields. 36 """ 37 base_lookml = {} 38 if hidden: 39 base_lookml["hidden"] = "yes" 40 base_view_name = next( 41 ( 42 view_name 43 for view_type, view_name in self.views.items() 44 if view_type == "base_view" 45 ) 46 ) 47 for view_type, view in self.views.items(): 48 # We look at our dependent views to see if they have a 49 # "submission" field. Dependent views are any that are: 50 # - base_view 51 # - extended_view* 52 # 53 # We do not want to look at joined views. Those should be 54 # labeled as: 55 # - join* 56 # 57 # If they have a submission field, we filter on the date. 58 # This allows for filter queries to succeed. 
59 if "join" in view_type: 60 continue 61 if time_partitioning_group := self.get_view_time_partitioning_group(view): 62 base_lookml["sql_always_where"] = ( 63 f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'" 64 ) 65 66 # We only update the first returned explore 67 new_lookml = self._to_lookml(v1_name) 68 base_lookml.update(new_lookml[0]) 69 new_lookml[0] = base_lookml 70 71 return new_lookml 72 73 def _to_lookml( 74 self, 75 v1_name: Optional[str], 76 ) -> List[Dict[str, Any]]: 77 raise NotImplementedError("Only implemented in subclasses") 78 79 def get_dependent_views(self) -> List[str]: 80 """Get views this explore is dependent on.""" 81 dependent_views = [] 82 for _type, views in self.views.items(): 83 if _type.startswith("extended"): 84 continue 85 elif _type.startswith("joined"): 86 dependent_views += [view for view in views] 87 else: 88 dependent_views.append(views) 89 return dependent_views 90 91 @staticmethod 92 def from_dict(name: str, defn: dict, views_path: Path) -> Explore: 93 """Get an instance of an explore from a namespace definition.""" 94 raise NotImplementedError("Only implemented in subclasses") 95 96 def get_view_lookml(self, view: str) -> dict: 97 """Get the LookML for a view.""" 98 if self.views_path is not None: 99 return lkml.load((self.views_path / f"{view}.view.lkml").read_text()) 100 101 raise Exception("Missing view path for get_view_lookml") 102 103 def get_datagroup(self) -> Optional[str]: 104 """ 105 Return the name of the associated datagroup. 106 107 Return `None` if there is no datagroup for this explore. 
108 """ 109 if self.views_path and (self.views_path.parent / "datagroups").exists(): 110 datagroups_path = self.views_path.parent / "datagroups" 111 datagroup_file = ( 112 datagroups_path 113 / f'{self.views["base_view"]}_last_updated.datagroup.lkml' 114 ) 115 if datagroup_file.exists(): 116 return f'{self.views["base_view"]}_last_updated' 117 return None 118 119 def get_unnested_fields_joins_lookml( 120 self, 121 ) -> list: 122 """Get the LookML for joining unnested fields.""" 123 views_lookml = self.get_view_lookml(self.views["base_view"]) 124 views: List[str] = [view["name"] for view in views_lookml["views"]] 125 parent_base_name = views_lookml["views"][0]["name"] 126 127 extended_views: List[str] = [] 128 if "extended_view" in self.views: 129 # check for extended views 130 extended_views_lookml = self.get_view_lookml(self.views["extended_view"]) 131 extended_views = [view["name"] for view in extended_views_lookml["views"]] 132 133 views_lookml.update(extended_views_lookml) 134 views += extended_views 135 136 joins = [] 137 for view in views_lookml["views"][1:]: 138 view_name = view["name"] 139 # get repeated, nested fields that exist as separate views in lookml 140 base_name, metric = self._get_base_name_and_metric( 141 view_name=view_name, views=views 142 ) 143 metric_name = view_name 144 metric_slug = metric_name 145 146 if view_name in extended_views: 147 # names of extended views are overriden by the name of the view that is extending them 148 metric_slug = metric_name.replace(base_name, parent_base_name) 149 base_name = parent_base_name 150 151 metric_label = ": ".join( 152 slug_to_title(slug) for slug in metric_slug.split("__") 153 ) 154 155 joins.append( 156 { 157 "name": view_name, 158 "view_label": metric_label, 159 "relationship": "one_to_many", 160 "sql": ( 161 f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} " 162 ), 163 } 164 ) 165 166 return joins 167 168 def _get_default_channel(self, view: str) -> Optional[str]: 169 channel_params 
= [ 170 param 171 for _view_defn in self.get_view_lookml(view)["views"] 172 for param in _view_defn.get("filters", []) 173 if _view_defn["name"] == view and param["name"] == "channel" 174 ] 175 176 if channel_params: 177 allowed_values = channel_params[0]["suggestions"] 178 default_value = allowed_values[0] 179 return escape_filter_expr(default_value) 180 return None 181 182 def _get_base_name_and_metric( 183 self, view_name: str, views: List[str] 184 ) -> Tuple[str, str]: 185 """ 186 Get base view and metric names. 187 188 Returns the the name of the base view and the metric based on the 189 passed `view_name` and existing views. 190 191 The names are resolved in a backwards fashion to account for 192 repeated nested fields that might contain other nested fields. 193 For example: 194 195 view: sync { 196 [...] 197 dimension: payload__events { 198 sql: ${TABLE}.payload.events ;; 199 } 200 } 201 202 view: sync__payload__events { 203 [...] 204 dimension: f5_ { 205 sql: ${TABLE}.f5_ ;; 206 } 207 } 208 209 view: sync__payload__events__f5_ { 210 [...] 
211 } 212 213 For these nested views to get translated to the following joins, the names 214 need to be resolved backwards: 215 216 join: sync__payload__events { 217 relationship: one_to_many 218 sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;; 219 } 220 221 join: sync__payload__events__f5_ { 222 relationship: one_to_many 223 sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;; 224 } 225 """ 226 split = view_name.split("__") 227 for index in range(len(split) - 1, 0, -1): 228 base_view = "__".join(split[:index]) 229 metric = "__".join(split[index:]) 230 if base_view in views: 231 return (base_view, metric) 232 raise Exception(f"Cannot get base name and metric from view {view_name}") 233 234 def has_view_dimension(self, view: str, dimension_name: str) -> bool: 235 """Determine whether a this view has this dimension.""" 236 for _view_defn in self.get_view_lookml(view)["views"]: 237 if _view_defn["name"] != view: 238 continue 239 for dim in _view_defn.get("dimensions", []): 240 if dim["name"] == dimension_name: 241 return True 242 return False 243 244 def get_view_time_partitioning_group(self, view: str) -> Optional[str]: 245 """Get time partitiong dimension group for this view. 246 247 Return the name of the first dimension group tagged "time_partitioning_field", 248 and fall back to "submission" if available. 
249 """ 250 has_submission = False 251 for _view_defn in self.get_view_lookml(view)["views"]: 252 if not _view_defn["name"] == view: 253 continue 254 for dim in _view_defn.get("dimension_groups", []): 255 if "time_partitioning_field" in dim.get("tags", []): 256 return dim["name"] 257 elif dim["name"] == "submission": 258 has_submission = True 259 if has_submission: 260 return "submission" 261 return None 262 263 def get_required_filters(self, view_name: str) -> List[Dict[str, str]]: 264 """Get required filters for this view.""" 265 filters = [] 266 view = self.views[view_name] 267 268 # Add a default filter on channel, if it's present in the view 269 default_channel = self._get_default_channel(view) 270 if default_channel is not None: 271 filters.append({"channel": default_channel}) 272 273 # Add submission filter, if present in the view 274 if time_partitioning_group := self.get_view_time_partitioning_group(view): 275 filters.append({f"{time_partitioning_group}_date": "28 days"}) 276 277 return filters 278 279 def __eq__(self, other) -> bool: 280 """Check for equality with other View.""" 281 282 def comparable_dict(d): 283 return tuple(sorted(d.items())) 284 285 if isinstance(other, Explore): 286 return ( 287 self.name == other.name 288 and comparable_dict(self.views) == comparable_dict(other.views) 289 and self.type == other.type 290 ) 291 return False
@dataclass
class
Explore:
@dataclass
class Explore:
    """A generic explore.

    Base class for the concrete explore types: `_to_lookml` and `from_dict`
    raise `NotImplementedError` here and are expected to be overridden.
    """

    # Name of the explore; used as the top-level key in `to_dict`.
    name: str
    # Maps a view role to a view name. Roles read in this class are
    # "base_view", "extended_view" and anything starting with "joined" (whose
    # value is iterated as a collection of names in `get_dependent_views`).
    views: Dict[str, str]
    # Directory containing the `<view>.view.lkml` files read by
    # `get_view_lookml`.
    views_path: Optional[Path] = None
    # NOTE(review): not read anywhere in this class; presumably the raw
    # definition the explore was created from -- confirm against subclasses.
    defn: Optional[Dict[str, str]] = None
    # Not an `__init__` argument and has no default, so it must be assigned
    # elsewhere (presumably by subclasses) before `to_dict`/`__eq__` read it.
    type: str = field(init=False)

    def to_dict(self) -> dict:
        """Explore instance represented as a dict."""
        return {self.name: {"type": self.type, "views": self.views}}

    def to_lookml(
        self, v1_name: Optional[str], hidden: Optional[bool]
    ) -> List[Dict[str, Any]]:
        """
        Generate LookML for this explore.

        Any generation done in dependent explore's
        `_to_lookml` takes precedence over these fields.

        Only the first explore returned by `_to_lookml` is merged with the
        base fields (`hidden`, `sql_always_where`); the rest are returned
        untouched. Raises `StopIteration` if there is no "base_view" entry.
        """
        base_lookml: Dict[str, Any] = {}
        if hidden:
            base_lookml["hidden"] = "yes"
        base_view_name = next(
            view_name
            for view_type, view_name in self.views.items()
            if view_type == "base_view"
        )
        for view_type, view in self.views.items():
            # We look at our dependent views to see if they have a
            # time partitioning field. Dependent views are any that are:
            #   - base_view
            #   - extended_view*
            #
            # We do not want to look at joined views. Those should be
            # labeled as:
            #   - join*
            #
            # If they have such a field, we filter on the date.
            # This allows for filter queries to succeed.
            if "join" in view_type:
                continue
            if time_partitioning_group := self.get_view_time_partitioning_group(view):
                # Always expressed against the base view; when several views
                # qualify, the last one iterated wins.
                base_lookml["sql_always_where"] = (
                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
                )

        # We only update the first returned explore
        new_lookml = self._to_lookml(v1_name)
        base_lookml.update(new_lookml[0])
        new_lookml[0] = base_lookml

        return new_lookml

    def _to_lookml(
        self,
        v1_name: Optional[str],
    ) -> List[Dict[str, Any]]:
        """Generate the type-specific LookML; only implemented in subclasses."""
        raise NotImplementedError("Only implemented in subclasses")

    def get_dependent_views(self) -> List[str]:
        """Get views this explore is dependent on.

        Roles starting with "extended" are skipped, roles starting with
        "joined" contribute every element of their value, and any other role
        contributes its value as a single view.
        """
        dependent_views: List[str] = []
        for _type, views in self.views.items():
            if _type.startswith("extended"):
                continue
            if _type.startswith("joined"):
                dependent_views.extend(views)
            else:
                dependent_views.append(views)
        return dependent_views

    @staticmethod
    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
        """Get an instance of an explore from a namespace definition."""
        raise NotImplementedError("Only implemented in subclasses")

    def get_view_lookml(self, view: str) -> dict:
        """Get the LookML for a view.

        Parses `<views_path>/<view>.view.lkml`; raises `Exception` when
        `views_path` is not set.
        """
        if self.views_path is not None:
            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())

        raise Exception("Missing view path for get_view_lookml")

    def get_datagroup(self) -> Optional[str]:
        """
        Return the name of the associated datagroup.

        The datagroup is looked up as
        `<views_path>/../datagroups/<base_view>_last_updated.datagroup.lkml`.
        Return `None` if there is no datagroup for this explore.
        """
        if self.views_path and (self.views_path.parent / "datagroups").exists():
            datagroups_path = self.views_path.parent / "datagroups"
            datagroup_name = f'{self.views["base_view"]}_last_updated'
            if (datagroups_path / f"{datagroup_name}.datagroup.lkml").exists():
                return datagroup_name
        return None

    def get_unnested_fields_joins_lookml(
        self,
    ) -> list:
        """Get the LookML for joining unnested fields.

        Returns one join definition (`name`, `view_label`, `relationship`,
        `sql`) for every view after the first one in the view file.
        """
        views_lookml = self.get_view_lookml(self.views["base_view"])
        views: List[str] = [view["name"] for view in views_lookml["views"]]
        parent_base_name = views_lookml["views"][0]["name"]

        extended_views: List[str] = []
        if "extended_view" in self.views:
            # check for extended views
            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
            extended_views = [view["name"] for view in extended_views_lookml["views"]]

            # NOTE(review): `update` overwrites keys present in both dicts, so
            # the extended file's "views" list replaces the base file's list
            # for the loop below (the base names stay in `views`). Confirm
            # this is intended before changing it.
            views_lookml.update(extended_views_lookml)
            views += extended_views

        joins = []
        for view in views_lookml["views"][1:]:
            view_name = view["name"]
            # get repeated, nested fields that exist as separate views in lookml
            base_name, metric = self._get_base_name_and_metric(
                view_name=view_name, views=views
            )
            metric_name = view_name
            metric_slug = metric_name

            if view_name in extended_views:
                # names of extended views are overridden by the name of the
                # view that is extending them
                metric_slug = metric_name.replace(base_name, parent_base_name)
                base_name = parent_base_name

            metric_label = ": ".join(
                slug_to_title(slug) for slug in metric_slug.split("__")
            )

            joins.append(
                {
                    "name": view_name,
                    "view_label": metric_label,
                    "relationship": "one_to_many",
                    "sql": (
                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
                    ),
                }
            )

        return joins

    def _get_default_channel(self, view: str) -> Optional[str]:
        """Get the escaped default value of the view's `channel` filter.

        The default is the first entry of the filter's `suggestions`. Returns
        `None` when the view has no `channel` filter, or when that filter
        offers no suggestions to pick a default from.
        """
        channel_params = [
            param
            for _view_defn in self.get_view_lookml(view)["views"]
            for param in _view_defn.get("filters", [])
            if _view_defn["name"] == view and param["name"] == "channel"
        ]
        if not channel_params:
            return None

        # A filter without (or with empty) suggestions has no default value;
        # report that instead of failing with KeyError/IndexError.
        allowed_values = channel_params[0].get("suggestions") or []
        if not allowed_values:
            return None
        return escape_filter_expr(allowed_values[0])

    def _get_base_name_and_metric(
        self, view_name: str, views: List[str]
    ) -> Tuple[str, str]:
        """
        Get base view and metric names.

        Returns the name of the base view and the metric based on the
        passed `view_name` and existing views. Raises `Exception` when no
        prefix of `view_name` is an existing view.

        The names are resolved in a backwards fashion to account for
        repeated nested fields that might contain other nested fields.
        For example:

        view: sync {
            [...]
            dimension: payload__events {
                sql: ${TABLE}.payload.events ;;
            }
        }

        view: sync__payload__events {
            [...]
            dimension: f5_ {
                sql: ${TABLE}.f5_ ;;
            }
        }

        view: sync__payload__events__f5_ {
            [...]
        }

        For these nested views to get translated to the following joins, the names
        need to be resolved backwards:

        join: sync__payload__events {
            relationship: one_to_many
            sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;;
        }

        join: sync__payload__events__f5_ {
            relationship: one_to_many
            sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;;
        }
        """
        split = view_name.split("__")
        # Try the longest candidate base view first.
        for index in range(len(split) - 1, 0, -1):
            base_view = "__".join(split[:index])
            metric = "__".join(split[index:])
            if base_view in views:
                return (base_view, metric)
        raise Exception(f"Cannot get base name and metric from view {view_name}")

    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
        """Determine whether this view has this dimension."""
        for _view_defn in self.get_view_lookml(view)["views"]:
            if _view_defn["name"] != view:
                continue
            for dim in _view_defn.get("dimensions", []):
                if dim["name"] == dimension_name:
                    return True
        return False

    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
        """Get time partitioning dimension group for this view.

        Return the name of the first dimension group tagged "time_partitioning_field",
        and fall back to "submission" if available. Return `None` otherwise.
        """
        has_submission = False
        for _view_defn in self.get_view_lookml(view)["views"]:
            if _view_defn["name"] != view:
                continue
            for dim in _view_defn.get("dimension_groups", []):
                if "time_partitioning_field" in dim.get("tags", []):
                    return dim["name"]
                if dim["name"] == "submission":
                    has_submission = True
        if has_submission:
            return "submission"
        return None

    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
        """Get required filters for this view.

        `view_name` is a role key of `self.views` (e.g. "base_view"), not the
        name of the view itself.
        """
        filters = []
        view = self.views[view_name]

        # Add a default filter on channel, if it's present in the view
        default_channel = self._get_default_channel(view)
        if default_channel is not None:
            filters.append({"channel": default_channel})

        # Add time partitioning filter, if present in the view
        if time_partitioning_group := self.get_view_time_partitioning_group(view):
            filters.append({f"{time_partitioning_group}_date": "28 days"})

        return filters

    def __eq__(self, other) -> bool:
        """Check for equality with another Explore.

        Explores are equal when `name`, `views` and `type` match;
        `views_path` and `defn` are not compared. Objects that are not an
        `Explore` compare unequal.
        """

        def comparable_dict(d):
            return tuple(sorted(d.items()))

        if isinstance(other, Explore):
            return (
                self.name == other.name
                and comparable_dict(self.views) == comparable_dict(other.views)
                and self.type == other.type
            )
        return False
A generic explore.
Explore( name: str, views: Dict[str, str], views_path: Optional[pathlib.Path] = None, defn: Optional[Dict[str, str]] = None)
def
to_dict(self) -> dict:
def to_dict(self) -> dict:
    """Represent this explore as ``{name: {"type": ..., "views": ...}}``."""
    body = {"type": self.type, "views": self.views}
    return {self.name: body}
Explore instance represented as a dict.
def
to_lookml( self, v1_name: Optional[str], hidden: Optional[bool]) -> List[Dict[str, Any]]:
def to_lookml(
    self, v1_name: Optional[str], hidden: Optional[bool]
) -> List[Dict[str, Any]]:
    """
    Build the LookML explores for this instance.

    The base fields computed here (`hidden`, `sql_always_where`) are merged
    into the first explore returned by `_to_lookml`; on a key clash the value
    from `_to_lookml` wins. Remaining explores are returned unchanged.
    """
    defaults = {"hidden": "yes"} if hidden else {}

    # Raises StopIteration when there is no "base_view" role.
    base_view_name = next(
        name for role, name in self.views.items() if role == "base_view"
    )

    # Only dependent views (base_view, extended_view*) are inspected for a
    # time partitioning field; joined views (join*) are skipped. Filtering
    # on the date allows filter queries to succeed.
    for role, view in self.views.items():
        if "join" not in role:
            group = self.get_view_time_partitioning_group(view)
            if group:
                # The filter always targets the base view; the last
                # qualifying view decides which field is used.
                defaults["sql_always_where"] = (
                    f"${{{base_view_name}.{group}_date}} >= '2010-01-01'"
                )

    # Only the first returned explore receives the base fields.
    explores = self._to_lookml(v1_name)
    defaults.update(explores[0])
    explores[0] = defaults
    return explores
Generate LookML for this explore.
Any generation done in dependent explore's
_to_lookml takes precedence over these fields.
def
get_dependent_views(self) -> List[str]:
def get_dependent_views(self) -> List[str]:
    """Collect the views this explore depends on.

    "extended*" roles are ignored, "joined*" roles add each element of their
    value, and every other role adds its value as one view.
    """
    collected = []
    for role, value in self.views.items():
        if role.startswith("extended"):
            continue
        if role.startswith("joined"):
            collected.extend(value)
            continue
        collected.append(value)
    return collected
Get views this explore is dependent on.
92 @staticmethod 93 def from_dict(name: str, defn: dict, views_path: Path) -> Explore: 94 """Get an instance of an explore from a namespace definition.""" 95 raise NotImplementedError("Only implemented in subclasses")
Get an instance of an explore from a namespace definition.
def
get_view_lookml(self, view: str) -> dict:
def get_view_lookml(self, view: str) -> dict:
    """Load and parse ``<views_path>/<view>.view.lkml``.

    Raises `Exception` when `views_path` is unset.
    """
    if self.views_path is None:
        raise Exception("Missing view path for get_view_lookml")

    lookml_file = self.views_path / f"{view}.view.lkml"
    return lkml.load(lookml_file.read_text())
Get the LookML for a view.
def
get_datagroup(self) -> Optional[str]:
def get_datagroup(self) -> Optional[str]:
    """
    Return the name of the datagroup belonging to the base view.

    The name is ``<base_view>_last_updated`` and is only returned when
    ``<name>.datagroup.lkml`` exists in the ``datagroups`` directory next to
    `views_path`. In every other case the result is `None`.
    """
    if not self.views_path:
        return None

    datagroups_path = self.views_path.parent / "datagroups"
    if not datagroups_path.exists():
        return None

    datagroup_name = f'{self.views["base_view"]}_last_updated'
    if (datagroups_path / f"{datagroup_name}.datagroup.lkml").exists():
        return datagroup_name
    return None
Return the name of the associated datagroup.
Return None if there is no datagroup for this explore.
def
get_unnested_fields_joins_lookml(self) -> list:
def get_unnested_fields_joins_lookml(
    self,
) -> list:
    """Build the join definitions for unnested (repeated, nested) fields.

    Every view after the first one in the view file becomes a
    ``LEFT JOIN UNNEST`` join with the keys `name`, `view_label`,
    `relationship` and `sql`.
    """
    lookml = self.get_view_lookml(self.views["base_view"])
    known_views: List[str] = [defn["name"] for defn in lookml["views"]]
    parent_base_name = lookml["views"][0]["name"]

    extended_views: List[str] = []
    if "extended_view" in self.views:
        extended_lookml = self.get_view_lookml(self.views["extended_view"])
        extended_views = [defn["name"] for defn in extended_lookml["views"]]

        # NOTE(review): `update` overwrites keys present in both dicts, so
        # the extended file's "views" list replaces the base file's list for
        # the loop below (base names remain in `known_views`). Confirm this
        # is intended before changing it.
        lookml.update(extended_lookml)
        known_views += extended_views

    joins = []
    for defn in lookml["views"][1:]:
        view_name = defn["name"]
        # Nested fields exist as separate views named
        # <base view>__<metric>; split the name back into those parts.
        base_name, metric = self._get_base_name_and_metric(
            view_name=view_name, views=known_views
        )

        label_slug = view_name
        if view_name in extended_views:
            # Extended views are exposed under the name of the view that
            # extends them.
            label_slug = view_name.replace(base_name, parent_base_name)
            base_name = parent_base_name

        label_parts = [slug_to_title(part) for part in label_slug.split("__")]
        unnest_sql = f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {view_name} "
        joins.append(
            {
                "name": view_name,
                "view_label": ": ".join(label_parts),
                "relationship": "one_to_many",
                "sql": unnest_sql,
            }
        )

    return joins
Get the LookML for joining unnested fields.
def
has_view_dimension(self, view: str, dimension_name: str) -> bool:
def has_view_dimension(self, view: str, dimension_name: str) -> bool:
    """Tell whether `view` defines a dimension called `dimension_name`.

    Only view definitions whose name equals `view` are searched.
    """
    return any(
        dimension["name"] == dimension_name
        for defn in self.get_view_lookml(view)["views"]
        if defn["name"] == view
        for dimension in defn.get("dimensions", [])
    )
Determine whether this view has this dimension.
def
get_view_time_partitioning_group(self, view: str) -> Optional[str]:
def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
    """Find the time partitioning dimension group of `view`.

    The first dimension group tagged "time_partitioning_field" wins. Without
    such a tag, "submission" is returned if a dimension group of that name
    exists, otherwise `None`.
    """
    fallback = None
    for defn in self.get_view_lookml(view)["views"]:
        if defn["name"] != view:
            continue
        for group in defn.get("dimension_groups", []):
            if "time_partitioning_field" in group.get("tags", []):
                return group["name"]
            if group["name"] == "submission":
                fallback = "submission"
    return fallback
Get time partitioning dimension group for this view.
Return the name of the first dimension group tagged "time_partitioning_field", and fall back to "submission" if available.
def
get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
    """Build the required filters for the view stored under role `view_name`.

    `view_name` is a key of `self.views` (e.g. "base_view"). The result holds
    a channel filter when the view has a default channel, followed by a
    "28 days" filter on the time partitioning date when the view has one.
    """
    view = self.views[view_name]
    required: List[Dict[str, str]] = []

    channel = self._get_default_channel(view)
    if channel is not None:
        required.append({"channel": channel})

    group = self.get_view_time_partitioning_group(view)
    if group:
        required.append({f"{group}_date": "28 days"})

    return required
Get required filters for this view.