generator.explores.explore
Generic explore type.
1"""Generic explore type.""" 2 3from __future__ import annotations 4 5from dataclasses import dataclass, field 6from pathlib import Path 7from typing import Any, Dict, List, Optional, Tuple 8 9import lkml 10 11from ..views.lookml_utils import escape_filter_expr, slug_to_title 12 13 14@dataclass 15class Explore: 16 """A generic explore.""" 17 18 name: str 19 views: Dict[str, str] 20 views_path: Optional[Path] = None 21 defn: Optional[Dict[str, str]] = None 22 type: str = field(init=False) 23 24 def to_dict(self) -> dict: 25 """Explore instance represented as a dict.""" 26 return {self.name: {"type": self.type, "views": self.views}} 27 28 def to_lookml( 29 self, v1_name: Optional[str], hidden: Optional[bool] 30 ) -> List[Dict[str, Any]]: 31 """ 32 Generate LookML for this explore. 33 34 Any generation done in dependent explore's 35 `_to_lookml` takes precedence over these fields. 36 """ 37 base_lookml = {} 38 if hidden: 39 base_lookml["hidden"] = "yes" 40 base_view_name = next( 41 ( 42 view_name 43 for view_type, view_name in self.views.items() 44 if view_type == "base_view" 45 ) 46 ) 47 for view_type, view in self.views.items(): 48 # We look at our dependent views to see if they have a 49 # "submission" field. Dependent views are any that are: 50 # - base_view 51 # - extended_view* 52 # 53 # We do not want to look at joined views. Those should be 54 # labeled as: 55 # - join* 56 # 57 # If they have a submission field, we filter on the date. 58 # This allows for filter queries to succeed. 
59 if "join" in view_type: 60 continue 61 if time_partitioning_group := self.get_view_time_partitioning_group(view): 62 base_lookml["sql_always_where"] = ( 63 f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'" 64 ) 65 66 # We only update the first returned explore 67 new_lookml = self._to_lookml(v1_name) 68 base_lookml.update(new_lookml[0]) 69 new_lookml[0] = base_lookml 70 71 return new_lookml 72 73 def _to_lookml( 74 self, 75 v1_name: Optional[str], 76 ) -> List[Dict[str, Any]]: 77 raise NotImplementedError("Only implemented in subclasses") 78 79 def get_dependent_views(self) -> List[str]: 80 """Get views this explore is dependent on.""" 81 dependent_views = [] 82 for _type, views in self.views.items(): 83 if _type.startswith("extended"): 84 continue 85 elif _type.startswith("joined"): 86 dependent_views += [view for view in views] 87 else: 88 dependent_views.append(views) 89 return dependent_views 90 91 @staticmethod 92 def from_dict(name: str, defn: dict, views_path: Path) -> Explore: 93 """Get an instance of an explore from a namespace definition.""" 94 raise NotImplementedError("Only implemented in subclasses") 95 96 def get_view_lookml(self, view: str) -> dict: 97 """Get the LookML for a view.""" 98 if self.views_path is not None: 99 return lkml.load((self.views_path / f"{view}.view.lkml").read_text()) 100 raise Exception("Missing view path for get_view_lookml") 101 102 def get_datagroup(self) -> Optional[str]: 103 """ 104 Return the name of the associated datagroup. 105 106 Return `None` if there is no datagroup for this explore. 
107 """ 108 if self.views_path and (self.views_path.parent / "datagroups").exists(): 109 datagroups_path = self.views_path.parent / "datagroups" 110 datagroup_file = ( 111 datagroups_path 112 / f'{self.views["base_view"]}_last_updated.datagroup.lkml' 113 ) 114 if datagroup_file.exists(): 115 return f'{self.views["base_view"]}_last_updated' 116 return None 117 118 def get_unnested_fields_joins_lookml( 119 self, 120 ) -> list: 121 """Get the LookML for joining unnested fields.""" 122 views_lookml = self.get_view_lookml(self.views["base_view"]) 123 views: List[str] = [view["name"] for view in views_lookml["views"]] 124 parent_base_name = views_lookml["views"][0]["name"] 125 126 extended_views: List[str] = [] 127 if "extended_view" in self.views: 128 # check for extended views 129 extended_views_lookml = self.get_view_lookml(self.views["extended_view"]) 130 extended_views = [view["name"] for view in extended_views_lookml["views"]] 131 132 views_lookml.update(extended_views_lookml) 133 views += extended_views 134 135 joins = [] 136 for view in views_lookml["views"][1:]: 137 view_name = view["name"] 138 # get repeated, nested fields that exist as separate views in lookml 139 base_name, metric = self._get_base_name_and_metric( 140 view_name=view_name, views=views 141 ) 142 metric_name = view_name 143 metric_label = slug_to_title(metric_name) 144 145 if view_name in extended_views: 146 # names of extended views are overriden by the name of the view that is extending them 147 metric_label = slug_to_title( 148 metric_name.replace(base_name, parent_base_name) 149 ) 150 base_name = parent_base_name 151 152 joins.append( 153 { 154 "name": view_name, 155 "view_label": metric_label, 156 "relationship": "one_to_many", 157 "sql": ( 158 f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} " 159 ), 160 } 161 ) 162 163 return joins 164 165 def _get_default_channel(self, view: str) -> Optional[str]: 166 channel_params = [ 167 param 168 for _view_defn in 
self.get_view_lookml(view)["views"] 169 for param in _view_defn.get("filters", []) 170 if _view_defn["name"] == view and param["name"] == "channel" 171 ] 172 173 if channel_params: 174 allowed_values = channel_params[0]["suggestions"] 175 default_value = allowed_values[0] 176 return escape_filter_expr(default_value) 177 return None 178 179 def _get_base_name_and_metric( 180 self, view_name: str, views: List[str] 181 ) -> Tuple[str, str]: 182 """ 183 Get base view and metric names. 184 185 Returns the the name of the base view and the metric based on the 186 passed `view_name` and existing views. 187 188 The names are resolved in a backwards fashion to account for 189 repeated nested fields that might contain other nested fields. 190 For example: 191 192 view: sync { 193 [...] 194 dimension: payload__events { 195 sql: ${TABLE}.payload.events ;; 196 } 197 } 198 199 view: sync__payload__events { 200 [...] 201 dimension: f5_ { 202 sql: ${TABLE}.f5_ ;; 203 } 204 } 205 206 view: sync__payload__events__f5_ { 207 [...] 
208 } 209 210 For these nested views to get translated to the following joins, the names 211 need to be resolved backwards: 212 213 join: sync__payload__events { 214 relationship: one_to_many 215 sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;; 216 } 217 218 join: sync__payload__events__f5_ { 219 relationship: one_to_many 220 sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;; 221 } 222 """ 223 split = view_name.split("__") 224 for index in range(len(split) - 1, 0, -1): 225 base_view = "__".join(split[:index]) 226 metric = "__".join(split[index:]) 227 if base_view in views: 228 return (base_view, metric) 229 raise Exception(f"Cannot get base name and metric from view {view_name}") 230 231 def has_view_dimension(self, view: str, dimension_name: str) -> bool: 232 """Determine whether a this view has this dimension.""" 233 for _view_defn in self.get_view_lookml(view)["views"]: 234 if _view_defn["name"] != view: 235 continue 236 for dim in _view_defn.get("dimensions", []): 237 if dim["name"] == dimension_name: 238 return True 239 return False 240 241 def get_view_time_partitioning_group(self, view: str) -> Optional[str]: 242 """Get time partitiong dimension group for this view. 243 244 Return the name of the first dimension group tagged "time_partitioning_field", 245 and fall back to "submission" if available. 
246 """ 247 has_submission = False 248 for _view_defn in self.get_view_lookml(view)["views"]: 249 if not _view_defn["name"] == view: 250 continue 251 for dim in _view_defn.get("dimension_groups", []): 252 if "time_partitioning_field" in dim.get("tags", []): 253 return dim["name"] 254 elif dim["name"] == "submission": 255 has_submission = True 256 if has_submission: 257 return "submission" 258 return None 259 260 def get_required_filters(self, view_name: str) -> List[Dict[str, str]]: 261 """Get required filters for this view.""" 262 filters = [] 263 view = self.views[view_name] 264 265 # Add a default filter on channel, if it's present in the view 266 default_channel = self._get_default_channel(view) 267 if default_channel is not None: 268 filters.append({"channel": default_channel}) 269 270 # Add submission filter, if present in the view 271 if time_partitioning_group := self.get_view_time_partitioning_group(view): 272 filters.append({f"{time_partitioning_group}_date": "28 days"}) 273 274 return filters 275 276 def __eq__(self, other) -> bool: 277 """Check for equality with other View.""" 278 279 def comparable_dict(d): 280 return tuple(sorted(d.items())) 281 282 if isinstance(other, Explore): 283 return ( 284 self.name == other.name 285 and comparable_dict(self.views) == comparable_dict(other.views) 286 and self.type == other.type 287 ) 288 return False
@dataclass
class
Explore:
@dataclass
class Explore:
    """A generic explore.

    Holds the mapping from view role (for example ``base_view``) to view name
    and provides shared helpers for reading view LookML files found under
    ``views_path``. ``_to_lookml`` and ``from_dict`` are only implemented in
    subclasses.
    """

    # Name of the explore; used as the single key of the dict built by `to_dict`.
    name: str
    # Maps a view role ("base_view", "extended_view", "joined...") to the view(s)
    # filling that role.
    views: Dict[str, str]
    # Directory containing the `<view>.view.lkml` files read by `get_view_lookml`.
    views_path: Optional[Path] = None
    # Optional raw definition; not read by any method of this base class.
    defn: Optional[Dict[str, str]] = None
    # Explore type identifier. Excluded from `__init__`, so it must be assigned
    # separately (presumably by subclasses -- confirm against them).
    type: str = field(init=False)

    def to_dict(self) -> dict:
        """Explore instance represented as a dict."""
        return {self.name: {"type": self.type, "views": self.views}}

    def to_lookml(
        self, v1_name: Optional[str], hidden: Optional[bool]
    ) -> List[Dict[str, Any]]:
        """
        Generate LookML for this explore.

        Any generation done in dependent explore's
        `_to_lookml` takes precedence over these fields.

        `v1_name` is passed through unchanged to `_to_lookml`. When `hidden`
        is truthy the first returned explore gets `hidden: yes`.

        Raises `KeyError` if `self.views` has no `base_view` entry.
        """
        base_lookml: Dict[str, Any] = {}
        if hidden:
            base_lookml["hidden"] = "yes"
        # Plain lookup instead of `next()` over a generator: a missing base
        # view now fails with a descriptive KeyError rather than leaking a
        # bare StopIteration into whatever is iterating over explores.
        base_view_name = self.views["base_view"]
        for view_type, view in self.views.items():
            # We look at our dependent views to see if they have a
            # "submission" field. Dependent views are any that are:
            # - base_view
            # - extended_view*
            #
            # We do not want to look at joined views. Those should be
            # labeled as:
            # - join*
            #
            # If they have a submission field, we filter on the date.
            # This allows for filter queries to succeed.
            if "join" in view_type:
                continue
            if time_partitioning_group := self.get_view_time_partitioning_group(view):
                base_lookml["sql_always_where"] = (
                    f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
                )

        # We only update the first returned explore
        new_lookml = self._to_lookml(v1_name)
        base_lookml.update(new_lookml[0])
        new_lookml[0] = base_lookml

        return new_lookml

    def _to_lookml(
        self,
        v1_name: Optional[str],
    ) -> List[Dict[str, Any]]:
        """Return the explore-specific LookML; only implemented in subclasses."""
        raise NotImplementedError("Only implemented in subclasses")

    def get_dependent_views(self) -> List[str]:
        """Get views this explore is dependent on.

        Roles starting with "extended" are skipped, roles starting with
        "joined" contribute every element of their value, and any other role
        contributes its value as a single entry.
        """
        dependent_views: List[str] = []
        for _type, views in self.views.items():
            if _type.startswith("extended"):
                continue
            if _type.startswith("joined"):
                dependent_views.extend(views)
            else:
                dependent_views.append(views)
        return dependent_views

    @staticmethod
    def from_dict(name: str, defn: dict, views_path: Path) -> Explore:
        """Get an instance of an explore from a namespace definition."""
        raise NotImplementedError("Only implemented in subclasses")

    def get_view_lookml(self, view: str) -> dict:
        """Get the LookML for a view.

        Reads and parses `<views_path>/<view>.view.lkml`. Raises `Exception`
        when `views_path` is not set.
        """
        if self.views_path is not None:
            return lkml.load((self.views_path / f"{view}.view.lkml").read_text())
        raise Exception("Missing view path for get_view_lookml")

    def get_datagroup(self) -> Optional[str]:
        """
        Return the name of the associated datagroup.

        Return `None` if there is no datagroup for this explore.

        The datagroup is looked up as
        `<views_path>/../datagroups/<base_view>_last_updated.datagroup.lkml`.
        """
        if not self.views_path:
            return None
        datagroups_path = self.views_path.parent / "datagroups"
        if not datagroups_path.exists():
            return None
        datagroup_name = f'{self.views["base_view"]}_last_updated'
        if (datagroups_path / f"{datagroup_name}.datagroup.lkml").exists():
            return datagroup_name
        return None

    def get_unnested_fields_joins_lookml(
        self,
    ) -> list:
        """Get the LookML for joining unnested fields.

        Every view after the first one in the parsed view file becomes a
        `LEFT JOIN UNNEST(...)` join definition.
        """
        views_lookml = self.get_view_lookml(self.views["base_view"])
        views: List[str] = [view["name"] for view in views_lookml["views"]]
        parent_base_name = views_lookml["views"][0]["name"]

        extended_views: List[str] = []
        if "extended_view" in self.views:
            # check for extended views
            extended_views_lookml = self.get_view_lookml(self.views["extended_view"])
            extended_views = [view["name"] for view in extended_views_lookml["views"]]

            # NOTE(review): dict.update replaces views_lookml["views"] with the
            # extended file's list, so only the extended file's nested views
            # are joined below -- confirm this is intended.
            views_lookml.update(extended_views_lookml)
            views += extended_views

        joins = []
        for view in views_lookml["views"][1:]:
            view_name = view["name"]
            # get repeated, nested fields that exist as separate views in lookml
            base_name, metric = self._get_base_name_and_metric(
                view_name=view_name, views=views
            )
            metric_name = view_name
            metric_label = slug_to_title(metric_name)

            if view_name in extended_views:
                # names of extended views are overridden by the name of the view that is extending them
                metric_label = slug_to_title(
                    metric_name.replace(base_name, parent_base_name)
                )
                base_name = parent_base_name

            joins.append(
                {
                    "name": view_name,
                    "view_label": metric_label,
                    "relationship": "one_to_many",
                    "sql": (
                        f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} "
                    ),
                }
            )

        return joins

    def _get_default_channel(self, view: str) -> Optional[str]:
        """Return the escaped first suggestion of the view's `channel` filter, if any."""
        channel_params = [
            param
            for _view_defn in self.get_view_lookml(view)["views"]
            for param in _view_defn.get("filters", [])
            if _view_defn["name"] == view and param["name"] == "channel"
        ]

        if channel_params:
            allowed_values = channel_params[0]["suggestions"]
            default_value = allowed_values[0]
            return escape_filter_expr(default_value)
        return None

    def _get_base_name_and_metric(
        self, view_name: str, views: List[str]
    ) -> Tuple[str, str]:
        """
        Get base view and metric names.

        Returns the name of the base view and the metric based on the
        passed `view_name` and existing views.

        The names are resolved in a backwards fashion to account for
        repeated nested fields that might contain other nested fields.
        For example:

        view: sync {
            [...]
            dimension: payload__events {
                sql: ${TABLE}.payload.events ;;
            }
        }

        view: sync__payload__events {
            [...]
            dimension: f5_ {
                sql: ${TABLE}.f5_ ;;
            }
        }

        view: sync__payload__events__f5_ {
            [...]
        }

        For these nested views to get translated to the following joins, the names
        need to be resolved backwards:

        join: sync__payload__events {
            relationship: one_to_many
            sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;;
        }

        join: sync__payload__events__f5_ {
            relationship: one_to_many
            sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;;
        }

        Raises `Exception` when no prefix of `view_name` is a known view.
        """
        split = view_name.split("__")
        # Try the longest prefix first so the closest enclosing view wins.
        for index in range(len(split) - 1, 0, -1):
            base_view = "__".join(split[:index])
            metric = "__".join(split[index:])
            if base_view in views:
                return (base_view, metric)
        raise Exception(f"Cannot get base name and metric from view {view_name}")

    def has_view_dimension(self, view: str, dimension_name: str) -> bool:
        """Determine whether this view has this dimension."""
        return any(
            dim["name"] == dimension_name
            for _view_defn in self.get_view_lookml(view)["views"]
            if _view_defn["name"] == view
            for dim in _view_defn.get("dimensions", [])
        )

    def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
        """Get time partitioning dimension group for this view.

        Return the name of the first dimension group tagged "time_partitioning_field",
        and fall back to "submission" if available.
        """
        has_submission = False
        for _view_defn in self.get_view_lookml(view)["views"]:
            if _view_defn["name"] != view:
                continue
            for dim in _view_defn.get("dimension_groups", []):
                if "time_partitioning_field" in dim.get("tags", []):
                    return dim["name"]
                if dim["name"] == "submission":
                    has_submission = True
        if has_submission:
            return "submission"
        return None

    def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
        """Get required filters for this view.

        `view_name` is a role key of `self.views` (for example "base_view"),
        not the name of the view itself.
        """
        filters = []
        view = self.views[view_name]

        # Add a default filter on channel, if it's present in the view
        default_channel = self._get_default_channel(view)
        if default_channel is not None:
            filters.append({"channel": default_channel})

        # Add submission filter, if present in the view
        if time_partitioning_group := self.get_view_time_partitioning_group(view):
            filters.append({f"{time_partitioning_group}_date": "28 days"})

        return filters

    def __eq__(self, other) -> bool:
        """Check for equality with another Explore (name, views and type)."""

        def comparable_dict(d):
            return tuple(sorted(d.items()))

        if isinstance(other, Explore):
            return (
                self.name == other.name
                and comparable_dict(self.views) == comparable_dict(other.views)
                and self.type == other.type
            )
        return False
A generic explore.
Explore( name: str, views: Dict[str, str], views_path: Optional[pathlib.Path] = None, defn: Optional[Dict[str, str]] = None)
def
to_dict(self) -> dict:
def to_dict(self) -> dict:
    """Return this explore as a one-entry dict keyed by its name.

    The value carries the explore's `type` and its `views` mapping (the same
    object, not a copy).
    """
    payload = {"type": self.type, "views": self.views}
    return {self.name: payload}
Explore instance represented as a dict.
def
to_lookml( self, v1_name: Optional[str], hidden: Optional[bool]) -> List[Dict[str, Any]]:
def to_lookml(
    self, v1_name: Optional[str], hidden: Optional[bool]
) -> List[Dict[str, Any]]:
    """
    Generate LookML for this explore.

    Any generation done in dependent explore's
    `_to_lookml` takes precedence over these fields.

    `v1_name` is passed through unchanged to `_to_lookml`. When `hidden`
    is truthy the first returned explore gets `hidden: yes`.

    Raises `KeyError` if `self.views` has no `base_view` entry.
    """
    base_lookml: Dict[str, Any] = {}
    if hidden:
        base_lookml["hidden"] = "yes"
    # Plain lookup instead of `next()` over a generator: a missing base
    # view now fails with a descriptive KeyError rather than leaking a
    # bare StopIteration into whatever is iterating over explores.
    base_view_name = self.views["base_view"]
    for view_type, view in self.views.items():
        # We look at our dependent views to see if they have a
        # "submission" field. Dependent views are any that are:
        # - base_view
        # - extended_view*
        #
        # We do not want to look at joined views. Those should be
        # labeled as:
        # - join*
        #
        # If they have a submission field, we filter on the date.
        # This allows for filter queries to succeed.
        if "join" in view_type:
            continue
        if time_partitioning_group := self.get_view_time_partitioning_group(view):
            base_lookml["sql_always_where"] = (
                f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'"
            )

    # We only update the first returned explore
    new_lookml = self._to_lookml(v1_name)
    base_lookml.update(new_lookml[0])
    new_lookml[0] = base_lookml

    return new_lookml
Generate LookML for this explore.
Any generation done in dependent explore's
_to_lookml
takes precedence over these fields.
def
get_dependent_views(self) -> List[str]:
def get_dependent_views(self) -> List[str]:
    """Collect the views this explore depends on.

    Roles whose key starts with "extended" are ignored. Roles starting with
    "joined" contribute each element of their value, and every other role
    contributes its value as a single entry. Order follows `self.views`.
    """
    collected = []
    for role, value in self.views.items():
        if role.startswith("extended"):
            continue
        if role.startswith("joined"):
            collected.extend(value)
        else:
            collected.append(value)
    return collected
Get views this explore is dependent on.
92 @staticmethod 93 def from_dict(name: str, defn: dict, views_path: Path) -> Explore: 94 """Get an instance of an explore from a namespace definition.""" 95 raise NotImplementedError("Only implemented in subclasses")
Get an instance of an explore from a namespace definition.
def
get_view_lookml(self, view: str) -> dict:
def get_view_lookml(self, view: str) -> dict:
    """Load and parse the LookML file of `view`.

    The file is read from `<views_path>/<view>.view.lkml` and parsed with
    `lkml.load`. Raises `Exception` when `views_path` is `None`.
    """
    if self.views_path is None:
        raise Exception("Missing view path for get_view_lookml")
    lookml_file = self.views_path / f"{view}.view.lkml"
    return lkml.load(lookml_file.read_text())
Get the LookML for a view.
def
get_datagroup(self) -> Optional[str]:
def get_datagroup(self) -> Optional[str]:
    """
    Return the name of the datagroup associated with this explore.

    The name is `<base_view>_last_updated`. It is returned only when the file
    `<name>.datagroup.lkml` exists in the `datagroups` directory next to
    `views_path`; otherwise the result is `None`.
    """
    if not self.views_path:
        return None
    datagroups_dir = self.views_path.parent / "datagroups"
    if not datagroups_dir.exists():
        return None
    datagroup_name = f'{self.views["base_view"]}_last_updated'
    if (datagroups_dir / f"{datagroup_name}.datagroup.lkml").exists():
        return datagroup_name
    return None
Return the name of the associated datagroup.
Return None
if there is no datagroup for this explore.
def
get_unnested_fields_joins_lookml(self) -> list:
def get_unnested_fields_joins_lookml(
    self,
) -> list:
    """Build the join definitions used to unnest repeated fields.

    Every view after the first one in the parsed view file yields one
    `LEFT JOIN UNNEST(...)` join. When an `extended_view` is configured, the
    nested views come from that file and are labelled and joined as if they
    belonged to the base view.
    """
    lookml = self.get_view_lookml(self.views["base_view"])
    known_views: List[str] = [defn["name"] for defn in lookml["views"]]
    root_view_name = lookml["views"][0]["name"]

    inherited_views: List[str] = []
    if "extended_view" in self.views:
        extended_lookml = self.get_view_lookml(self.views["extended_view"])
        inherited_views = [defn["name"] for defn in extended_lookml["views"]]

        # NOTE(review): dict.update replaces lookml["views"] with the
        # extended file's list, so only that file's nested views are
        # joined below -- confirm this is intended.
        lookml.update(extended_lookml)
        known_views += inherited_views

    unnest_joins = []
    for defn in lookml["views"][1:]:
        nested_name = defn["name"]
        # Resolve which known view owns this repeated field.
        owner, field_path = self._get_base_name_and_metric(
            view_name=nested_name, views=known_views
        )
        label = slug_to_title(nested_name)

        if nested_name in inherited_views:
            # Extended views take on the name of the view extending them.
            label = slug_to_title(nested_name.replace(owner, root_view_name))
            owner = root_view_name

        unnest_sql = f"LEFT JOIN UNNEST(${{{owner}.{field_path}}}) AS {nested_name} "
        unnest_joins.append(
            {
                "name": nested_name,
                "view_label": label,
                "relationship": "one_to_many",
                "sql": unnest_sql,
            }
        )

    return unnest_joins
Get the LookML for joining unnested fields.
def
has_view_dimension(self, view: str, dimension_name: str) -> bool:
def has_view_dimension(self, view: str, dimension_name: str) -> bool:
    """Tell whether `view` defines a dimension named `dimension_name`.

    Only view definitions whose name equals `view` are inspected; other
    views in the same parsed file are ignored.
    """
    return any(
        dimension["name"] == dimension_name
        for definition in self.get_view_lookml(view)["views"]
        if definition["name"] == view
        for dimension in definition.get("dimensions", [])
    )
Determine whether this view has this dimension.
def
get_view_time_partitioning_group(self, view: str) -> Optional[str]:
def get_view_time_partitioning_group(self, view: str) -> Optional[str]:
    """Find the time partitioning dimension group of `view`.

    The first dimension group tagged "time_partitioning_field" wins. If none
    is tagged but a group named "submission" exists, "submission" is
    returned; otherwise the result is `None`.
    """
    submission_seen = False
    matching = (
        definition
        for definition in self.get_view_lookml(view)["views"]
        if definition["name"] == view
    )
    for definition in matching:
        for group in definition.get("dimension_groups", []):
            if "time_partitioning_field" in group.get("tags", []):
                return group["name"]
            if group["name"] == "submission":
                submission_seen = True
    return "submission" if submission_seen else None
Get time partitioning dimension group for this view.
Return the name of the first dimension group tagged "time_partitioning_field", and fall back to "submission" if available.
def
get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
def get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
    """Build the filters that must always be applied for a view.

    `view_name` is a role key of `self.views` (for example "base_view").
    The result holds, in this order, a `channel` filter when the view has a
    default channel and a `<group>_date: "28 days"` filter when the view has
    a time partitioning group.
    """
    target_view = self.views[view_name]
    required: List[Dict[str, str]] = []

    # Default channel filter, when the view exposes one.
    channel = self._get_default_channel(target_view)
    if channel is not None:
        required.append({"channel": channel})

    # Date filter on the time partitioning group, when the view has one.
    partition_group = self.get_view_time_partitioning_group(target_view)
    if partition_group:
        required.append({f"{partition_group}_date": "28 days"})

    return required
Get required filters for this view.