generator.explores.explore
Generic explore type.
1"""Generic explore type.""" 2 3from __future__ import annotations 4 5from dataclasses import dataclass, field 6from pathlib import Path 7from typing import Any, Dict, List, Optional, Tuple 8 9import lkml 10 11from ..views.lookml_utils import escape_filter_expr, slug_to_title 12 13 14@dataclass 15class Explore: 16 """A generic explore.""" 17 18 name: str 19 views: Dict[str, str] 20 views_path: Optional[Path] = None 21 defn: Optional[Dict[str, str]] = None 22 type: str = field(init=False) 23 24 def to_dict(self) -> dict: 25 """Explore instance represented as a dict.""" 26 return {self.name: {"type": self.type, "views": self.views}} 27 28 def to_lookml(self, v1_name: Optional[str]) -> List[Dict[str, Any]]: 29 """ 30 Generate LookML for this explore. 31 32 Any generation done in dependent explore's 33 `_to_lookml` takes precedence over these fields. 34 """ 35 base_lookml = {} 36 base_view_name = next( 37 ( 38 view_name 39 for view_type, view_name in self.views.items() 40 if view_type == "base_view" 41 ) 42 ) 43 for view_type, view in self.views.items(): 44 # We look at our dependent views to see if they have a 45 # "submission" field. Dependent views are any that are: 46 # - base_view 47 # - extended_view* 48 # 49 # We do not want to look at joined views. Those should be 50 # labeled as: 51 # - join* 52 # 53 # If they have a submission field, we filter on the date. 54 # This allows for filter queries to succeed. 
55 if "join" in view_type: 56 continue 57 if time_partitioning_group := self.get_view_time_partitioning_group(view): 58 base_lookml["sql_always_where"] = ( 59 f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'" 60 ) 61 62 # We only update the first returned explore 63 new_lookml = self._to_lookml(v1_name) 64 base_lookml.update(new_lookml[0]) 65 new_lookml[0] = base_lookml 66 67 return new_lookml 68 69 def _to_lookml( 70 self, 71 v1_name: Optional[str], 72 ) -> List[Dict[str, Any]]: 73 raise NotImplementedError("Only implemented in subclasses") 74 75 def get_dependent_views(self) -> List[str]: 76 """Get views this explore is dependent on.""" 77 dependent_views = [] 78 for _type, views in self.views.items(): 79 if _type.startswith("extended"): 80 continue 81 elif _type.startswith("joined"): 82 dependent_views += [view for view in views] 83 else: 84 dependent_views.append(views) 85 return dependent_views 86 87 @staticmethod 88 def from_dict(name: str, defn: dict, views_path: Path) -> Explore: 89 """Get an instance of an explore from a namespace definition.""" 90 raise NotImplementedError("Only implemented in subclasses") 91 92 def get_view_lookml(self, view: str) -> dict: 93 """Get the LookML for a view.""" 94 if self.views_path is not None: 95 return lkml.load((self.views_path / f"{view}.view.lkml").read_text()) 96 raise Exception("Missing view path for get_view_lookml") 97 98 def get_unnested_fields_joins_lookml( 99 self, 100 ) -> list: 101 """Get the LookML for joining unnested fields.""" 102 views_lookml = self.get_view_lookml(self.views["base_view"]) 103 views: List[str] = [view["name"] for view in views_lookml["views"]] 104 parent_base_name = views_lookml["views"][0]["name"] 105 106 extended_views: List[str] = [] 107 if "extended_view" in self.views: 108 # check for extended views 109 extended_views_lookml = self.get_view_lookml(self.views["extended_view"]) 110 extended_views = [view["name"] for view in extended_views_lookml["views"]] 111 112 
views_lookml.update(extended_views_lookml) 113 views += extended_views 114 115 joins = [] 116 for view in views_lookml["views"][1:]: 117 view_name = view["name"] 118 # get repeated, nested fields that exist as separate views in lookml 119 base_name, metric = self._get_base_name_and_metric( 120 view_name=view_name, views=views 121 ) 122 metric_name = view_name 123 metric_label = slug_to_title(metric_name) 124 125 if view_name in extended_views: 126 # names of extended views are overriden by the name of the view that is extending them 127 metric_label = slug_to_title( 128 metric_name.replace(base_name, parent_base_name) 129 ) 130 base_name = parent_base_name 131 132 joins.append( 133 { 134 "name": view_name, 135 "view_label": metric_label, 136 "relationship": "one_to_many", 137 "sql": ( 138 f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} " 139 ), 140 } 141 ) 142 143 return joins 144 145 def _get_default_channel(self, view: str) -> Optional[str]: 146 channel_params = [ 147 param 148 for _view_defn in self.get_view_lookml(view)["views"] 149 for param in _view_defn.get("filters", []) 150 if _view_defn["name"] == view and param["name"] == "channel" 151 ] 152 153 if channel_params: 154 allowed_values = channel_params[0]["suggestions"] 155 default_value = allowed_values[0] 156 return escape_filter_expr(default_value) 157 return None 158 159 def _get_base_name_and_metric( 160 self, view_name: str, views: List[str] 161 ) -> Tuple[str, str]: 162 """ 163 Get base view and metric names. 164 165 Returns the the name of the base view and the metric based on the 166 passed `view_name` and existing views. 167 168 The names are resolved in a backwards fashion to account for 169 repeated nested fields that might contain other nested fields. 170 For example: 171 172 view: sync { 173 [...] 174 dimension: payload__events { 175 sql: ${TABLE}.payload.events ;; 176 } 177 } 178 179 view: sync__payload__events { 180 [...] 
181 dimension: f5_ { 182 sql: ${TABLE}.f5_ ;; 183 } 184 } 185 186 view: sync__payload__events__f5_ { 187 [...] 188 } 189 190 For these nested views to get translated to the following joins, the names 191 need to be resolved backwards: 192 193 join: sync__payload__events { 194 relationship: one_to_many 195 sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;; 196 } 197 198 join: sync__payload__events__f5_ { 199 relationship: one_to_many 200 sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;; 201 } 202 """ 203 split = view_name.split("__") 204 for index in range(len(split) - 1, 0, -1): 205 base_view = "__".join(split[:index]) 206 metric = "__".join(split[index:]) 207 if base_view in views: 208 return (base_view, metric) 209 raise Exception(f"Cannot get base name and metric from view {view_name}") 210 211 def has_view_dimension(self, view: str, dimension_name: str) -> bool: 212 """Determine whether a this view has this dimension.""" 213 for _view_defn in self.get_view_lookml(view)["views"]: 214 if _view_defn["name"] != view: 215 continue 216 for dim in _view_defn.get("dimensions", []): 217 if dim["name"] == dimension_name: 218 return True 219 return False 220 221 def get_view_time_partitioning_group(self, view: str) -> Optional[str]: 222 """Get time partitiong dimension group for this view. 223 224 Return the name of the first dimension group tagged "time_partitioning_field", 225 and fall back to "submission" if available. 
226 """ 227 has_submission = False 228 for _view_defn in self.get_view_lookml(view)["views"]: 229 if not _view_defn["name"] == view: 230 continue 231 for dim in _view_defn.get("dimension_groups", []): 232 if "time_partitioning_field" in dim.get("tags", []): 233 return dim["name"] 234 elif dim["name"] == "submission": 235 has_submission = True 236 if has_submission: 237 return "submission" 238 return None 239 240 def get_required_filters(self, view_name: str) -> List[Dict[str, str]]: 241 """Get required filters for this view.""" 242 filters = [] 243 view = self.views[view_name] 244 245 # Add a default filter on channel, if it's present in the view 246 default_channel = self._get_default_channel(view) 247 if default_channel is not None: 248 filters.append({"channel": default_channel}) 249 250 # Add submission filter, if present in the view 251 if time_partitioning_group := self.get_view_time_partitioning_group(view): 252 filters.append({f"{time_partitioning_group}_date": "28 days"}) 253 254 return filters 255 256 def __eq__(self, other) -> bool: 257 """Check for equality with other View.""" 258 259 def comparable_dict(d): 260 return tuple(sorted(d.items())) 261 262 if isinstance(other, Explore): 263 return ( 264 self.name == other.name 265 and comparable_dict(self.views) == comparable_dict(other.views) 266 and self.type == other.type 267 ) 268 return False
@dataclass
class
Explore:
15@dataclass 16class Explore: 17 """A generic explore.""" 18 19 name: str 20 views: Dict[str, str] 21 views_path: Optional[Path] = None 22 defn: Optional[Dict[str, str]] = None 23 type: str = field(init=False) 24 25 def to_dict(self) -> dict: 26 """Explore instance represented as a dict.""" 27 return {self.name: {"type": self.type, "views": self.views}} 28 29 def to_lookml(self, v1_name: Optional[str]) -> List[Dict[str, Any]]: 30 """ 31 Generate LookML for this explore. 32 33 Any generation done in dependent explore's 34 `_to_lookml` takes precedence over these fields. 35 """ 36 base_lookml = {} 37 base_view_name = next( 38 ( 39 view_name 40 for view_type, view_name in self.views.items() 41 if view_type == "base_view" 42 ) 43 ) 44 for view_type, view in self.views.items(): 45 # We look at our dependent views to see if they have a 46 # "submission" field. Dependent views are any that are: 47 # - base_view 48 # - extended_view* 49 # 50 # We do not want to look at joined views. Those should be 51 # labeled as: 52 # - join* 53 # 54 # If they have a submission field, we filter on the date. 55 # This allows for filter queries to succeed. 
56 if "join" in view_type: 57 continue 58 if time_partitioning_group := self.get_view_time_partitioning_group(view): 59 base_lookml["sql_always_where"] = ( 60 f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'" 61 ) 62 63 # We only update the first returned explore 64 new_lookml = self._to_lookml(v1_name) 65 base_lookml.update(new_lookml[0]) 66 new_lookml[0] = base_lookml 67 68 return new_lookml 69 70 def _to_lookml( 71 self, 72 v1_name: Optional[str], 73 ) -> List[Dict[str, Any]]: 74 raise NotImplementedError("Only implemented in subclasses") 75 76 def get_dependent_views(self) -> List[str]: 77 """Get views this explore is dependent on.""" 78 dependent_views = [] 79 for _type, views in self.views.items(): 80 if _type.startswith("extended"): 81 continue 82 elif _type.startswith("joined"): 83 dependent_views += [view for view in views] 84 else: 85 dependent_views.append(views) 86 return dependent_views 87 88 @staticmethod 89 def from_dict(name: str, defn: dict, views_path: Path) -> Explore: 90 """Get an instance of an explore from a namespace definition.""" 91 raise NotImplementedError("Only implemented in subclasses") 92 93 def get_view_lookml(self, view: str) -> dict: 94 """Get the LookML for a view.""" 95 if self.views_path is not None: 96 return lkml.load((self.views_path / f"{view}.view.lkml").read_text()) 97 raise Exception("Missing view path for get_view_lookml") 98 99 def get_unnested_fields_joins_lookml( 100 self, 101 ) -> list: 102 """Get the LookML for joining unnested fields.""" 103 views_lookml = self.get_view_lookml(self.views["base_view"]) 104 views: List[str] = [view["name"] for view in views_lookml["views"]] 105 parent_base_name = views_lookml["views"][0]["name"] 106 107 extended_views: List[str] = [] 108 if "extended_view" in self.views: 109 # check for extended views 110 extended_views_lookml = self.get_view_lookml(self.views["extended_view"]) 111 extended_views = [view["name"] for view in extended_views_lookml["views"]] 112 
113 views_lookml.update(extended_views_lookml) 114 views += extended_views 115 116 joins = [] 117 for view in views_lookml["views"][1:]: 118 view_name = view["name"] 119 # get repeated, nested fields that exist as separate views in lookml 120 base_name, metric = self._get_base_name_and_metric( 121 view_name=view_name, views=views 122 ) 123 metric_name = view_name 124 metric_label = slug_to_title(metric_name) 125 126 if view_name in extended_views: 127 # names of extended views are overriden by the name of the view that is extending them 128 metric_label = slug_to_title( 129 metric_name.replace(base_name, parent_base_name) 130 ) 131 base_name = parent_base_name 132 133 joins.append( 134 { 135 "name": view_name, 136 "view_label": metric_label, 137 "relationship": "one_to_many", 138 "sql": ( 139 f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} " 140 ), 141 } 142 ) 143 144 return joins 145 146 def _get_default_channel(self, view: str) -> Optional[str]: 147 channel_params = [ 148 param 149 for _view_defn in self.get_view_lookml(view)["views"] 150 for param in _view_defn.get("filters", []) 151 if _view_defn["name"] == view and param["name"] == "channel" 152 ] 153 154 if channel_params: 155 allowed_values = channel_params[0]["suggestions"] 156 default_value = allowed_values[0] 157 return escape_filter_expr(default_value) 158 return None 159 160 def _get_base_name_and_metric( 161 self, view_name: str, views: List[str] 162 ) -> Tuple[str, str]: 163 """ 164 Get base view and metric names. 165 166 Returns the the name of the base view and the metric based on the 167 passed `view_name` and existing views. 168 169 The names are resolved in a backwards fashion to account for 170 repeated nested fields that might contain other nested fields. 171 For example: 172 173 view: sync { 174 [...] 175 dimension: payload__events { 176 sql: ${TABLE}.payload.events ;; 177 } 178 } 179 180 view: sync__payload__events { 181 [...] 
182 dimension: f5_ { 183 sql: ${TABLE}.f5_ ;; 184 } 185 } 186 187 view: sync__payload__events__f5_ { 188 [...] 189 } 190 191 For these nested views to get translated to the following joins, the names 192 need to be resolved backwards: 193 194 join: sync__payload__events { 195 relationship: one_to_many 196 sql: LEFT JOIN UNNEST(${sync.payload__events}) AS sync__payload__events ;; 197 } 198 199 join: sync__payload__events__f5_ { 200 relationship: one_to_many 201 sql: LEFT JOIN UNNEST(${sync__payload__events.f5_}) AS sync__payload__events__f5_ ;; 202 } 203 """ 204 split = view_name.split("__") 205 for index in range(len(split) - 1, 0, -1): 206 base_view = "__".join(split[:index]) 207 metric = "__".join(split[index:]) 208 if base_view in views: 209 return (base_view, metric) 210 raise Exception(f"Cannot get base name and metric from view {view_name}") 211 212 def has_view_dimension(self, view: str, dimension_name: str) -> bool: 213 """Determine whether a this view has this dimension.""" 214 for _view_defn in self.get_view_lookml(view)["views"]: 215 if _view_defn["name"] != view: 216 continue 217 for dim in _view_defn.get("dimensions", []): 218 if dim["name"] == dimension_name: 219 return True 220 return False 221 222 def get_view_time_partitioning_group(self, view: str) -> Optional[str]: 223 """Get time partitiong dimension group for this view. 224 225 Return the name of the first dimension group tagged "time_partitioning_field", 226 and fall back to "submission" if available. 
227 """ 228 has_submission = False 229 for _view_defn in self.get_view_lookml(view)["views"]: 230 if not _view_defn["name"] == view: 231 continue 232 for dim in _view_defn.get("dimension_groups", []): 233 if "time_partitioning_field" in dim.get("tags", []): 234 return dim["name"] 235 elif dim["name"] == "submission": 236 has_submission = True 237 if has_submission: 238 return "submission" 239 return None 240 241 def get_required_filters(self, view_name: str) -> List[Dict[str, str]]: 242 """Get required filters for this view.""" 243 filters = [] 244 view = self.views[view_name] 245 246 # Add a default filter on channel, if it's present in the view 247 default_channel = self._get_default_channel(view) 248 if default_channel is not None: 249 filters.append({"channel": default_channel}) 250 251 # Add submission filter, if present in the view 252 if time_partitioning_group := self.get_view_time_partitioning_group(view): 253 filters.append({f"{time_partitioning_group}_date": "28 days"}) 254 255 return filters 256 257 def __eq__(self, other) -> bool: 258 """Check for equality with other View.""" 259 260 def comparable_dict(d): 261 return tuple(sorted(d.items())) 262 263 if isinstance(other, Explore): 264 return ( 265 self.name == other.name 266 and comparable_dict(self.views) == comparable_dict(other.views) 267 and self.type == other.type 268 ) 269 return False
A generic explore.
Explore( name: str, views: Dict[str, str], views_path: Optional[pathlib.Path] = None, defn: Optional[Dict[str, str]] = None)
def
to_dict(self) -> dict:
25 def to_dict(self) -> dict: 26 """Explore instance represented as a dict.""" 27 return {self.name: {"type": self.type, "views": self.views}}
Explore instance represented as a dict.
def
to_lookml(self, v1_name: Optional[str]) -> List[Dict[str, Any]]:
29 def to_lookml(self, v1_name: Optional[str]) -> List[Dict[str, Any]]: 30 """ 31 Generate LookML for this explore. 32 33 Any generation done in dependent explore's 34 `_to_lookml` takes precedence over these fields. 35 """ 36 base_lookml = {} 37 base_view_name = next( 38 ( 39 view_name 40 for view_type, view_name in self.views.items() 41 if view_type == "base_view" 42 ) 43 ) 44 for view_type, view in self.views.items(): 45 # We look at our dependent views to see if they have a 46 # "submission" field. Dependent views are any that are: 47 # - base_view 48 # - extended_view* 49 # 50 # We do not want to look at joined views. Those should be 51 # labeled as: 52 # - join* 53 # 54 # If they have a submission field, we filter on the date. 55 # This allows for filter queries to succeed. 56 if "join" in view_type: 57 continue 58 if time_partitioning_group := self.get_view_time_partitioning_group(view): 59 base_lookml["sql_always_where"] = ( 60 f"${{{base_view_name}.{time_partitioning_group}_date}} >= '2010-01-01'" 61 ) 62 63 # We only update the first returned explore 64 new_lookml = self._to_lookml(v1_name) 65 base_lookml.update(new_lookml[0]) 66 new_lookml[0] = base_lookml 67 68 return new_lookml
Generate LookML for this explore.
Any generation done in dependent explore's
_to_lookml
takes precedence over these fields.
def
get_dependent_views(self) -> List[str]:
76 def get_dependent_views(self) -> List[str]: 77 """Get views this explore is dependent on.""" 78 dependent_views = [] 79 for _type, views in self.views.items(): 80 if _type.startswith("extended"): 81 continue 82 elif _type.startswith("joined"): 83 dependent_views += [view for view in views] 84 else: 85 dependent_views.append(views) 86 return dependent_views
Get views this explore is dependent on.
88 @staticmethod 89 def from_dict(name: str, defn: dict, views_path: Path) -> Explore: 90 """Get an instance of an explore from a namespace definition.""" 91 raise NotImplementedError("Only implemented in subclasses")
Get an instance of an explore from a namespace definition.
def
get_view_lookml(self, view: str) -> dict:
93 def get_view_lookml(self, view: str) -> dict: 94 """Get the LookML for a view.""" 95 if self.views_path is not None: 96 return lkml.load((self.views_path / f"{view}.view.lkml").read_text()) 97 raise Exception("Missing view path for get_view_lookml")
Get the LookML for a view.
def
get_unnested_fields_joins_lookml(self) -> list:
99 def get_unnested_fields_joins_lookml( 100 self, 101 ) -> list: 102 """Get the LookML for joining unnested fields.""" 103 views_lookml = self.get_view_lookml(self.views["base_view"]) 104 views: List[str] = [view["name"] for view in views_lookml["views"]] 105 parent_base_name = views_lookml["views"][0]["name"] 106 107 extended_views: List[str] = [] 108 if "extended_view" in self.views: 109 # check for extended views 110 extended_views_lookml = self.get_view_lookml(self.views["extended_view"]) 111 extended_views = [view["name"] for view in extended_views_lookml["views"]] 112 113 views_lookml.update(extended_views_lookml) 114 views += extended_views 115 116 joins = [] 117 for view in views_lookml["views"][1:]: 118 view_name = view["name"] 119 # get repeated, nested fields that exist as separate views in lookml 120 base_name, metric = self._get_base_name_and_metric( 121 view_name=view_name, views=views 122 ) 123 metric_name = view_name 124 metric_label = slug_to_title(metric_name) 125 126 if view_name in extended_views: 127 # names of extended views are overriden by the name of the view that is extending them 128 metric_label = slug_to_title( 129 metric_name.replace(base_name, parent_base_name) 130 ) 131 base_name = parent_base_name 132 133 joins.append( 134 { 135 "name": view_name, 136 "view_label": metric_label, 137 "relationship": "one_to_many", 138 "sql": ( 139 f"LEFT JOIN UNNEST(${{{base_name}.{metric}}}) AS {metric_name} " 140 ), 141 } 142 ) 143 144 return joins
Get the LookML for joining unnested fields.
def
has_view_dimension(self, view: str, dimension_name: str) -> bool:
212 def has_view_dimension(self, view: str, dimension_name: str) -> bool: 213 """Determine whether a this view has this dimension.""" 214 for _view_defn in self.get_view_lookml(view)["views"]: 215 if _view_defn["name"] != view: 216 continue 217 for dim in _view_defn.get("dimensions", []): 218 if dim["name"] == dimension_name: 219 return True 220 return False
Determine whether this view has this dimension.
def
get_view_time_partitioning_group(self, view: str) -> Optional[str]:
222 def get_view_time_partitioning_group(self, view: str) -> Optional[str]: 223 """Get time partitiong dimension group for this view. 224 225 Return the name of the first dimension group tagged "time_partitioning_field", 226 and fall back to "submission" if available. 227 """ 228 has_submission = False 229 for _view_defn in self.get_view_lookml(view)["views"]: 230 if not _view_defn["name"] == view: 231 continue 232 for dim in _view_defn.get("dimension_groups", []): 233 if "time_partitioning_field" in dim.get("tags", []): 234 return dim["name"] 235 elif dim["name"] == "submission": 236 has_submission = True 237 if has_submission: 238 return "submission" 239 return None
Get time partitioning dimension group for this view.
Return the name of the first dimension group tagged "time_partitioning_field", and fall back to "submission" if available.
def
get_required_filters(self, view_name: str) -> List[Dict[str, str]]:
241 def get_required_filters(self, view_name: str) -> List[Dict[str, str]]: 242 """Get required filters for this view.""" 243 filters = [] 244 view = self.views[view_name] 245 246 # Add a default filter on channel, if it's present in the view 247 default_channel = self._get_default_channel(view) 248 if default_channel is not None: 249 filters.append({"channel": default_channel}) 250 251 # Add submission filter, if present in the view 252 if time_partitioning_group := self.get_view_time_partitioning_group(view): 253 filters.append({f"{time_partitioning_group}_date": "28 days"}) 254 255 return filters
Get required filters for this view.