generator.explores.explore
Generic explore type.
1"""Generic explore type.""" 2 3from __future__ import annotations 4 5from dataclasses import dataclass, field 6from pathlib import Path 7from typing import Any, Dict, List, Optional, Tuple 8 9import lkml 10 11from ..views.lookml_utils import escape_filter_expr, slug_to_title 12 13 14@dataclass 15class Explore: 16 """A generic explore.""" 17 18 name: str 19 views: Dict[str, str] 20 views_path: Optional[Path] = None 21 defn: Optional[Dict[str, str]] = None 22 type: str = field(init=False) 23 24 def to_dict(self) -> dict: 25 """Explore instance represented as a dict.""" 26 return {self.name: {"type": self.type, "views": self.views}} 27 28 def to_lookml( 29 self, v1_name: Optional[str], hidden: Optional[bool] 30 ) -> List[Dict[str, Any]]: 31 """ 32 Generate LookML for this explore. 33 34 Any generation done in dependent explore's 35 `_to_lookml` takes precedence over these fields. 36 """ 37 base_lookml = {} 38 if hidden: 39 base_lookml["hidden"] = "yes" 40 base_view_name = next( 41 ( 42 view_name 43 for view_type, view_name in self.views.items() 44 if view_type == "base_view" 45 ) 46 ) 47 for view_type, view in self.views.items(): 48 # We look at our dependent views to see if they have a 49 # "submission" field. Dependent views are any that are: 50 # - base_view 51 # - extended_view* 52 # 53 # We do not want to look at joined views. Those should be 54 # labeled as: 55 # - join* 56 # 57 # If they have a submission field, we filter on the date. 58 # This allows for filter queries to succeed. 59 if "join" in view_type: 60 continue 61 if time_partitioning_group := self.get_view_time_partitioning_group(view): 62 base_lookml["sql_always_where"] = ( 63 f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'" 64 ) 65 66 # We only update the first returned explore 67 new_lookml = self._to_lookml(v1_name) 68 base_lookml.update(new_lookml[0]) 69 new_lookml[0] = base_lookml 70 71 return new_lookml 72 73 def _to_lookml( 74 self, 75 v1_name: Optional[str], 76 ) -> List[Dict[str, Any]]: 77 raise NotImplementedError("Only implemented in subclasses") 78 79 def get_dependent_views(self) -> List[str]: 80 """Get views this explore is dependent on.""" 81 dependent_views = [] 82 for _type, views in self.views.items(): 83 if _type.startswith("extended"): 84 continue 85 elif _type.startswith("joined"): 86 dependent_views += [view for view in views] 87 else: 88 dependent_views.append(views) 89 return dependent_views 90 91 @staticmethod 92 def from_dict(name: str, defn: dict, views_path: Path) -> Explore: 93 """Get an instance of an explore from a namespace definition.""" 94 raise NotImplementedError("Only implemented in subclasses") 95 96 def get_view_lookml(self, view: str) -> dict: 97 """Get the LookML for a view.""" 98 if self.views_path is not None: 99 return lkml.load((self.views_path / f"{view}.view.lkml").read_text()) 100 raise Exception("Missing view path for get_view_lookml") 101 102 def get_unnested_fields_joins_lookml( 103 self, 104 ) -> list: 105 """Get the LookML for joining unnested fields.""" 106 views_lookml = self.get_view_lookml(self.views["base_view"]) 107 views: List[str] = [view["name"] for view in views_lookml["views"]] 108 parent_base_name = views_lookml["views"][0]["name"] 109 110 extended_views: List[str] = [] 111 if "extended_view" in self.views: 112 # check for extended views 113 extended_views_lookml = self.get_view_lookml(self.views["extended_view"]) 114 extended_views = [view["name"] for view in extended_views_lookml["views"]] 115 116 views_lookml.update(extended_views_lookml) 117 views += extended_views 118 119 joins = [] 120 for view in views_lookml["views"][1:]: 121 view_name = view["name"] 122 # get repeated, nested fields that exist as separate views in lookml 123 base_name, metric = self._get_base_name_and_metric( 124 view_name=view_name, views=views 125 ) 126 metric_name = view_name 127 metric_label = slug_to_title(metric_name) 128 129 if view_name in extended_views: 130 # names of extended views are overriden by the name of the view that is extending them 131 metric_label = slug_to_title( 132 metric_name.replace(base_name, parent_base_name) 133 ) 134 base_name = parent_base_name 135 136 joins.append( 137 { 138 "name": view_name, 139 "view_label": metric_label, 140 "relationship": "one_to_many", 141 "sql": ( 142 f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} " 143 ), 144 } 145 ) 146 147 return joins 148 149 def _get_default_channel(self, view: str) -> Optional[str]: 150 channel_params = [ 151 param 152 for _view_defn in self.get_view_lookml(view)["views"] 153 for param in _view_defn.get("filters", []) 154 if _view_defn["name"] == view and param["name"] == "channel" 155 ] 156 157 if channel_params: 158 allowed_values = channel_params[0]["suggestions"] 159 default_value = allowed_values[0] 160 return escape_filter_expr(default_value) 161 return None 162 163 def _get_base_name_and_metric( 164 self, view_name: str, views: List[str] 165 ) -> Tuple[str, str]: 166 """ 167 Get base view and metric names. 168 169 Returns the the name of the base view and the metric based on the 170 passed `view_name` and existing views. 171 172 The names are resolved in a backwards fashion to account for 173 repeated nested fields that might contain other nested fields. 174 For example: 175 176 view: sync { 177 [...] 178 dimension: payload__events { 179 sql: ${TABLE}.payload.events ;; 180 } 181 } 182 183 view: sync__payload__events { 184 [...] 185 dimension: f5_ { 186 sql: ${TABLE}.f5_ ;; 187 } 188 } 189 190 view: sync__payload__events__f5_ { 191 [...] 192 } 193 194 For these nested views to get translated to the following joins, the names 195 need to be resolved backwards: 196 197 join: sync__payload__events { 198 relationship: one_to_many 199 sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;; 200 } 201 202 join: sync__payload__events__f5_ { 203 relationship: one_to_many 204 sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;; 205 } 206 """ 207 split = view_name.split("__") 208 for index in range(len(split) - 1, 0, -1): 209 base_view = "__".join(split[:index]) 210 metric = "__".join(split[index:]) 211 if base_view in views: 212 return (base_view, metric) 213 raise Exception(f"Cannot get base name and metric from view {view_name}") 214 215 def has_view_dimension(self, view: str, dimension_name: str) -> bool: 216 """Determine whether a this view has this dimension.""" 217 for _view_defn in self.get_view_lookml(view)["views"]: 218 if _view_defn["name"] != view: 219 continue 220 for dim in _view_defn.get("dimensions", []): 221 if dim["name"] == dimension_name: 222 return True 223 return False 224 225 def get_view_time_partitioning_group(self, view: str) -> Optional[str]: 226 """Get time partitiong dimension group for this view. 227 228 Return the name of the first dimension group tagged "time_partitioning_field", 229 and fall back to "submission" if available. 230 """ 231 has_submission = False 232 for _view_defn in self.get_view_lookml(view)["views"]: 233 if not _view_defn["name"] == view: 234 continue 235 for dim in _view_defn.get("dimension_groups", []): 236 if "time_partitioning_field" in dim.get("tags", []): 237 return dim["name"] 238 elif dim["name"] == "submission": 239 has_submission = True 240 if has_submission: 241 return "submission" 242 return None 243 244 def get_required_filters(self, view_name: str) -> List[Dict[str, str]]: 245 """Get required filters for this view.""" 246 filters = [] 247 view = self.views[view_name] 248 249 # Add a default filter on channel, if it's present in the view 250 default_channel = self._get_default_channel(view) 251 if default_channel is not None: 252 filters.append({"channel": default_channel}) 253 254 # Add submission filter, if present in the view 255 if time_partitioning_group := self.get_view_time_partitioning_group(view): 256 filters.append({f"{time_partitioning_group}_date": "28 days"}) 257 258 return filters 259 260 def __eq__(self, other) -> bool: 261 """Check for equality with other View.""" 262 263 def comparable_dict(d): 264 return tuple(sorted(d.items())) 265 266 if isinstance(other, Explore): 267 return ( 268 self.name == other.name 269 and comparable_dict(self.views) == comparable_dict(other.views) 270 and self.type == other.type 271 ) 272 return False
@dataclass
class
Explore:
15@dataclass 16class Explore: 17 """A generic explore.""" 18 19 name: str 20 views: Dict[str, str] 21 views_path: Optional[Path] = None 22 defn: Optional[Dict[str, str]] = None 23 type: str = field(init=False) 24 25 def to_dict(self) -> dict: 26 """Explore instance represented as a dict.""" 27 return {self.name: {"type": self.type, "views": self.views}} 28 29 def to_lookml( 30 self, v1_name: Optional[str], hidden: Optional[bool] 31 ) -> List[Dict[str, Any]]: 32 """ 33 Generate LookML for this explore. 34 35 Any generation done in dependent explore's 36 `_to_lookml` takes precedence over these fields. 37 """ 38 base_lookml = {} 39 if hidden: 40 base_lookml["hidden"] = "yes" 41 base_view_name = next( 42 ( 43 view_name 44 for view_type, view_name in self.views.items() 45 if view_type == "base_view" 46 ) 47 ) 48 for view_type, view in self.views.items(): 49 # We look at our dependent views to see if they have a 50 # "submission" field. Dependent views are any that are: 51 # - base_view 52 # - extended_view* 53 # 54 # We do not want to look at joined views. Those should be 55 # labeled as: 56 # - join* 57 # 58 # If they have a submission field, we filter on the date. 59 # This allows for filter queries to succeed. 60 if "join" in view_type: 61 continue 62 if time_partitioning_group := self.get_view_time_partitioning_group(view): 63 base_lookml["sql_always_where"] = ( 64 f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'" 65 ) 66 67 # We only update the first returned explore 68 new_lookml = self._to_lookml(v1_name) 69 base_lookml.update(new_lookml[0]) 70 new_lookml[0] = base_lookml 71 72 return new_lookml 73 74 def _to_lookml( 75 self, 76 v1_name: Optional[str], 77 ) -> List[Dict[str, Any]]: 78 raise NotImplementedError("Only implemented in subclasses") 79 80 def get_dependent_views(self) -> List[str]: 81 """Get views this explore is dependent on.""" 82 dependent_views = [] 83 for _type, views in self.views.items(): 84 if _type.startswith("extended"): 85 continue 86 elif _type.startswith("joined"): 87 dependent_views += [view for view in views] 88 else: 89 dependent_views.append(views) 90 return dependent_views 91 92 @staticmethod 93 def from_dict(name: str, defn: dict, views_path: Path) -> Explore: 94 """Get an instance of an explore from a namespace definition.""" 95 raise NotImplementedError("Only implemented in subclasses") 96 97 def get_view_lookml(self, view: str) -> dict: 98 """Get the LookML for a view.""" 99 if self.views_path is not None: 100 return lkml.load((self.views_path / f"{view}.view.lkml").read_text()) 101 raise Exception("Missing view path for get_view_lookml") 102 103 def get_unnested_fields_joins_lookml( 104 self, 105 ) -> list: 106 """Get the LookML for joining unnested fields.""" 107 views_lookml = self.get_view_lookml(self.views["base_view"]) 108 views: List[str] = [view["name"] for view in views_lookml["views"]] 109 parent_base_name = views_lookml["views"][0]["name"] 110 111 extended_views: List[str] = [] 112 if "extended_view" in self.views: 113 # check for extended views 114 extended_views_lookml = self.get_view_lookml(self.views["extended_view"]) 115 extended_views = [view["name"] for view in extended_views_lookml["views"]] 116 117 views_lookml.update(extended_views_lookml) 118 views += extended_views 119 120 joins = [] 121 for view in views_lookml["views"][1:]: 122 view_name = view["name"] 123 # get repeated, nested fields that exist as separate views in lookml 124 base_name, metric = self._get_base_name_and_metric( 125 view_name=view_name, views=views 126 ) 127 metric_name = view_name 128 metric_label = slug_to_title(metric_name) 129 130 if view_name in extended_views: 131 # names of extended views are overriden by the name of the view that is extending them 132 metric_label = slug_to_title( 133 metric_name.replace(base_name, parent_base_name) 134 ) 135 base_name = parent_base_name 136 137 joins.append( 138 { 139 "name": view_name, 140 "view_label": metric_label, 141 "relationship": "one_to_many", 142 "sql": ( 143 f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} " 144 ), 145 } 146 ) 147 148 return joins 149 150 def _get_default_channel(self, view: str) -> Optional[str]: 151 channel_params = [ 152 param 153 for _view_defn in self.get_view_lookml(view)["views"] 154 for param in _view_defn.get("filters", []) 155 if _view_defn["name"] == view and param["name"] == "channel" 156 ] 157 158 if channel_params: 159 allowed_values = channel_params[0]["suggestions"] 160 default_value = allowed_values[0] 161 return escape_filter_expr(default_value) 162 return None 163 164 def _get_base_name_and_metric( 165 self, view_name: str, views: List[str] 166 ) -> Tuple[str, str]: 167 """ 168 Get base view and metric names. 169 170 Returns the the name of the base view and the metric based on the 171 passed `view_name` and existing views. 172 173 The names are resolved in a backwards fashion to account for 174 repeated nested fields that might contain other nested fields. 175 For example: 176 177 view: sync { 178 [...] 179 dimension: payload__events { 180 sql: ${TABLE}.payload.events ;; 181 } 182 } 183 184 view: sync__payload__events { 185 [...] 186 dimension: f5_ { 187 sql: ${TABLE}.f5_ ;; 188 } 189 } 190 191 view: sync__payload__events__f5_ { 192 [...] 193 } 194 195 For these nested views to get translated to the following joins, the names 196 need to be resolved backwards: 197 198 join: sync__payload__events { 199 relationship: one_to_many 200 sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;; 201 } 202 203 join: sync__payload__events__f5_ { 204 relationship: one_to_many 205 sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;; 206 } 207 """ 208 split = view_name.split("__") 209 for index in range(len(split) - 1, 0, -1): 210 base_view = "__".join(split[:index]) 211 metric = "__".join(split[index:]) 212 if base_view in views: 213 return (base_view, metric) 214 raise Exception(f"Cannot get base name and metric from view {view_name}") 215 216 def has_view_dimension(self, view: str, dimension_name: str) -> bool: 217 """Determine whether a this view has this dimension.""" 218 for _view_defn in self.get_view_lookml(view)["views"]: 219 if _view_defn["name"] != view: 220 continue 221 for dim in _view_defn.get("dimensions", []): 222 if dim["name"] == dimension_name: 223 return True 224 return False 225 226 def get_view_time_partitioning_group(self, view: str) -> Optional[str]: 227 """Get time partitiong dimension group for this view. 228 229 Return the name of the first dimension group tagged "time_partitioning_field", 230 and fall back to "submission" if available. 231 """ 232 has_submission = False 233 for _view_defn in self.get_view_lookml(view)["views"]: 234 if not _view_defn["name"] == view: 235 continue 236 for dim in _view_defn.get("dimension_groups", []): 237 if "time_partitioning_field" in dim.get("tags", []): 238 return dim["name"] 239 elif dim["name"] == "submission": 240 has_submission = True 241 if has_submission: 242 return "submission" 243 return None 244 245 def get_required_filters(self, view_name: str) -> List[Dict[str, str]]: 246 """Get required filters for this view.""" 247 filters = [] 248 view = self.views[view_name] 249 250 # Add a default filter on channel, if it's present in the view 251 default_channel = self._get_default_channel(view) 252 if default_channel is not None: 253 filters.append({"channel": default_channel}) 254 255 # Add submission filter, if present in the view 256 if time_partitioning_group := self.get_view_time_partitioning_group(view): 257 filters.append({f"{time_partitioning_group}_date": "28 days"}) 258 259 return filters 260 261 def __eq__(self, other) -> bool: 262 """Check for equality with other View.""" 263 264 def comparable_dict(d): 265 return tuple(sorted(d.items())) 266 267 if isinstance(other, Explore): 268 return ( 269 self.name == other.name 270 and comparable_dict(self.views) == comparable_dict(other.views) 271 and self.type == other.type 272 ) 273 return False
A generic explore.
Explore( name: str, views: Dict[str, str], views_path: Optional[pathlib.Path] = None, defn: Optional[Dict[str, str]] = None)
def
to_dict(self) -> dict:
25 def to_dict(self) -> dict: 26 """Explore instance represented as a dict.""" 27 return {self.name: {"type": self.type, "views": self.views}}
Explore instance represented as a dict.
def
to_lookml( self, v1_name: Optional[str], hidden: Optional[bool]) -> List[Dict[str, Any]]:
29 def to_lookml( 30 self, v1_name: Optional[str], hidden: Optional[bool] 31 ) -> List[Dict[str, Any]]: 32 """ 33 Generate LookML for this explore. 34 35 Any generation done in dependent explore's 36 `_to_lookml` takes precedence over these fields. 37 """ 38 base_lookml = {} 39 if hidden: 40 base_lookml["hidden"] = "yes" 41 base_view_name = next( 42 ( 43 view_name 44 for view_type, view_name in self.views.items() 45 if view_type == "base_view" 46 ) 47 ) 48 for view_type, view in self.views.items(): 49 # We look at our dependent views to see if they have a 50 # "submission" field. Dependent views are any that are: 51 # - base_view 52 # - extended_view* 53 # 54 # We do not want to look at joined views. Those should be 55 # labeled as: 56 # - join* 57 # 58 # If they have a submission field, we filter on the date. 59 # This allows for filter queries to succeed. 60 if "join" in view_type: 61 continue 62 if time_partitioning_group := self.get_view_time_partitioning_group(view): 63 base_lookml["sql_always_where"] = ( 64 f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'" 65 ) 66 67 # We only update the first returned explore 68 new_lookml = self._to_lookml(v1_name) 69 base_lookml.update(new_lookml[0]) 70 new_lookml[0] = base_lookml 71 72 return new_lookml
Generate LookML for this explore.
Any generation done in dependent explore's
_to_lookml
takes precedence over these fields.
def
get_dependent_views(self) -> List[str]:
80 def get_dependent_views(self) -> List[str]: 81 """Get views this explore is dependent on.""" 82 dependent_views = [] 83 for _type, views in self.views.items(): 84 if _type.startswith("extended"): 85 continue 86 elif _type.startswith("joined"): 87 dependent_views += [view for view in views] 88 else: 89 dependent_views.append(views) 90 return dependent_views
Get views this explore is dependent on.
92 @staticmethod 93 def from_dict(name: str, defn: dict, views_path: Path) -> Explore: 94 """Get an instance of an explore from a namespace definition.""" 95 raise NotImplementedError("Only implemented in subclasses")
Get an instance of an explore from a namespace definition.
def
get_view_lookml(self, view: str) -> dict:
97 def get_view_lookml(self, view: str) -> dict: 98 """Get the LookML for a view.""" 99 if self.views_path is not None: 100 return lkml.load((self.views_path / f"{view}.view.lkml").read_text()) 101 raise Exception("Missing view path for get_view_lookml")
Get the LookML for a view.
def
get_unnested_fields_joins_lookml(self) -> list:
103 def get_unnested_fields_joins_lookml( 104 self, 105 ) -> list: 106 """Get the LookML for joining unnested fields.""" 107 views_lookml = self.get_view_lookml(self.views["base_view"]) 108 views: List[str] = [view["name"] for view in views_lookml["views"]] 109 parent_base_name = views_lookml["views"][0]["name"] 110 111 extended_views: List[str] = [] 112 if "extended_view" in self.views: 113 # check for extended views 114 extended_views_lookml = self.get_view_lookml(self.views["extended_view"]) 115 extended_views = [view["name"] for view in extended_views_lookml["views"]] 116 117 views_lookml.update(extended_views_lookml) 118 views += extended_views 119 120 joins = [] 121 for view in views_lookml["views"][1:]: 122 view_name = view["name"] 123 # get repeated, nested fields that exist as separate views in lookml 124 base_name, metric = self._get_base_name_and_metric( 125 view_name=view_name, views=views 126 ) 127 metric_name = view_name 128 metric_label = slug_to_title(metric_name) 129 130 if view_name in extended_views: 131 # names of extended views are overriden by the name of the view that is extending them 132 metric_label = slug_to_title( 133 metric_name.replace(base_name, parent_base_name) 134 ) 135 base_name = parent_base_name 136 137 joins.append( 138 { 139 "name": view_name, 140 "view_label": metric_label, 141 "relationship": "one_to_many", 142 "sql": ( 143 f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} " 144 ), 145 } 146 ) 147 148 return joins
Get the LookML for joining unnested fields.
def
has_view_dimension(self, view: str, dimension_name: str) -> bool:
216 def has_view_dimension(self, view: str, dimension_name: str) -> bool: 217 """Determine whether a this view has this dimension.""" 218 for _view_defn in self.get_view_lookml(view)["views"]: 219 if _view_defn["name"] != view: 220 continue 221 for dim in _view_defn.get("dimensions", []): 222 if dim["name"] == dimension_name: 223 return True 224 return False
Determine whether a this view has this dimension.
def
get_view_time_partitioning_group(self, view: str) -> Optional[str]:
226 def get_view_time_partitioning_group(self, view: str) -> Optional[str]: 227 """Get time partitiong dimension group for this view. 228 229 Return the name of the first dimension group tagged "time_partitioning_field", 230 and fall back to "submission" if available. 231 """ 232 has_submission = False 233 for _view_defn in self.get_view_lookml(view)["views"]: 234 if not _view_defn["name"] == view: 235 continue 236 for dim in _view_defn.get("dimension_groups", []): 237 if "time_partitioning_field" in dim.get("tags", []): 238 return dim["name"] 239 elif dim["name"] == "submission": 240 has_submission = True 241 if has_submission: 242 return "submission" 243 return None
Get time partitiong dimension group for this view.
Return the name of the first dimension group tagged "time_partitioning_field", and fall back to "submission" if available.
def
get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
245 def get_required_filters(self, view_name: str) -> List[Dict[str, str]]: 246 """Get required filters for this view.""" 247 filters = [] 248 view = self.views[view_name] 249 250 # Add a default filter on channel, if it's present in the view 251 default_channel = self._get_default_channel(view) 252 if default_channel is not None: 253 filters.append({"channel": default_channel}) 254 255 # Add submission filter, if present in the view 256 if time_partitioning_group := self.get_view_time_partitioning_group(view): 257 filters.append({f"{time_partitioning_group}_date": "28 days"}) 258 259 return filters
Get required filters for this view.