generator.explores.explore
Generic explore type.
1"""Generic explore type.""" 2 3from __future__ import annotations 4 5from dataclasses import dataclass, field 6from pathlib import Path 7from typing import Any, Dict, List, Optional, Tuple 8 9import lkml 10 11from ..views.lookml_utils import escape_filter_expr, slug_to_title 12 13 14@dataclass 15class Explore: 16 """A generic explore.""" 17 18 name: str 19 views: Dict[str, str] 20 views_path: Optional[Path] = None 21 defn: Optional[Dict[str, str]] = None 22 type: str = field(init=False) 23 24 def to_dict(self) -> dict: 25 """Explore instance represented as a dict.""" 26 return {self.name: {"type": self.type, "views": self.views}} 27 28 def to_lookml( 29 self, v1_name: Optional[str], hidden: Optional[bool] 30 ) -> List[Dict[str, Any]]: 31 """ 32 Generate LookML for this explore. 33 34 Any generation done in dependent explore's 35 `_to_lookml` takes precedence over these fields. 36 """ 37 base_lookml = {} 38 if hidden: 39 base_lookml["hidden"] = "yes" 40 base_view_name = next( 41 ( 42 view_name 43 for view_type, view_name in self.views.items() 44 if view_type == "base_view" 45 ) 46 ) 47 for view_type, view in self.views.items(): 48 # We look at our dependent views to see if they have a 49 # "submission" field. Dependent views are any that are: 50 # - base_view 51 # - extended_view* 52 # 53 # We do not want to look at joined views. Those should be 54 # labeled as: 55 # - join* 56 # 57 # If they have a submission field, we filter on the date. 58 # This allows for filter queries to succeed. 59 if "join" in view_type: 60 continue 61 if time_partitioning_group := self.get_view_time_partitioning_group(view): 62 base_lookml["sql_always_where"] = ( 63 f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'" 64 ) 65 66 # We only update the first returned explore 67 new_lookml = self._to_lookml(v1_name) 68 base_lookml.update(new_lookml[0]) 69 new_lookml[0] = base_lookml 70 71 return new_lookml 72 73 def _to_lookml( 74 self, 75 v1_name: Optional[str], 76 ) -> List[Dict[str, Any]]: 77 raise NotImplementedError("Only implemented in subclasses") 78 79 def get_dependent_views(self) -> List[str]: 80 """Get views this explore is dependent on.""" 81 dependent_views = [] 82 for _type, views in self.views.items(): 83 if _type.startswith("extended"): 84 continue 85 elif _type.startswith("joined"): 86 dependent_views += [view for view in views] 87 else: 88 dependent_views.append(views) 89 return dependent_views 90 91 @staticmethod 92 def from_dict(name: str, defn: dict, views_path: Path) -> Explore: 93 """Get an instance of an explore from a namespace definition.""" 94 raise NotImplementedError("Only implemented in subclasses") 95 96 def get_view_lookml(self, view: str) -> dict: 97 """Get the LookML for a view.""" 98 if self.views_path is not None: 99 return lkml.load((self.views_path / f"{view}.view.lkml").read_text()) 100 raise Exception("Missing view path for get_view_lookml") 101 102 def get_datagroup(self) -> Optional[str]: 103 """ 104 Return the name of the associated datagroup. 105 106 Return `None` if there is no datagroup for this explore. 107 """ 108 return None 109 110 def get_unnested_fields_joins_lookml( 111 self, 112 ) -> list: 113 """Get the LookML for joining unnested fields.""" 114 views_lookml = self.get_view_lookml(self.views["base_view"]) 115 views: List[str] = [view["name"] for view in views_lookml["views"]] 116 parent_base_name = views_lookml["views"][0]["name"] 117 118 extended_views: List[str] = [] 119 if "extended_view" in self.views: 120 # check for extended views 121 extended_views_lookml = self.get_view_lookml(self.views["extended_view"]) 122 extended_views = [view["name"] for view in extended_views_lookml["views"]] 123 124 views_lookml.update(extended_views_lookml) 125 views += extended_views 126 127 joins = [] 128 for view in views_lookml["views"][1:]: 129 view_name = view["name"] 130 # get repeated, nested fields that exist as separate views in lookml 131 base_name, metric = self._get_base_name_and_metric( 132 view_name=view_name, views=views 133 ) 134 metric_name = view_name 135 metric_label = slug_to_title(metric_name) 136 137 if view_name in extended_views: 138 # names of extended views are overriden by the name of the view that is extending them 139 metric_label = slug_to_title( 140 metric_name.replace(base_name, parent_base_name) 141 ) 142 base_name = parent_base_name 143 144 joins.append( 145 { 146 "name": view_name, 147 "view_label": metric_label, 148 "relationship": "one_to_many", 149 "sql": ( 150 f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} " 151 ), 152 } 153 ) 154 155 return joins 156 157 def _get_default_channel(self, view: str) -> Optional[str]: 158 channel_params = [ 159 param 160 for _view_defn in self.get_view_lookml(view)["views"] 161 for param in _view_defn.get("filters", []) 162 if _view_defn["name"] == view and param["name"] == "channel" 163 ] 164 165 if channel_params: 166 allowed_values = channel_params[0]["suggestions"] 167 default_value = allowed_values[0] 168 return escape_filter_expr(default_value) 169 return None 170 171 def _get_base_name_and_metric( 172 self, view_name: str, views: List[str] 173 ) -> Tuple[str, str]: 174 """ 175 Get base view and metric names. 176 177 Returns the the name of the base view and the metric based on the 178 passed `view_name` and existing views. 179 180 The names are resolved in a backwards fashion to account for 181 repeated nested fields that might contain other nested fields. 182 For example: 183 184 view: sync { 185 [...] 186 dimension: payload__events { 187 sql: ${TABLE}.payload.events ;; 188 } 189 } 190 191 view: sync__payload__events { 192 [...] 193 dimension: f5_ { 194 sql: ${TABLE}.f5_ ;; 195 } 196 } 197 198 view: sync__payload__events__f5_ { 199 [...] 200 } 201 202 For these nested views to get translated to the following joins, the names 203 need to be resolved backwards: 204 205 join: sync__payload__events { 206 relationship: one_to_many 207 sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;; 208 } 209 210 join: sync__payload__events__f5_ { 211 relationship: one_to_many 212 sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;; 213 } 214 """ 215 split = view_name.split("__") 216 for index in range(len(split) - 1, 0, -1): 217 base_view = "__".join(split[:index]) 218 metric = "__".join(split[index:]) 219 if base_view in views: 220 return (base_view, metric) 221 raise Exception(f"Cannot get base name and metric from view {view_name}") 222 223 def has_view_dimension(self, view: str, dimension_name: str) -> bool: 224 """Determine whether a this view has this dimension.""" 225 for _view_defn in self.get_view_lookml(view)["views"]: 226 if _view_defn["name"] != view: 227 continue 228 for dim in _view_defn.get("dimensions", []): 229 if dim["name"] == dimension_name: 230 return True 231 return False 232 233 def get_view_time_partitioning_group(self, view: str) -> Optional[str]: 234 """Get time partitiong dimension group for this view. 235 236 Return the name of the first dimension group tagged "time_partitioning_field", 237 and fall back to "submission" if available. 238 """ 239 has_submission = False 240 for _view_defn in self.get_view_lookml(view)["views"]: 241 if not _view_defn["name"] == view: 242 continue 243 for dim in _view_defn.get("dimension_groups", []): 244 if "time_partitioning_field" in dim.get("tags", []): 245 return dim["name"] 246 elif dim["name"] == "submission": 247 has_submission = True 248 if has_submission: 249 return "submission" 250 return None 251 252 def get_required_filters(self, view_name: str) -> List[Dict[str, str]]: 253 """Get required filters for this view.""" 254 filters = [] 255 view = self.views[view_name] 256 257 # Add a default filter on channel, if it's present in the view 258 default_channel = self._get_default_channel(view) 259 if default_channel is not None: 260 filters.append({"channel": default_channel}) 261 262 # Add submission filter, if present in the view 263 if time_partitioning_group := self.get_view_time_partitioning_group(view): 264 filters.append({f"{time_partitioning_group}_date": "28 days"}) 265 266 return filters 267 268 def __eq__(self, other) -> bool: 269 """Check for equality with other View.""" 270 271 def comparable_dict(d): 272 return tuple(sorted(d.items())) 273 274 if isinstance(other, Explore): 275 return ( 276 self.name == other.name 277 and comparable_dict(self.views) == comparable_dict(other.views) 278 and self.type == other.type 279 ) 280 return False
@dataclass
class
Explore:
15@dataclass 16class Explore: 17 """A generic explore.""" 18 19 name: str 20 views: Dict[str, str] 21 views_path: Optional[Path] = None 22 defn: Optional[Dict[str, str]] = None 23 type: str = field(init=False) 24 25 def to_dict(self) -> dict: 26 """Explore instance represented as a dict.""" 27 return {self.name: {"type": self.type, "views": self.views}} 28 29 def to_lookml( 30 self, v1_name: Optional[str], hidden: Optional[bool] 31 ) -> List[Dict[str, Any]]: 32 """ 33 Generate LookML for this explore. 34 35 Any generation done in dependent explore's 36 `_to_lookml` takes precedence over these fields. 37 """ 38 base_lookml = {} 39 if hidden: 40 base_lookml["hidden"] = "yes" 41 base_view_name = next( 42 ( 43 view_name 44 for view_type, view_name in self.views.items() 45 if view_type == "base_view" 46 ) 47 ) 48 for view_type, view in self.views.items(): 49 # We look at our dependent views to see if they have a 50 # "submission" field. Dependent views are any that are: 51 # - base_view 52 # - extended_view* 53 # 54 # We do not want to look at joined views. Those should be 55 # labeled as: 56 # - join* 57 # 58 # If they have a submission field, we filter on the date. 59 # This allows for filter queries to succeed. 60 if "join" in view_type: 61 continue 62 if time_partitioning_group := self.get_view_time_partitioning_group(view): 63 base_lookml["sql_always_where"] = ( 64 f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'" 65 ) 66 67 # We only update the first returned explore 68 new_lookml = self._to_lookml(v1_name) 69 base_lookml.update(new_lookml[0]) 70 new_lookml[0] = base_lookml 71 72 return new_lookml 73 74 def _to_lookml( 75 self, 76 v1_name: Optional[str], 77 ) -> List[Dict[str, Any]]: 78 raise NotImplementedError("Only implemented in subclasses") 79 80 def get_dependent_views(self) -> List[str]: 81 """Get views this explore is dependent on.""" 82 dependent_views = [] 83 for _type, views in self.views.items(): 84 if _type.startswith("extended"): 85 continue 86 elif _type.startswith("joined"): 87 dependent_views += [view for view in views] 88 else: 89 dependent_views.append(views) 90 return dependent_views 91 92 @staticmethod 93 def from_dict(name: str, defn: dict, views_path: Path) -> Explore: 94 """Get an instance of an explore from a namespace definition.""" 95 raise NotImplementedError("Only implemented in subclasses") 96 97 def get_view_lookml(self, view: str) -> dict: 98 """Get the LookML for a view.""" 99 if self.views_path is not None: 100 return lkml.load((self.views_path / f"{view}.view.lkml").read_text()) 101 raise Exception("Missing view path for get_view_lookml") 102 103 def get_datagroup(self) -> Optional[str]: 104 """ 105 Return the name of the associated datagroup. 106 107 Return `None` if there is no datagroup for this explore. 108 """ 109 return None 110 111 def get_unnested_fields_joins_lookml( 112 self, 113 ) -> list: 114 """Get the LookML for joining unnested fields.""" 115 views_lookml = self.get_view_lookml(self.views["base_view"]) 116 views: List[str] = [view["name"] for view in views_lookml["views"]] 117 parent_base_name = views_lookml["views"][0]["name"] 118 119 extended_views: List[str] = [] 120 if "extended_view" in self.views: 121 # check for extended views 122 extended_views_lookml = self.get_view_lookml(self.views["extended_view"]) 123 extended_views = [view["name"] for view in extended_views_lookml["views"]] 124 125 views_lookml.update(extended_views_lookml) 126 views += extended_views 127 128 joins = [] 129 for view in views_lookml["views"][1:]: 130 view_name = view["name"] 131 # get repeated, nested fields that exist as separate views in lookml 132 base_name, metric = self._get_base_name_and_metric( 133 view_name=view_name, views=views 134 ) 135 metric_name = view_name 136 metric_label = slug_to_title(metric_name) 137 138 if view_name in extended_views: 139 # names of extended views are overriden by the name of the view that is extending them 140 metric_label = slug_to_title( 141 metric_name.replace(base_name, parent_base_name) 142 ) 143 base_name = parent_base_name 144 145 joins.append( 146 { 147 "name": view_name, 148 "view_label": metric_label, 149 "relationship": "one_to_many", 150 "sql": ( 151 f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} " 152 ), 153 } 154 ) 155 156 return joins 157 158 def _get_default_channel(self, view: str) -> Optional[str]: 159 channel_params = [ 160 param 161 for _view_defn in self.get_view_lookml(view)["views"] 162 for param in _view_defn.get("filters", []) 163 if _view_defn["name"] == view and param["name"] == "channel" 164 ] 165 166 if channel_params: 167 allowed_values = channel_params[0]["suggestions"] 168 default_value = allowed_values[0] 169 return escape_filter_expr(default_value) 170 return None 171 172 def _get_base_name_and_metric( 173 self, view_name: str, views: List[str] 174 ) -> Tuple[str, str]: 175 """ 176 Get base view and metric names. 177 178 Returns the the name of the base view and the metric based on the 179 passed `view_name` and existing views. 180 181 The names are resolved in a backwards fashion to account for 182 repeated nested fields that might contain other nested fields. 183 For example: 184 185 view: sync { 186 [...] 187 dimension: payload__events { 188 sql: ${TABLE}.payload.events ;; 189 } 190 } 191 192 view: sync__payload__events { 193 [...] 194 dimension: f5_ { 195 sql: ${TABLE}.f5_ ;; 196 } 197 } 198 199 view: sync__payload__events__f5_ { 200 [...] 201 } 202 203 For these nested views to get translated to the following joins, the names 204 need to be resolved backwards: 205 206 join: sync__payload__events { 207 relationship: one_to_many 208 sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;; 209 } 210 211 join: sync__payload__events__f5_ { 212 relationship: one_to_many 213 sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;; 214 } 215 """ 216 split = view_name.split("__") 217 for index in range(len(split) - 1, 0, -1): 218 base_view = "__".join(split[:index]) 219 metric = "__".join(split[index:]) 220 if base_view in views: 221 return (base_view, metric) 222 raise Exception(f"Cannot get base name and metric from view {view_name}") 223 224 def has_view_dimension(self, view: str, dimension_name: str) -> bool: 225 """Determine whether a this view has this dimension.""" 226 for _view_defn in self.get_view_lookml(view)["views"]: 227 if _view_defn["name"] != view: 228 continue 229 for dim in _view_defn.get("dimensions", []): 230 if dim["name"] == dimension_name: 231 return True 232 return False 233 234 def get_view_time_partitioning_group(self, view: str) -> Optional[str]: 235 """Get time partitiong dimension group for this view. 236 237 Return the name of the first dimension group tagged "time_partitioning_field", 238 and fall back to "submission" if available. 239 """ 240 has_submission = False 241 for _view_defn in self.get_view_lookml(view)["views"]: 242 if not _view_defn["name"] == view: 243 continue 244 for dim in _view_defn.get("dimension_groups", []): 245 if "time_partitioning_field" in dim.get("tags", []): 246 return dim["name"] 247 elif dim["name"] == "submission": 248 has_submission = True 249 if has_submission: 250 return "submission" 251 return None 252 253 def get_required_filters(self, view_name: str) -> List[Dict[str, str]]: 254 """Get required filters for this view.""" 255 filters = [] 256 view = self.views[view_name] 257 258 # Add a default filter on channel, if it's present in the view 259 default_channel = self._get_default_channel(view) 260 if default_channel is not None: 261 filters.append({"channel": default_channel}) 262 263 # Add submission filter, if present in the view 264 if time_partitioning_group := self.get_view_time_partitioning_group(view): 265 filters.append({f"{time_partitioning_group}_date": "28 days"}) 266 267 return filters 268 269 def __eq__(self, other) -> bool: 270 """Check for equality with other View.""" 271 272 def comparable_dict(d): 273 return tuple(sorted(d.items())) 274 275 if isinstance(other, Explore): 276 return ( 277 self.name == other.name 278 and comparable_dict(self.views) == comparable_dict(other.views) 279 and self.type == other.type 280 ) 281 return False
A generic explore.
Explore( name: str, views: Dict[str, str], views_path: Optional[pathlib.Path] = None, defn: Optional[Dict[str, str]] = None)
def
to_dict(self) -> dict:
25 def to_dict(self) -> dict: 26 """Explore instance represented as a dict.""" 27 return {self.name: {"type": self.type, "views": self.views}}
Explore instance represented as a dict.
def
to_lookml( self, v1_name: Optional[str], hidden: Optional[bool]) -> List[Dict[str, Any]]:
29 def to_lookml( 30 self, v1_name: Optional[str], hidden: Optional[bool] 31 ) -> List[Dict[str, Any]]: 32 """ 33 Generate LookML for this explore. 34 35 Any generation done in dependent explore's 36 `_to_lookml` takes precedence over these fields. 37 """ 38 base_lookml = {} 39 if hidden: 40 base_lookml["hidden"] = "yes" 41 base_view_name = next( 42 ( 43 view_name 44 for view_type, view_name in self.views.items() 45 if view_type == "base_view" 46 ) 47 ) 48 for view_type, view in self.views.items(): 49 # We look at our dependent views to see if they have a 50 # "submission" field. Dependent views are any that are: 51 # - base_view 52 # - extended_view* 53 # 54 # We do not want to look at joined views. Those should be 55 # labeled as: 56 # - join* 57 # 58 # If they have a submission field, we filter on the date. 59 # This allows for filter queries to succeed. 60 if "join" in view_type: 61 continue 62 if time_partitioning_group := self.get_view_time_partitioning_group(view): 63 base_lookml["sql_always_where"] = ( 64 f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'" 65 ) 66 67 # We only update the first returned explore 68 new_lookml = self._to_lookml(v1_name) 69 base_lookml.update(new_lookml[0]) 70 new_lookml[0] = base_lookml 71 72 return new_lookml
Generate LookML for this explore.
Any generation done in dependent explore's
_to_lookml
takes precedence over these fields.
def
get_dependent_views(self) -> List[str]:
80 def get_dependent_views(self) -> List[str]: 81 """Get views this explore is dependent on.""" 82 dependent_views = [] 83 for _type, views in self.views.items(): 84 if _type.startswith("extended"): 85 continue 86 elif _type.startswith("joined"): 87 dependent_views += [view for view in views] 88 else: 89 dependent_views.append(views) 90 return dependent_views
Get views this explore is dependent on.
92 @staticmethod 93 def from_dict(name: str, defn: dict, views_path: Path) -> Explore: 94 """Get an instance of an explore from a namespace definition.""" 95 raise NotImplementedError("Only implemented in subclasses")
Get an instance of an explore from a namespace definition.
def
get_view_lookml(self, view: str) -> dict:
97 def get_view_lookml(self, view: str) -> dict: 98 """Get the LookML for a view.""" 99 if self.views_path is not None: 100 return lkml.load((self.views_path / f"{view}.view.lkml").read_text()) 101 raise Exception("Missing view path for get_view_lookml")
Get the LookML for a view.
def
get_datagroup(self) -> Optional[str]:
103 def get_datagroup(self) -> Optional[str]: 104 """ 105 Return the name of the associated datagroup. 106 107 Return `None` if there is no datagroup for this explore. 108 """ 109 return None
Return the name of the associated datagroup.
Return None
if there is no datagroup for this explore.
def
get_unnested_fields_joins_lookml(self) -> list:
111 def get_unnested_fields_joins_lookml( 112 self, 113 ) -> list: 114 """Get the LookML for joining unnested fields.""" 115 views_lookml = self.get_view_lookml(self.views["base_view"]) 116 views: List[str] = [view["name"] for view in views_lookml["views"]] 117 parent_base_name = views_lookml["views"][0]["name"] 118 119 extended_views: List[str] = [] 120 if "extended_view" in self.views: 121 # check for extended views 122 extended_views_lookml = self.get_view_lookml(self.views["extended_view"]) 123 extended_views = [view["name"] for view in extended_views_lookml["views"]] 124 125 views_lookml.update(extended_views_lookml) 126 views += extended_views 127 128 joins = [] 129 for view in views_lookml["views"][1:]: 130 view_name = view["name"] 131 # get repeated, nested fields that exist as separate views in lookml 132 base_name, metric = self._get_base_name_and_metric( 133 view_name=view_name, views=views 134 ) 135 metric_name = view_name 136 metric_label = slug_to_title(metric_name) 137 138 if view_name in extended_views: 139 # names of extended views are overriden by the name of the view that is extending them 140 metric_label = slug_to_title( 141 metric_name.replace(base_name, parent_base_name) 142 ) 143 base_name = parent_base_name 144 145 joins.append( 146 { 147 "name": view_name, 148 "view_label": metric_label, 149 "relationship": "one_to_many", 150 "sql": ( 151 f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} " 152 ), 153 } 154 ) 155 156 return joins
Get the LookML for joining unnested fields.
def
has_view_dimension(self, view: str, dimension_name: str) -> bool:
224 def has_view_dimension(self, view: str, dimension_name: str) -> bool: 225 """Determine whether a this view has this dimension.""" 226 for _view_defn in self.get_view_lookml(view)["views"]: 227 if _view_defn["name"] != view: 228 continue 229 for dim in _view_defn.get("dimensions", []): 230 if dim["name"] == dimension_name: 231 return True 232 return False
Determine whether a this view has this dimension.
def
get_view_time_partitioning_group(self, view: str) -> Optional[str]:
234 def get_view_time_partitioning_group(self, view: str) -> Optional[str]: 235 """Get time partitiong dimension group for this view. 236 237 Return the name of the first dimension group tagged "time_partitioning_field", 238 and fall back to "submission" if available. 239 """ 240 has_submission = False 241 for _view_defn in self.get_view_lookml(view)["views"]: 242 if not _view_defn["name"] == view: 243 continue 244 for dim in _view_defn.get("dimension_groups", []): 245 if "time_partitioning_field" in dim.get("tags", []): 246 return dim["name"] 247 elif dim["name"] == "submission": 248 has_submission = True 249 if has_submission: 250 return "submission" 251 return None
Get time partitiong dimension group for this view.
Return the name of the first dimension group tagged "time_partitioning_field", and fall back to "submission" if available.
def
get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
253 def get_required_filters(self, view_name: str) -> List[Dict[str, str]]: 254 """Get required filters for this view.""" 255 filters = [] 256 view = self.views[view_name] 257 258 # Add a default filter on channel, if it's present in the view 259 default_channel = self._get_default_channel(view) 260 if default_channel is not None: 261 filters.append({"channel": default_channel}) 262 263 # Add submission filter, if present in the view 264 if time_partitioning_group := self.get_view_time_partitioning_group(view): 265 filters.append({f"{time_partitioning_group}_date": "28 days"}) 266 267 return filters
Get required filters for this view.