generator.views.metric_definitions_view
Class to describe a view with metrics from metric-hub.
1"""Class to describe a view with metrics from metric-hub.""" 2 3from __future__ import annotations 4 5import re 6from typing import Any, Dict, Iterator, List, Optional, Union 7 8from generator.metrics_utils import MetricsConfigLoader 9 10from . import lookml_utils 11from .view import View, ViewDict 12 13 14class MetricDefinitionsView(View): 15 """A view for metric-hub metrics that come from the same data source.""" 16 17 type: str = "metric_definitions_view" 18 19 # Time unit divisors for converting days to different granularities 20 TIME_UNITS = [ 21 ("date", 1), # Daily: no conversion needed 22 ("week", 7), # Weekly: divide by 7 23 ("month", 30), # Monthly: approximate 30 days per month 24 ("quarter", 90), # Quarterly: approximate 90 days per quarter 25 ("year", 365), # Yearly: approximate 365 days per year 26 ] 27 28 def __init__(self, namespace: str, name: str, tables: List[Dict[str, str]]): 29 """Get an instance of an MetricDefinitionsView.""" 30 super().__init__(namespace, name, MetricDefinitionsView.type, tables) 31 32 @classmethod 33 def from_db_views( 34 klass, 35 namespace: str, 36 is_glean: bool, 37 channels: List[Dict[str, str]], 38 db_views: dict, 39 ) -> Iterator[MetricDefinitionsView]: 40 """Get Metric Definition Views from db views and app variants.""" 41 return iter(()) 42 43 @classmethod 44 def from_dict( 45 klass, namespace: str, name: str, definition: ViewDict 46 ) -> MetricDefinitionsView: 47 """Get a MetricDefinitionsView from a dict representation.""" 48 return klass(namespace, name, definition.get("tables", [])) 49 50 def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]: 51 """Get this view as LookML.""" 52 namespace_definitions = MetricsConfigLoader.configs.get_platform_definitions( 53 self.namespace 54 ) 55 if namespace_definitions is None: 56 return {} 57 58 # get all metric definitions that depend on the data source represented by this view 59 data_source_name = re.sub("^metric_definitions_", "", self.name) 60 
data_source_definition = MetricsConfigLoader.configs.get_data_source_definition( 61 data_source_name, self.namespace 62 ) 63 64 if data_source_definition is None: 65 return {} 66 67 # todo: hide deprecated metrics? 68 metric_definitions = [ 69 f"""{ 70 MetricsConfigLoader.configs.get_env().from_string(metric.select_expression).render() 71 } AS {metric_slug},\n""" 72 for metric_slug, metric in namespace_definitions.metrics.definitions.items() 73 if metric.select_expression 74 and metric.data_source.name == data_source_name 75 and metric.type != "histogram" 76 ] 77 78 if metric_definitions == []: 79 return {} 80 81 # Metric definitions are intended to aggregated by client per date. 82 # A derived table is needed to do these aggregations, instead of defining them as measures 83 # we want to have them available as dimensions (which don't allow aggregations in their definitions) 84 # to allow for custom measures to be later defined in Looker that aggregate these per client metrics. 85 view_defn: Dict[str, Any] = {"name": self.name} 86 87 ignore_base_fields = [ 88 "client_id", 89 "submission_date", 90 "submission", 91 "first_run", 92 ] + [ 93 metric_slug 94 for metric_slug, metric in namespace_definitions.metrics.definitions.items() 95 if metric.select_expression 96 and metric.data_source.name == data_source_name 97 and metric.type != "histogram" 98 ] 99 100 base_view_dimensions = {} 101 joined_data_sources = [] 102 103 # check if the metric data source has joins 104 # joined data sources are generally used for creating the "Base Fields" 105 if data_source_definition.joins: 106 # determine the dimensions selected by the joined data sources 107 for joined_data_source_slug, join in data_source_definition.joins.items(): 108 joined_data_source = ( 109 MetricsConfigLoader.configs.get_data_source_definition( 110 joined_data_source_slug, self.namespace 111 ) 112 ) 113 114 if joined_data_source.columns_as_dimensions: 115 joined_data_sources.append(joined_data_source) 116 117 
date_filter = None 118 if joined_data_source.submission_date_column != "NULL": 119 date_filter = ( 120 None 121 if joined_data_source.submission_date_column is None 122 or joined_data_source.submission_date_column == "NULL" 123 else f"{joined_data_source.submission_date_column} = '2023-01-01'" 124 ) 125 126 # create Looker dimensions by doing a dryrun 127 query = MetricsConfigLoader.configs.get_data_source_sql( 128 joined_data_source_slug, 129 self.namespace, 130 where=date_filter, 131 ).format(dataset=self.namespace) 132 133 base_view_dimensions[joined_data_source_slug] = ( 134 lookml_utils._generate_dimensions_from_query( 135 query, dryrun=dryrun 136 ) 137 ) 138 139 if ( 140 data_source_definition.client_id_column == "NULL" 141 and not base_view_dimensions 142 ) or data_source_definition.columns_as_dimensions: 143 # if the metrics data source doesn't have any joins then use the dimensions 144 # of the data source itself as base fields 145 date_filter = None 146 if data_source_definition.submission_date_column != "NULL": 147 date_filter = ( 148 "submission_date = '2023-01-01'" 149 if data_source_definition.submission_date_column is None 150 else f"{data_source_definition.submission_date_column} = '2023-01-01'" 151 ) 152 153 query = MetricsConfigLoader.configs.get_data_source_sql( 154 data_source_definition.name, 155 self.namespace, 156 where=date_filter, 157 ignore_joins=True, 158 ).format(dataset=self.namespace) 159 160 base_view_dimensions[data_source_definition.name] = ( 161 lookml_utils._generate_dimensions_from_query(query, dryrun) 162 ) 163 164 # to prevent duplicate dimensions, especially when working with time dimensions 165 # where names are modified potentially causing naming collisions 166 seen_dimensions = set() 167 # prepare base field data for query 168 base_view_fields = [] 169 for data_source, dimensions in base_view_dimensions.items(): 170 for dimension in dimensions: 171 if ( 172 dimension["name"] not in ignore_base_fields 173 and 
dimension["name"] not in seen_dimensions 174 and "hidden" not in dimension 175 ): 176 sql = ( 177 f"{data_source}.{dimension['name'].replace('__', '.')} AS" 178 + f" {data_source}_{dimension['name']},\n" 179 ) 180 # date/time/timestamp suffixes are removed when generating lookml dimensions, however we 181 # need the original field name for the derived view SQL 182 if dimension["type"] == "time" and not dimension["sql"].endswith( 183 dimension["name"] 184 ): 185 suffix = dimension["sql"].split( 186 dimension["name"].replace("__", ".") 187 )[-1] 188 sql = ( 189 f"{data_source}.{(dimension['name']+suffix).replace('__', '.')} AS" 190 + f" {data_source}_{dimension['name']},\n" 191 ) 192 193 base_view_fields.append( 194 { 195 "name": f"{data_source}_{dimension['name']}", 196 "select_sql": f"{data_source}_{dimension['name']},\n", 197 "sql": sql, 198 } 199 ) 200 seen_dimensions.add(dimension["name"]) 201 202 client_id_field = ( 203 "NULL" 204 if data_source_definition.client_id_column == "NULL" 205 else f'{data_source_definition.client_id_column or "client_id"}' 206 ) 207 208 # filters for date ranges 209 where_sql = " AND ".join( 210 [ 211 f""" 212 {data_source.name}.{data_source.submission_date_column or "submission_date"} 213 {{% if analysis_period._is_filtered %}} 214 BETWEEN 215 DATE_SUB( 216 COALESCE( 217 SAFE_CAST( 218 {{% date_start analysis_period %}} AS DATE 219 ), CURRENT_DATE()), 220 INTERVAL {{% parameter lookback_days %}} DAY 221 ) AND 222 COALESCE( 223 SAFE_CAST( 224 {{% date_end analysis_period %}} AS DATE 225 ), CURRENT_DATE()) 226 {{% else %}} 227 BETWEEN 228 DATE_SUB( 229 COALESCE( 230 SAFE_CAST( 231 {{% date_start submission_date %}} AS DATE 232 ), CURRENT_DATE()), 233 INTERVAL {{% parameter lookback_days %}} DAY 234 ) AND 235 COALESCE( 236 SAFE_CAST( 237 {{% date_end submission_date %}} AS DATE 238 ), CURRENT_DATE()) 239 {{% endif %}} 240 """ 241 for data_source in [data_source_definition] + joined_data_sources 242 if data_source.submission_date_column 
!= "NULL" 243 ] 244 ) 245 246 # filte on sample_id if such a field exists 247 for field in base_view_fields: 248 if field["name"].endswith("_sample_id"): 249 where_sql += f""" 250 AND 251 {field['name'].split('_sample_id')[0]}.sample_id < {{% parameter sampling %}} 252 """ 253 break 254 255 view_defn["derived_table"] = { 256 "sql": f""" 257 SELECT 258 {"".join(metric_definitions)} 259 {"".join([field['select_sql'] for field in base_view_fields])} 260 {client_id_field} AS client_id, 261 {{% if aggregate_metrics_by._parameter_value == 'day' %}} 262 {data_source_definition.submission_date_column or "submission_date"} AS analysis_basis 263 {{% elsif aggregate_metrics_by._parameter_value == 'week' %}} 264 (FORMAT_DATE( 265 '%F', 266 DATE_TRUNC({data_source_definition.submission_date_column or "submission_date"}, 267 WEEK(MONDAY))) 268 ) AS analysis_basis 269 {{% elsif aggregate_metrics_by._parameter_value == 'month' %}} 270 (FORMAT_DATE( 271 '%Y-%m', 272 {data_source_definition.submission_date_column or "submission_date"}) 273 ) AS analysis_basis 274 {{% elsif aggregate_metrics_by._parameter_value == 'quarter' %}} 275 (FORMAT_DATE( 276 '%Y-%m', 277 DATE_TRUNC({data_source_definition.submission_date_column or "submission_date"}, 278 QUARTER)) 279 ) AS analysis_basis 280 {{% elsif aggregate_metrics_by._parameter_value == 'year' %}} 281 (EXTRACT( 282 YEAR FROM {data_source_definition.submission_date_column or "submission_date"}) 283 ) AS analysis_basis 284 {{% else %}} 285 NULL as analysis_basis 286 {{% endif %}} 287 FROM 288 ( 289 SELECT 290 {data_source_name}.*, 291 {"".join([field['sql'] for field in base_view_fields])} 292 FROM 293 { 294 MetricsConfigLoader.configs.get_data_source_sql( 295 data_source_name, 296 self.namespace, 297 select_fields=False 298 ).format(dataset=self.namespace) 299 } 300 WHERE {where_sql} 301 ) 302 GROUP BY 303 {"".join([field['select_sql'] for field in base_view_fields])} 304 client_id, 305 analysis_basis 306 """ 307 } 308 309 
view_defn["dimensions"] = self.get_dimensions() 310 view_defn["dimension_groups"] = self.get_dimension_groups() 311 312 # add the Looker dimensions 313 for data_source, dimensions in base_view_dimensions.items(): 314 for dimension in dimensions: 315 if dimension["name"] not in ignore_base_fields: 316 dimension["sql"] = ( 317 "${TABLE}." + f"{data_source}_{dimension['name']}" 318 ) 319 dimension["group_label"] = "Base Fields" 320 if not lookml_utils._is_dimension_group(dimension): 321 view_defn["dimensions"].append(dimension) 322 else: 323 view_defn["dimension_groups"].append(dimension) 324 # avoid duplicate dimensions 325 ignore_base_fields.append(dimension["name"]) 326 327 view_defn["measures"] = self.get_measures( 328 view_defn["dimensions"], 329 ) 330 view_defn["sets"] = self._get_sets() 331 rolling_average_window_sizes = sorted( 332 { 333 window_size 334 for metric in namespace_definitions.metrics.definitions.values() 335 if metric.select_expression 336 and metric.data_source.name == data_source_name 337 and metric.type != "histogram" 338 and metric.statistics 339 for stat_slug, stat_conf in metric.statistics.items() 340 if stat_slug == "rolling_average" 341 for window_size in stat_conf.get("window_sizes", []) 342 } 343 ) 344 view_defn["parameters"] = self._get_parameters( 345 view_defn["dimensions"], rolling_average_window_sizes 346 ) 347 view_defn["filters"] = self._get_filters() 348 349 return {"views": [view_defn]} 350 351 def get_dimensions( 352 self, 353 _table=None, 354 _v1_name: Optional[str] = None, 355 _dryrun=None, 356 ) -> List[Dict[str, Any]]: 357 """Get the set of dimensions for this view based on the metric definitions in metric-hub.""" 358 namespace_definitions = MetricsConfigLoader.configs.get_platform_definitions( 359 self.namespace 360 ) 361 metric_definitions = namespace_definitions.metrics.definitions 362 data_source_name = re.sub("^metric_definitions_", "", self.name) 363 364 return [ 365 { 366 "name": "client_id", 367 "type": "string", 
368 "sql": "SAFE_CAST(${TABLE}.client_id AS STRING)", 369 "label": "Client ID", 370 "primary_key": "yes", 371 "group_label": "Base Fields", 372 "description": "Unique client identifier", 373 }, 374 ] + [ # add a dimension for each metric definition 375 { 376 "name": metric_slug, 377 "group_label": "Metrics", 378 "label": metric.friendly_name 379 or lookml_utils.slug_to_title(metric_slug), 380 "description": metric.description or "", 381 "type": "number", 382 "sql": "${TABLE}." + metric_slug, 383 } 384 for metric_slug, metric in metric_definitions.items() 385 if metric.select_expression 386 and metric.data_source.name == data_source_name 387 and metric.type != "histogram" 388 ] 389 390 def get_dimension_groups(self) -> List[Dict[str, Any]]: 391 """Get dimension groups for this view.""" 392 return [ 393 { 394 "name": "submission", 395 "type": "time", 396 "datatype": "date", 397 "group_label": "Base Fields", 398 "sql": "${TABLE}.analysis_basis", 399 "label": "Submission", 400 "timeframes": [ 401 "raw", 402 "date", 403 "week", 404 "month", 405 "quarter", 406 "year", 407 ], 408 } 409 ] 410 411 def _get_sets(self) -> List[Dict[str, Any]]: 412 """Generate metric sets.""" 413 # group all the metric dimensions into a set 414 dimensions = self.get_dimensions() 415 measures = self.get_measures(dimensions) 416 417 return [ 418 { 419 "name": "metrics", 420 "fields": [ 421 dimension["name"] 422 for dimension in dimensions 423 if dimension["name"] != "client_id" 424 ] 425 + [measure["name"] for measure in measures], 426 } 427 ] 428 429 def _get_parameters( 430 self, dimensions: List[dict], rolling_average_window_sizes: List[int] = [] 431 ): 432 hide_sampling = "yes" 433 434 for dim in dimensions: 435 if dim["name"] == "sample_id": 436 hide_sampling = "no" 437 break 438 439 return [ 440 { 441 "name": "aggregate_metrics_by", 442 "label": "Aggregate Client Metrics Per", 443 "type": "unquoted", 444 "default_value": "day", 445 "allowed_values": [ 446 {"label": "Per Day", "value": 
"day"}, 447 {"label": "Per Week", "value": "week"}, 448 {"label": "Per Month", "value": "month"}, 449 {"label": "Per Quarter", "value": "quarter"}, 450 {"label": "Per Year", "value": "year"}, 451 {"label": "Overall", "value": "overall"}, 452 ], 453 }, 454 { 455 "name": "sampling", 456 "label": "Sample of source data in %", 457 "type": "unquoted", 458 "default_value": "100", 459 "hidden": hide_sampling, 460 }, 461 { 462 "name": "lookback_days", 463 "label": "Lookback (Days)", 464 "type": "unquoted", 465 "description": "Number of days added before the filtered date range. " 466 + "Useful for period-over-period comparisons.", 467 "default_value": "0", 468 }, 469 { 470 "name": "date_groupby_position", 471 "label": "Date Group By Position", 472 "type": "unquoted", 473 "description": "Position of the date field in the group by clause. " 474 + "Required when submission_week, submission_month, submission_quarter, submission_year " 475 + "is selected as BigQuery can't correctly resolve the GROUP BY otherwise", 476 "default_value": "", 477 }, 478 ] + ( 479 [ 480 { 481 "name": "rolling_average_window_size", 482 "label": "Rolling Average Custom Window Size (days)", 483 "type": "unquoted", 484 "description": "Number of days for the custom rolling average window.", 485 "default_value": str(rolling_average_window_sizes[0]), 486 "allowed_values": [ 487 {"label": f"{w} days", "value": str(w)} 488 for w in rolling_average_window_sizes 489 ], 490 } 491 ] 492 if rolling_average_window_sizes 493 else [] 494 ) 495 496 def _get_filters(self): 497 return [ 498 { 499 "name": "analysis_period", 500 "type": "date", 501 "label": "Analysis Period (with Lookback)", 502 "description": "Use this filter to define the main analysis period. 
" 503 + "The results will include the selected date range plus any additional " 504 + "days specified by the 'Lookback days' setting.", 505 } 506 ] 507 508 def get_measures( 509 self, dimensions: List[dict] 510 ) -> List[Dict[str, Union[str, List[Dict[str, str]]]]]: 511 """Get statistics as measures.""" 512 measures = [] 513 sampling = "1" 514 515 for dim in dimensions: 516 if dim["name"] == "sample_id": 517 sampling = "100 / {% parameter sampling %}" 518 break 519 520 for dimension in dimensions: 521 metric = MetricsConfigLoader.configs.get_metric_definition( 522 dimension["name"], self.namespace 523 ) 524 if metric and metric.statistics: 525 for statistic_slug, statistic_conf in metric.statistics.items(): 526 dimension_label = dimension.get("label") or dimension.get("name") 527 if statistic_slug in [ 528 "average", 529 "max", 530 "min", 531 "median", 532 ]: 533 measures.append( 534 { 535 "name": f"{dimension['name']}_{statistic_slug}", 536 "type": statistic_slug, 537 "sql": "${TABLE}." + dimension["name"], 538 "label": f"{dimension_label} {statistic_slug.title()}", 539 "group_label": "Statistics", 540 "description": f"{statistic_slug.title()} of {dimension_label}", 541 } 542 ) 543 elif statistic_slug == "sum": 544 measures.append( 545 { 546 "name": f"{dimension['name']}_{statistic_slug}", 547 "type": "sum", 548 "sql": "${TABLE}." + dimension["name"] + "*" + sampling, 549 "label": f"{dimension_label} Sum", 550 "group_label": "Statistics", 551 "description": f"Sum of {dimension_label}", 552 } 553 ) 554 elif statistic_slug == "client_count": 555 measures.append( 556 { 557 "name": ( 558 f"{dimension['name']}_{statistic_slug}_sampled" 559 if sampling 560 else f"{dimension['name']}_{statistic_slug}" 561 ), 562 "type": "count_distinct", 563 "label": f"{dimension_label} Client Count", 564 "group_label": "Statistics", 565 "sql": "IF(${TABLE}." 
566 + f"{dimension['name']} > 0, " 567 + "${TABLE}.client_id, SAFE_CAST(NULL AS STRING))", 568 "description": f"Number of clients with {dimension_label}", 569 "hidden": "yes" if sampling else "no", 570 } 571 ) 572 573 if sampling: 574 measures.append( 575 { 576 "name": f"{dimension['name']}_{statistic_slug}", 577 "type": "number", 578 "label": f"{dimension_label} Client Count", 579 "group_label": "Statistics", 580 "sql": "${" 581 + f"{dimension['name']}_{statistic_slug}_sampled" 582 + "} *" 583 + sampling, 584 "description": f"Number of clients with {dimension_label}", 585 } 586 ) 587 elif statistic_slug == "dau_proportion": 588 if "numerator" in statistic_conf: 589 [numerator, numerator_stat] = statistic_conf[ 590 "numerator" 591 ].split(".") 592 measures.append( 593 { 594 "name": "DAU_sampled" if sampling else "DAU", 595 "type": "count_distinct", 596 "label": "DAU", 597 "group_label": "Statistics", 598 "sql": "${TABLE}.client_id", 599 "hidden": "yes", 600 } 601 ) 602 603 if sampling: 604 measures.append( 605 { 606 "name": "DAU", 607 "type": "number", 608 "label": "DAU", 609 "group_label": "Statistics", 610 "sql": "${DAU_sampled} *" + sampling, 611 "hidden": "yes", 612 } 613 ) 614 615 measures.append( 616 { 617 "name": f"{dimension['name']}_{statistic_slug}", 618 "type": "number", 619 "label": f"{dimension_label} DAU Proportion", 620 "sql": "SAFE_DIVIDE(${" 621 + f"{numerator}_{numerator_stat}" 622 + "}, ${DAU})", 623 "group_label": "Statistics", 624 "description": f"Proportion of daily active users with {dimension['name']}", 625 } 626 ) 627 elif statistic_slug == "ratio": 628 if ( 629 "numerator" in statistic_conf 630 and "denominator" in statistic_conf 631 ): 632 [numerator, numerator_stat] = statistic_conf[ 633 "numerator" 634 ].split(".") 635 [denominator, denominator_stat] = statistic_conf[ 636 "denominator" 637 ].split(".") 638 639 measures.append( 640 { 641 "name": f"{dimension['name']}_{statistic_slug}", 642 "type": "number", 643 "label": 
f"{dimension_label} Ratio", 644 "sql": "SAFE_DIVIDE(${" 645 + f"{numerator}_{numerator_stat}" 646 + "}, ${" 647 + f"{denominator}_{denominator_stat}" 648 + "})", 649 "group_label": "Statistics", 650 "description": f"""" 651 Ratio between {statistic_conf['numerator']} and 652 {statistic_conf['denominator']}""", 653 } 654 ) 655 elif statistic_slug == "rolling_average": 656 # rolling averages are computed over existing statistics (e.g. sum, ratio) 657 aggregations = statistic_conf.get("aggregations", ["sum"]) 658 659 # Build a dynamic PARTITION BY clause for the window function. 660 # For each base field dimension (non-metric, non-client_id), emit a 661 # liquid conditional so the field is only included in the partition 662 # when it's actually in the query. The trailing constant `1` ensures 663 # the PARTITION BY clause is never empty (when no grouping dimension 664 # is selected `PARTITION BY 1` is equivalent to a global window). 665 partition_by_conditions = "".join( 666 f"{{% if {self.name}.{dim['name']}._is_selected %}}{dim['sql']}," 667 f"{{% endif %}}" 668 for dim in dimensions 669 if dim.get("group_label") == "Base Fields" 670 and dim["name"] != "client_id" 671 ) 672 partition_by_clause = ( 673 f"PARTITION BY {partition_by_conditions} 1" 674 ) 675 676 for aggregation in aggregations: 677 # find measures that match the current dimension and aggregation type 678 matching_measures = [ 679 m 680 for m in measures 681 if m["name"].startswith( 682 f"{dimension['name']}_{aggregation}" 683 ) 684 ] 685 if "window_sizes" in statistic_conf: 686 for window_size in statistic_conf["window_sizes"]: 687 for matching_measure in matching_measures: 688 # these statistics require some time dimension to be selected 689 measures.append( 690 { 691 "name": f"{dimension['name']}_{statistic_slug}_{aggregation}_{window_size}_day", 692 "type": "number", 693 "label": f"{matching_measure['label']} {window_size} Day Rolling Average", 694 "sql": f""" 695 {{% if 
{self.name}.submission_date._is_selected or 696 {self.name}.submission_week._is_selected or 697 {self.name}.submission_month._is_selected or 698 {self.name}.submission_quarter._is_selected or 699 {self.name}.submission_year._is_selected %}} 700 AVG(${{{matching_measure['name']}}}) OVER ( 701 {partition_by_clause} 702 {{% if date_groupby_position._parameter_value != "" %}} 703 ORDER BY {{% parameter date_groupby_position %}} 704 {{% elsif {self.name}.submission_date._is_selected %}} 705 ORDER BY ${{TABLE}}.analysis_basis 706 {{% else %}} 707 ERROR("date_groupby_position needs to be set when using submission_week, 708 submission_month, submission_quarter, or submission_year") 709 {{% endif %}} 710 ROWS BETWEEN {window_size - 1} PRECEDING AND CURRENT ROW 711 ) 712 {{% else %}} 713 ERROR('Please select a "submission_*" field to compute the rolling average') 714 {{% endif %}} 715 """, 716 "group_label": "Statistics", 717 "description": f"{window_size} day rolling average of {dimension_label}", 718 } 719 ) 720 721 # Parametric-window measure: uses the rolling_average_window_size 722 # parameter so the user can set any window size at query time. 723 # The parameter value must be n-1 (preceding rows count) because 724 # SQL does not allow arithmetic in ROWS BETWEEN expressions. 
725 for matching_measure in matching_measures: 726 measures.append( 727 { 728 "name": f"{dimension['name']}_{statistic_slug}_{aggregation}_custom_window", 729 "type": "number", 730 "label": f"{matching_measure['label']} Custom Window Rolling Average", 731 "sql": f""" 732 {{% if {self.name}.submission_date._is_selected or 733 {self.name}.submission_week._is_selected or 734 {self.name}.submission_month._is_selected or 735 {self.name}.submission_quarter._is_selected or 736 {self.name}.submission_year._is_selected %}} 737 AVG(${{{matching_measure['name']}}}) OVER ( 738 {partition_by_clause} 739 {{% if date_groupby_position._parameter_value != "" %}} 740 ORDER BY {{% parameter date_groupby_position %}} 741 {{% elsif {self.name}.submission_date._is_selected %}} 742 ORDER BY ${{TABLE}}.analysis_basis 743 {{% else %}} 744 ERROR("date_groupby_position needs to be set when using submission_week, 745 submission_month, submission_quarter, or submission_year") 746 {{% endif %}} 747 ROWS BETWEEN 748 {{{{ rolling_average_window_size._parameter_value | minus: 1 }}}} 749 PRECEDING AND CURRENT ROW 750 ) 751 {{% else %}} 752 ERROR('Please select a "submission_*" field to compute the rolling average') 753 {{% endif %}} 754 """, 755 "group_label": "Statistics", 756 "description": f"Rolling average of {dimension_label} using a window size " 757 + "controlled by the 'Rolling Average Custom Window Size' parameter.", 758 } 759 ) 760 761 # period-over-period measures compare current values with historical values 762 if "period_over_period" in statistic_conf: 763 # find all statistics that have period-over-period configured 764 matching_measures = [ 765 m 766 for m in measures 767 if m["name"].startswith( 768 f"{dimension['name']}_{statistic_slug}" 769 ) 770 and "_period_over_period_" not in m["name"] 771 ] 772 773 # create period-over-period measures for each configured time period 774 for period in statistic_conf["period_over_period"].get( 775 "periods", [] 776 ): 777 for matching_measure 
in matching_measures: 778 original_sql = matching_measure["sql"] 779 780 # rolling averages need special handling to adjust window sizes 781 # based on the selected time granularity 782 if ( 783 statistic_slug == "rolling_average" 784 and "_custom_window" in matching_measure["name"] 785 ): 786 sql = self._create_custom_window_period_sql( 787 original_sql, period 788 ) 789 elif statistic_slug == "rolling_average": 790 sql = self._create_rolling_average_period_sql( 791 original_sql, period 792 ) 793 else: 794 # standard measures use LAG function with time-adjusted periods 795 sql = self._create_lag_period_sql( 796 matching_measure, period 797 ) 798 799 # generate different types of period-over-period comparisons 800 for kind in statistic_conf["period_over_period"].get( 801 "kinds", ["previous"] 802 ): 803 if kind == "difference": 804 comparison_sql = f"({original_sql}) - ({sql})" 805 elif kind == "relative_change": 806 comparison_sql = f"SAFE_DIVIDE(({original_sql}), NULLIF(({sql}), 0)) - 1" 807 else: 808 comparison_sql = sql 809 810 measures.append( 811 { 812 "name": f"{matching_measure['name']}_{period}_day_period_over_period_{kind}", 813 "type": "number", 814 "label": f"{matching_measure['label']} " 815 + f"{period} Day Period Over Period {kind.capitalize()}", 816 "description": f"Period over period {kind.capitalize()} of " 817 + f"{matching_measure['label']} over {period} days", 818 "group_label": "Statistics", 819 "sql": comparison_sql, 820 } 821 ) 822 823 return measures 824 825 def _create_rolling_average_period_sql(self, original_sql: str, period: int) -> str: 826 """ 827 Create period-over-period SQL for rolling average measures. 828 829 Rolling averages require adjusting the window size based on the selected time granularity. 
830 """ 831 rows_match = re.search( 832 r"ROWS BETWEEN (\d+) PRECEDING AND CURRENT ROW", 833 original_sql, 834 ) 835 836 if not rows_match: 837 return original_sql 838 839 original_window_size = int(rows_match.group(1)) 840 time_conditions = [] 841 842 for unit, divisor in self.TIME_UNITS: 843 # calculate adjusted window size for this time granularity 844 adjusted_window = ( 845 (original_window_size + period) // divisor 846 if unit != "date" 847 else original_window_size + period 848 ) 849 850 condition = ( 851 f"{{% {'if' if unit == 'date' else 'elsif'} " 852 + f"{self.name}.submission_{unit}._is_selected %}}" 853 ) 854 855 # modify the ROWS clause to extend the window by the period 856 modified_sql = re.sub( 857 r"ROWS BETWEEN \d+ PRECEDING AND CURRENT ROW", 858 f"ROWS BETWEEN {adjusted_window} PRECEDING AND " 859 + f"{adjusted_window - original_window_size} PRECEDING", 860 original_sql, 861 ) 862 time_conditions.append(f"{condition}\n{modified_sql}") 863 864 return ( 865 "\n".join(time_conditions) 866 + f"\n{{% else %}}\n{original_sql}\n{{% endif %}}" 867 ) 868 869 def _create_custom_window_period_sql(self, original_sql: str, period: int) -> str: 870 """ 871 Create period-over-period SQL for custom rolling average measures. 872 873 Because the window size is a runtime Looker parameter, Python arithmetic 874 cannot be used. Instead, Liquid {% assign %} tags compute the shifted 875 window boundaries at query time. 
876 877 For date granularity (divisor=1): 878 preceding = window_size - 1 879 start = preceding + period 880 end = period 881 → ROWS BETWEEN {{ preceding | plus: period }} PRECEDING AND period PRECEDING 882 883 For coarser granularities (divisor D): 884 adjusted = (preceding + period) / D (integer division) 885 adjusted_end = adjusted - preceding 886 → ROWS BETWEEN {{ adjusted }} PRECEDING AND {{ adjusted_end }} PRECEDING 887 """ 888 rows_pattern = r"ROWS BETWEEN\s*\{\{[^}]+\}\}\s*PRECEDING AND CURRENT ROW" 889 if not re.search(rows_pattern, original_sql, re.DOTALL): 890 return original_sql 891 892 time_conditions = [] 893 for unit, divisor in self.TIME_UNITS: 894 condition = ( 895 f"{{% {'if' if unit == 'date' else 'elsif'} " 896 + f"{self.name}.submission_{unit}._is_selected %}}" 897 ) 898 899 if unit == "date": 900 rows_replacement = ( 901 f"{{% assign preceding = rolling_average_window_size._parameter_value | minus: 1 %}}\n" 902 f"ROWS BETWEEN {{{{ preceding | plus: {period} }}}} PRECEDING AND {period} PRECEDING" 903 ) 904 else: 905 rows_replacement = ( 906 f"{{% assign preceding = rolling_average_window_size._parameter_value | minus: 1 %}}\n" 907 f"{{% assign adjusted = preceding | plus: {period} | divided_by: {divisor} %}}\n" 908 f"{{% assign adjusted_end = adjusted | minus: preceding %}}\n" 909 f"ROWS BETWEEN {{{{ adjusted }}}} PRECEDING AND {{{{ adjusted_end }}}} PRECEDING" 910 ) 911 912 modified_sql = re.sub( 913 rows_pattern, rows_replacement, original_sql, flags=re.DOTALL 914 ) 915 time_conditions.append(f"{condition}\n{modified_sql}") 916 917 return ( 918 "\n".join(time_conditions) 919 + f"\n{{% else %}}\n{original_sql}\n{{% endif %}}" 920 ) 921 922 def _create_lag_period_sql(self, matching_measure: dict, period: int) -> str: 923 """ 924 Create period-over-period SQL using LAG function for standard measures. 925 926 LAG function looks back N periods to get historical values. 
class MetricDefinitionsView(View):
    """A view for metric-hub metrics that come from the same data source.

    The view is backed by a derived table (built in ``to_lookml``) that
    evaluates the metric-hub metric expressions per client, and exposes the
    statistics configured on those metrics as Looker measures
    (``get_measures``).
    """

    type: str = "metric_definitions_view"

    # Time unit divisors for converting day-based periods/window sizes to
    # other granularities. Used by the rolling-average and
    # period-over-period SQL helpers at the bottom of this class.
    TIME_UNITS = [
        ("date", 1),  # Daily: no conversion needed
        ("week", 7),  # Weekly: divide by 7
        ("month", 30),  # Monthly: approximate 30 days per month
        ("quarter", 90),  # Quarterly: approximate 90 days per quarter
        ("year", 365),  # Yearly: approximate 365 days per year
    ]

    def __init__(self, namespace: str, name: str, tables: List[Dict[str, str]]) -> None:
        """Get an instance of a MetricDefinitionsView."""
        super().__init__(namespace, name, MetricDefinitionsView.type, tables)

    @classmethod
    def from_db_views(
        klass,
        namespace: str,
        is_glean: bool,
        channels: List[Dict[str, str]],
        db_views: dict,
    ) -> Iterator[MetricDefinitionsView]:
        """Get Metric Definition Views from db views and app variants.

        Metric definition views are derived from metric-hub configs rather
        than db views, so this intentionally yields nothing.
        """
        return iter(())

    @classmethod
    def from_dict(
        klass, namespace: str, name: str, definition: ViewDict
    ) -> MetricDefinitionsView:
        """Get a MetricDefinitionsView from a dict representation."""
        return klass(namespace, name, definition.get("tables", []))

    def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]:
        """Get this view as LookML.

        Returns ``{}`` when the namespace has no metric-hub definitions,
        when the data source referenced by this view's name is unknown, or
        when no non-histogram metric uses that data source.
        """
        namespace_definitions = MetricsConfigLoader.configs.get_platform_definitions(
            self.namespace
        )
        if namespace_definitions is None:
            return {}

        # get all metric definitions that depend on the data source represented by this view;
        # the view name is "metric_definitions_<data source slug>"
        data_source_name = re.sub("^metric_definitions_", "", self.name)
        data_source_definition = MetricsConfigLoader.configs.get_data_source_definition(
            data_source_name, self.namespace
        )

        if data_source_definition is None:
            return {}

        # todo: hide deprecated metrics?
        # Render each metric's select_expression through the metric-hub Jinja
        # environment; each entry becomes one "<expr> AS <slug>," select item.
        metric_definitions = [
            f"""{
                MetricsConfigLoader.configs.get_env().from_string(metric.select_expression).render()
            } AS {metric_slug},\n"""
            for metric_slug, metric in namespace_definitions.metrics.definitions.items()
            if metric.select_expression
            and metric.data_source.name == data_source_name
            and metric.type != "histogram"
        ]

        if metric_definitions == []:
            return {}

        # Metric definitions are intended to be aggregated by client per date.
        # A derived table is needed to do these aggregations, instead of defining them as measures
        # we want to have them available as dimensions (which don't allow aggregations in their definitions)
        # to allow for custom measures to be later defined in Looker that aggregate these per client metrics.
        view_defn: Dict[str, Any] = {"name": self.name}

        # fields that must never be re-exposed as "Base Fields" dimensions:
        # identifiers/date columns handled explicitly, plus the metric slugs
        # themselves (they get their own "Metrics" dimensions)
        ignore_base_fields = [
            "client_id",
            "submission_date",
            "submission",
            "first_run",
        ] + [
            metric_slug
            for metric_slug, metric in namespace_definitions.metrics.definitions.items()
            if metric.select_expression
            and metric.data_source.name == data_source_name
            and metric.type != "histogram"
        ]

        # maps data source slug -> list of Looker dimension dicts derived
        # from a dryrun of that data source's SQL
        base_view_dimensions = {}
        joined_data_sources = []

        # check if the metric data source has joins
        # joined data sources are generally used for creating the "Base Fields"
        if data_source_definition.joins:
            # determine the dimensions selected by the joined data sources
            for joined_data_source_slug, join in data_source_definition.joins.items():
                joined_data_source = (
                    MetricsConfigLoader.configs.get_data_source_definition(
                        joined_data_source_slug, self.namespace
                    )
                )

                if joined_data_source.columns_as_dimensions:
                    joined_data_sources.append(joined_data_source)

                    date_filter = None
                    if joined_data_source.submission_date_column != "NULL":
                        # NOTE(review): the inner `== "NULL"` check is redundant —
                        # this branch is only entered when the column != "NULL".
                        # Confirm before simplifying.
                        date_filter = (
                            None
                            if joined_data_source.submission_date_column is None
                            or joined_data_source.submission_date_column == "NULL"
                            else f"{joined_data_source.submission_date_column} = '2023-01-01'"
                        )

                    # create Looker dimensions by doing a dryrun
                    # (the fixed date only limits the amount of data scanned)
                    query = MetricsConfigLoader.configs.get_data_source_sql(
                        joined_data_source_slug,
                        self.namespace,
                        where=date_filter,
                    ).format(dataset=self.namespace)

                    base_view_dimensions[joined_data_source_slug] = (
                        lookml_utils._generate_dimensions_from_query(
                            query, dryrun=dryrun
                        )
                    )

        if (
            data_source_definition.client_id_column == "NULL"
            and not base_view_dimensions
        ) or data_source_definition.columns_as_dimensions:
            # if the metrics data source doesn't have any joins then use the dimensions
            # of the data source itself as base fields
            date_filter = None
            if data_source_definition.submission_date_column != "NULL":
                date_filter = (
                    "submission_date = '2023-01-01'"
                    if data_source_definition.submission_date_column is None
                    else f"{data_source_definition.submission_date_column} = '2023-01-01'"
                )

            query = MetricsConfigLoader.configs.get_data_source_sql(
                data_source_definition.name,
                self.namespace,
                where=date_filter,
                ignore_joins=True,
            ).format(dataset=self.namespace)

            base_view_dimensions[data_source_definition.name] = (
                lookml_utils._generate_dimensions_from_query(query, dryrun)
            )

        # to prevent duplicate dimensions, especially when working with time dimensions
        # where names are modified potentially causing naming collisions
        seen_dimensions = set()
        # prepare base field data for query
        base_view_fields = []
        for data_source, dimensions in base_view_dimensions.items():
            for dimension in dimensions:
                if (
                    dimension["name"] not in ignore_base_fields
                    and dimension["name"] not in seen_dimensions
                    and "hidden" not in dimension
                ):
                    # "__" in lookml dimension names corresponds to "." (nested
                    # fields) in the underlying SQL
                    sql = (
                        f"{data_source}.{dimension['name'].replace('__', '.')} AS"
                        + f" {data_source}_{dimension['name']},\n"
                    )
                    # date/time/timestamp suffixes are removed when generating lookml dimensions, however we
                    # need the original field name for the derived view SQL
                    if dimension["type"] == "time" and not dimension["sql"].endswith(
                        dimension["name"]
                    ):
                        suffix = dimension["sql"].split(
                            dimension["name"].replace("__", ".")
                        )[-1]
                        sql = (
                            f"{data_source}.{(dimension['name']+suffix).replace('__', '.')} AS"
                            + f" {data_source}_{dimension['name']},\n"
                        )

                    base_view_fields.append(
                        {
                            "name": f"{data_source}_{dimension['name']}",
                            "select_sql": f"{data_source}_{dimension['name']},\n",
                            "sql": sql,
                        }
                    )
                    seen_dimensions.add(dimension["name"])

        client_id_field = (
            "NULL"
            if data_source_definition.client_id_column == "NULL"
            else f'{data_source_definition.client_id_column or "client_id"}'
        )

        # filters for date ranges; the analysis_period filter takes precedence
        # over the default submission_date filter, and lookback_days extends
        # the range backwards for period-over-period comparisons
        where_sql = " AND ".join(
            [
                f"""
                {data_source.name}.{data_source.submission_date_column or "submission_date"}
                {{% if analysis_period._is_filtered %}}
                BETWEEN
                DATE_SUB(
                    COALESCE(
                        SAFE_CAST(
                            {{% date_start analysis_period %}} AS DATE
                        ), CURRENT_DATE()),
                    INTERVAL {{% parameter lookback_days %}} DAY
                ) AND
                COALESCE(
                    SAFE_CAST(
                        {{% date_end analysis_period %}} AS DATE
                    ), CURRENT_DATE())
                {{% else %}}
                BETWEEN
                DATE_SUB(
                    COALESCE(
                        SAFE_CAST(
                            {{% date_start submission_date %}} AS DATE
                        ), CURRENT_DATE()),
                    INTERVAL {{% parameter lookback_days %}} DAY
                ) AND
                COALESCE(
                    SAFE_CAST(
                        {{% date_end submission_date %}} AS DATE
                    ), CURRENT_DATE())
                {{% endif %}}
                """
                for data_source in [data_source_definition] + joined_data_sources
                if data_source.submission_date_column != "NULL"
            ]
        )

        # filter on sample_id if such a field exists
        for field in base_view_fields:
            if field["name"].endswith("_sample_id"):
                where_sql += f"""
                    AND
                        {field['name'].split('_sample_id')[0]}.sample_id < {{% parameter sampling %}}
                """
                break

        # derived table: inner query selects raw columns + base fields for the
        # filtered date range; outer query evaluates metric expressions grouped
        # per client and per analysis_basis (granularity chosen at query time
        # via the aggregate_metrics_by parameter)
        view_defn["derived_table"] = {
            "sql": f"""
            SELECT
                {"".join(metric_definitions)}
                {"".join([field['select_sql'] for field in base_view_fields])}
                {client_id_field} AS client_id,
                {{% if aggregate_metrics_by._parameter_value == 'day' %}}
                {data_source_definition.submission_date_column or "submission_date"} AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'week' %}}
                (FORMAT_DATE(
                    '%F',
                    DATE_TRUNC({data_source_definition.submission_date_column or "submission_date"},
                    WEEK(MONDAY)))
                ) AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'month' %}}
                (FORMAT_DATE(
                    '%Y-%m',
                    {data_source_definition.submission_date_column or "submission_date"})
                ) AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'quarter' %}}
                (FORMAT_DATE(
                    '%Y-%m',
                    DATE_TRUNC({data_source_definition.submission_date_column or "submission_date"},
                    QUARTER))
                ) AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'year' %}}
                (EXTRACT(
                    YEAR FROM {data_source_definition.submission_date_column or "submission_date"})
                ) AS analysis_basis
                {{% else %}}
                NULL as analysis_basis
                {{% endif %}}
            FROM
                (
                    SELECT
                        {data_source_name}.*,
                        {"".join([field['sql'] for field in base_view_fields])}
                    FROM
                    {
                        MetricsConfigLoader.configs.get_data_source_sql(
                            data_source_name,
                            self.namespace,
                            select_fields=False
                        ).format(dataset=self.namespace)
                    }
                    WHERE {where_sql}
                )
            GROUP BY
                {"".join([field['select_sql'] for field in base_view_fields])}
                client_id,
                analysis_basis
            """
        }

        view_defn["dimensions"] = self.get_dimensions()
        view_defn["dimension_groups"] = self.get_dimension_groups()

        # add the Looker dimensions generated from the dryruns as "Base Fields"
        for data_source, dimensions in base_view_dimensions.items():
            for dimension in dimensions:
                if dimension["name"] not in ignore_base_fields:
                    dimension["sql"] = (
                        "${TABLE}." + f"{data_source}_{dimension['name']}"
                    )
                    dimension["group_label"] = "Base Fields"
                    if not lookml_utils._is_dimension_group(dimension):
                        view_defn["dimensions"].append(dimension)
                    else:
                        view_defn["dimension_groups"].append(dimension)
                    # avoid duplicate dimensions
                    ignore_base_fields.append(dimension["name"])

        view_defn["measures"] = self.get_measures(
            view_defn["dimensions"],
        )
        view_defn["sets"] = self._get_sets()
        # all distinct rolling-average window sizes configured for metrics of
        # this data source; drives the rolling_average_window_size parameter
        rolling_average_window_sizes = sorted(
            {
                window_size
                for metric in namespace_definitions.metrics.definitions.values()
                if metric.select_expression
                and metric.data_source.name == data_source_name
                and metric.type != "histogram"
                and metric.statistics
                for stat_slug, stat_conf in metric.statistics.items()
                if stat_slug == "rolling_average"
                for window_size in stat_conf.get("window_sizes", [])
            }
        )
        view_defn["parameters"] = self._get_parameters(
            view_defn["dimensions"], rolling_average_window_sizes
        )
        view_defn["filters"] = self._get_filters()

        return {"views": [view_defn]}

    def get_dimensions(
        self,
        _table=None,
        _v1_name: Optional[str] = None,
        _dryrun=None,
    ) -> List[Dict[str, Any]]:
        """Get the set of dimensions for this view based on the metric definitions in metric-hub.

        Returns the client_id dimension plus one "Metrics" dimension per
        non-histogram metric of this view's data source. The unused
        underscore-prefixed parameters keep the signature compatible with
        the base class.
        """
        namespace_definitions = MetricsConfigLoader.configs.get_platform_definitions(
            self.namespace
        )
        metric_definitions = namespace_definitions.metrics.definitions
        data_source_name = re.sub("^metric_definitions_", "", self.name)

        return [
            {
                "name": "client_id",
                "type": "string",
                "sql": "SAFE_CAST(${TABLE}.client_id AS STRING)",
                "label": "Client ID",
                "primary_key": "yes",
                "group_label": "Base Fields",
                "description": "Unique client identifier",
            },
        ] + [  # add a dimension for each metric definition
            {
                "name": metric_slug,
                "group_label": "Metrics",
                "label": metric.friendly_name
                or lookml_utils.slug_to_title(metric_slug),
                "description": metric.description or "",
                "type": "number",
                "sql": "${TABLE}." + metric_slug,
            }
            for metric_slug, metric in metric_definitions.items()
            if metric.select_expression
            and metric.data_source.name == data_source_name
            and metric.type != "histogram"
        ]

    def get_dimension_groups(self) -> List[Dict[str, Any]]:
        """Get dimension groups for this view.

        A single "submission" time group over analysis_basis, which is the
        date column produced by the derived table in ``to_lookml``.
        """
        return [
            {
                "name": "submission",
                "type": "time",
                "datatype": "date",
                "group_label": "Base Fields",
                "sql": "${TABLE}.analysis_basis",
                "label": "Submission",
                "timeframes": [
                    "raw",
                    "date",
                    "week",
                    "month",
                    "quarter",
                    "year",
                ],
            }
        ]

    def _get_sets(self) -> List[Dict[str, Any]]:
        """Generate metric sets."""
        # group all the metric dimensions (except client_id) and all
        # statistic measures into a single "metrics" set
        dimensions = self.get_dimensions()
        measures = self.get_measures(dimensions)

        return [
            {
                "name": "metrics",
                "fields": [
                    dimension["name"]
                    for dimension in dimensions
                    if dimension["name"] != "client_id"
                ]
                + [measure["name"] for measure in measures],
            }
        ]

    def _get_parameters(
        self, dimensions: List[dict], rolling_average_window_sizes: List[int] = []
    ):
        """Get the query-time parameters exposed by this view.

        NOTE(review): mutable default argument; harmless here since the list
        is only read, but worth replacing with ``None`` upstream.
        """
        # only show the sampling parameter when the view actually has a
        # sample_id dimension to filter on
        hide_sampling = "yes"

        for dim in dimensions:
            if dim["name"] == "sample_id":
                hide_sampling = "no"
                break

        return [
            {
                "name": "aggregate_metrics_by",
                "label": "Aggregate Client Metrics Per",
                "type": "unquoted",
                "default_value": "day",
                "allowed_values": [
                    {"label": "Per Day", "value": "day"},
                    {"label": "Per Week", "value": "week"},
                    {"label": "Per Month", "value": "month"},
                    {"label": "Per Quarter", "value": "quarter"},
                    {"label": "Per Year", "value": "year"},
                    {"label": "Overall", "value": "overall"},
                ],
            },
            {
                "name": "sampling",
                "label": "Sample of source data in %",
                "type": "unquoted",
                "default_value": "100",
                "hidden": hide_sampling,
            },
            {
                "name": "lookback_days",
                "label": "Lookback (Days)",
                "type": "unquoted",
                "description": "Number of days added before the filtered date range. "
                + "Useful for period-over-period comparisons.",
                "default_value": "0",
            },
            {
                "name": "date_groupby_position",
                "label": "Date Group By Position",
                "type": "unquoted",
                "description": "Position of the date field in the group by clause. "
                + "Required when submission_week, submission_month, submission_quarter, submission_year "
                + "is selected as BigQuery can't correctly resolve the GROUP BY otherwise",
                "default_value": "",
            },
        ] + (
            # only offered when at least one metric configures rolling_average
            # window_sizes; the first (smallest) size is the default
            [
                {
                    "name": "rolling_average_window_size",
                    "label": "Rolling Average Custom Window Size (days)",
                    "type": "unquoted",
                    "description": "Number of days for the custom rolling average window.",
                    "default_value": str(rolling_average_window_sizes[0]),
                    "allowed_values": [
                        {"label": f"{w} days", "value": str(w)}
                        for w in rolling_average_window_sizes
                    ],
                }
            ]
            if rolling_average_window_sizes
            else []
        )

    def _get_filters(self):
        """Get the filter-only fields exposed by this view."""
        return [
            {
                "name": "analysis_period",
                "type": "date",
                "label": "Analysis Period (with Lookback)",
                "description": "Use this filter to define the main analysis period. "
                + "The results will include the selected date range plus any additional "
                + "days specified by the 'Lookback days' setting.",
            }
        ]

    def get_measures(
        self, dimensions: List[dict]
    ) -> List[Dict[str, Union[str, List[Dict[str, str]]]]]:
        """Get statistics as measures.

        For each dimension that maps to a metric-hub metric, generates one
        measure per configured statistic (average/max/min/median, sum,
        client_count, dau_proportion, ratio, rolling_average) plus optional
        period-over-period variants.
        """
        measures = []
        # SQL factor used to scale sampled counts/sums back up; "1" when the
        # view has no sample_id dimension.
        # NOTE(review): sampling is always a non-empty string ("1" or the
        # liquid expression), so every `if sampling` below is always truthy
        # and the corresponding else branches are dead — confirm intent.
        sampling = "1"

        for dim in dimensions:
            if dim["name"] == "sample_id":
                sampling = "100 / {% parameter sampling %}"
                break

        for dimension in dimensions:
            metric = MetricsConfigLoader.configs.get_metric_definition(
                dimension["name"], self.namespace
            )
            if metric and metric.statistics:
                for statistic_slug, statistic_conf in metric.statistics.items():
                    dimension_label = dimension.get("label") or dimension.get("name")
                    if statistic_slug in [
                        "average",
                        "max",
                        "min",
                        "median",
                    ]:
                        # simple aggregations map 1:1 to Looker measure types
                        measures.append(
                            {
                                "name": f"{dimension['name']}_{statistic_slug}",
                                "type": statistic_slug,
                                "sql": "${TABLE}." + dimension["name"],
                                "label": f"{dimension_label} {statistic_slug.title()}",
                                "group_label": "Statistics",
                                "description": f"{statistic_slug.title()} of {dimension_label}",
                            }
                        )
                    elif statistic_slug == "sum":
                        # sums get scaled up by the sampling factor
                        measures.append(
                            {
                                "name": f"{dimension['name']}_{statistic_slug}",
                                "type": "sum",
                                "sql": "${TABLE}." + dimension["name"] + "*" + sampling,
                                "label": f"{dimension_label} Sum",
                                "group_label": "Statistics",
                                "description": f"Sum of {dimension_label}",
                            }
                        )
                    elif statistic_slug == "client_count":
                        # hidden raw count_distinct, plus a visible measure
                        # that rescales it by the sampling factor
                        measures.append(
                            {
                                "name": (
                                    f"{dimension['name']}_{statistic_slug}_sampled"
                                    if sampling
                                    else f"{dimension['name']}_{statistic_slug}"
                                ),
                                "type": "count_distinct",
                                "label": f"{dimension_label} Client Count",
                                "group_label": "Statistics",
                                "sql": "IF(${TABLE}."
                                + f"{dimension['name']} > 0, "
                                + "${TABLE}.client_id, SAFE_CAST(NULL AS STRING))",
                                "description": f"Number of clients with {dimension_label}",
                                "hidden": "yes" if sampling else "no",
                            }
                        )

                        if sampling:
                            measures.append(
                                {
                                    "name": f"{dimension['name']}_{statistic_slug}",
                                    "type": "number",
                                    "label": f"{dimension_label} Client Count",
                                    "group_label": "Statistics",
                                    "sql": "${"
                                    + f"{dimension['name']}_{statistic_slug}_sampled"
                                    + "} *"
                                    + sampling,
                                    "description": f"Number of clients with {dimension_label}",
                                }
                            )
                    elif statistic_slug == "dau_proportion":
                        if "numerator" in statistic_conf:
                            # numerator is "<metric>.<statistic>", e.g. "foo.sum"
                            [numerator, numerator_stat] = statistic_conf[
                                "numerator"
                            ].split(".")
                            # hidden DAU denominator (distinct clients)
                            measures.append(
                                {
                                    "name": "DAU_sampled" if sampling else "DAU",
                                    "type": "count_distinct",
                                    "label": "DAU",
                                    "group_label": "Statistics",
                                    "sql": "${TABLE}.client_id",
                                    "hidden": "yes",
                                }
                            )

                            if sampling:
                                measures.append(
                                    {
                                        "name": "DAU",
                                        "type": "number",
                                        "label": "DAU",
                                        "group_label": "Statistics",
                                        "sql": "${DAU_sampled} *" + sampling,
                                        "hidden": "yes",
                                    }
                                )

                            measures.append(
                                {
                                    "name": f"{dimension['name']}_{statistic_slug}",
                                    "type": "number",
                                    "label": f"{dimension_label} DAU Proportion",
                                    "sql": "SAFE_DIVIDE(${"
                                    + f"{numerator}_{numerator_stat}"
                                    + "}, ${DAU})",
                                    "group_label": "Statistics",
                                    "description": f"Proportion of daily active users with {dimension['name']}",
                                }
                            )
                    elif statistic_slug == "ratio":
                        if (
                            "numerator" in statistic_conf
                            and "denominator" in statistic_conf
                        ):
                            # both sides are "<metric>.<statistic>" references
                            [numerator, numerator_stat] = statistic_conf[
                                "numerator"
                            ].split(".")
                            [denominator, denominator_stat] = statistic_conf[
                                "denominator"
                            ].split(".")

                            measures.append(
                                {
                                    "name": f"{dimension['name']}_{statistic_slug}",
                                    "type": "number",
                                    "label": f"{dimension_label} Ratio",
                                    "sql": "SAFE_DIVIDE(${"
                                    + f"{numerator}_{numerator_stat}"
                                    + "}, ${"
                                    + f"{denominator}_{denominator_stat}"
                                    + "})",
                                    "group_label": "Statistics",
                                    # NOTE(review): the f-string opener below has four
                                    # quotes, so a stray '"' leads the description —
                                    # confirm whether intended.
                                    "description": f""""
                                    Ratio between {statistic_conf['numerator']} and
                                    {statistic_conf['denominator']}""",
                                }
                            )
                    elif statistic_slug == "rolling_average":
                        # rolling averages are computed over existing statistics (e.g. sum, ratio)
                        aggregations = statistic_conf.get("aggregations", ["sum"])

                        # Build a dynamic PARTITION BY clause for the window function.
                        # For each base field dimension (non-metric, non-client_id), emit a
                        # liquid conditional so the field is only included in the partition
                        # when it's actually in the query. The trailing constant `1` ensures
                        # the PARTITION BY clause is never empty (when no grouping dimension
                        # is selected `PARTITION BY 1` is equivalent to a global window).
                        partition_by_conditions = "".join(
                            f"{{% if {self.name}.{dim['name']}._is_selected %}}{dim['sql']},"
                            f"{{% endif %}}"
                            for dim in dimensions
                            if dim.get("group_label") == "Base Fields"
                            and dim["name"] != "client_id"
                        )
                        partition_by_clause = (
                            f"PARTITION BY {partition_by_conditions} 1"
                        )

                        for aggregation in aggregations:
                            # find measures that match the current dimension and aggregation type
                            matching_measures = [
                                m
                                for m in measures
                                if m["name"].startswith(
                                    f"{dimension['name']}_{aggregation}"
                                )
                            ]
                            if "window_sizes" in statistic_conf:
                                for window_size in statistic_conf["window_sizes"]:
                                    for matching_measure in matching_measures:
                                        # these statistics require some time dimension to be selected
                                        measures.append(
                                            {
                                                "name": f"{dimension['name']}_{statistic_slug}_{aggregation}_{window_size}_day",
                                                "type": "number",
                                                "label": f"{matching_measure['label']} {window_size} Day Rolling Average",
                                                "sql": f"""
                                                {{% if {self.name}.submission_date._is_selected or
                                                    {self.name}.submission_week._is_selected or
                                                    {self.name}.submission_month._is_selected or
                                                    {self.name}.submission_quarter._is_selected or
                                                    {self.name}.submission_year._is_selected %}}
                                                AVG(${{{matching_measure['name']}}}) OVER (
                                                    {partition_by_clause}
                                                    {{% if date_groupby_position._parameter_value != "" %}}
                                                    ORDER BY {{% parameter date_groupby_position %}}
                                                    {{% elsif {self.name}.submission_date._is_selected %}}
                                                    ORDER BY ${{TABLE}}.analysis_basis
                                                    {{% else %}}
                                                    ERROR("date_groupby_position needs to be set when using submission_week,
                                                    submission_month, submission_quarter, or submission_year")
                                                    {{% endif %}}
                                                    ROWS BETWEEN {window_size - 1} PRECEDING AND CURRENT ROW
                                                )
                                                {{% else %}}
                                                ERROR('Please select a "submission_*" field to compute the rolling average')
                                                {{% endif %}}
                                                """,
                                                "group_label": "Statistics",
                                                "description": f"{window_size} day rolling average of {dimension_label}",
                                            }
                                        )

                            # Parametric-window measure: uses the rolling_average_window_size
                            # parameter so the user can set any window size at query time.
                            # The parameter value must be n-1 (preceding rows count) because
                            # SQL does not allow arithmetic in ROWS BETWEEN expressions.
                            for matching_measure in matching_measures:
                                measures.append(
                                    {
                                        "name": f"{dimension['name']}_{statistic_slug}_{aggregation}_custom_window",
                                        "type": "number",
                                        "label": f"{matching_measure['label']} Custom Window Rolling Average",
                                        "sql": f"""
                                        {{% if {self.name}.submission_date._is_selected or
                                            {self.name}.submission_week._is_selected or
                                            {self.name}.submission_month._is_selected or
                                            {self.name}.submission_quarter._is_selected or
                                            {self.name}.submission_year._is_selected %}}
                                        AVG(${{{matching_measure['name']}}}) OVER (
                                            {partition_by_clause}
                                            {{% if date_groupby_position._parameter_value != "" %}}
                                            ORDER BY {{% parameter date_groupby_position %}}
                                            {{% elsif {self.name}.submission_date._is_selected %}}
                                            ORDER BY ${{TABLE}}.analysis_basis
                                            {{% else %}}
                                            ERROR("date_groupby_position needs to be set when using submission_week,
                                            submission_month, submission_quarter, or submission_year")
                                            {{% endif %}}
                                            ROWS BETWEEN
                                            {{{{ rolling_average_window_size._parameter_value | minus: 1 }}}}
                                            PRECEDING AND CURRENT ROW
                                        )
                                        {{% else %}}
                                        ERROR('Please select a "submission_*" field to compute the rolling average')
                                        {{% endif %}}
                                        """,
                                        "group_label": "Statistics",
                                        "description": f"Rolling average of {dimension_label} using a window size "
                                        + "controlled by the 'Rolling Average Custom Window Size' parameter.",
                                    }
                                )

                    # period-over-period measures compare current values with historical values
                    if "period_over_period" in statistic_conf:
                        # find all statistics that have period-over-period configured
                        matching_measures = [
                            m
                            for m in measures
                            if m["name"].startswith(
                                f"{dimension['name']}_{statistic_slug}"
                            )
                            and "_period_over_period_" not in m["name"]
                        ]

                        # create period-over-period measures for each configured time period
                        for period in statistic_conf["period_over_period"].get(
                            "periods", []
                        ):
                            for matching_measure in matching_measures:
                                original_sql = matching_measure["sql"]

                                # rolling averages need special handling to adjust window sizes
                                # based on the selected time granularity
                                if (
                                    statistic_slug == "rolling_average"
                                    and "_custom_window" in matching_measure["name"]
                                ):
                                    sql = self._create_custom_window_period_sql(
                                        original_sql, period
                                    )
                                elif statistic_slug == "rolling_average":
                                    sql = self._create_rolling_average_period_sql(
                                        original_sql, period
                                    )
                                else:
                                    # standard measures use LAG function with time-adjusted periods
                                    sql = self._create_lag_period_sql(
                                        matching_measure, period
                                    )

                                # generate different types of period-over-period comparisons
                                for kind in statistic_conf["period_over_period"].get(
                                    "kinds", ["previous"]
                                ):
                                    if kind == "difference":
                                        comparison_sql = f"({original_sql}) - ({sql})"
                                    elif kind == "relative_change":
                                        comparison_sql = f"SAFE_DIVIDE(({original_sql}), NULLIF(({sql}), 0)) - 1"
                                    else:
                                        # "previous": just the historical value
                                        comparison_sql = sql

                                    measures.append(
                                        {
                                            "name": f"{matching_measure['name']}_{period}_day_period_over_period_{kind}",
                                            "type": "number",
                                            "label": f"{matching_measure['label']} "
                                            + f"{period} Day Period Over Period {kind.capitalize()}",
                                            "description": f"Period over period {kind.capitalize()} of "
                                            + f"{matching_measure['label']} over {period} days",
                                            "group_label": "Statistics",
                                            "sql": comparison_sql,
                                        }
                                    )

        return measures

    def _create_rolling_average_period_sql(self, original_sql: str, period: int) -> str:
        """
        Create period-over-period SQL for rolling average measures.

        Rolling averages require adjusting the window size based on the selected time granularity.
        Returns ``original_sql`` unchanged when no fixed ROWS BETWEEN window
        is found in it.
        """
        rows_match = re.search(
            r"ROWS BETWEEN (\d+) PRECEDING AND CURRENT ROW",
            original_sql,
        )

        if not rows_match:
            return original_sql

        original_window_size = int(rows_match.group(1))
        time_conditions = []

        for unit, divisor in self.TIME_UNITS:
            # calculate adjusted window size for this time granularity
            adjusted_window = (
                (original_window_size + period) // divisor
                if unit != "date"
                else original_window_size + period
            )

            # one liquid branch per selectable submission_* granularity
            condition = (
                f"{{% {'if' if unit == 'date' else 'elsif'} "
                + f"{self.name}.submission_{unit}._is_selected %}}"
            )

            # modify the ROWS clause to extend the window by the period
            # (shifting both window bounds back by `period`)
            modified_sql = re.sub(
                r"ROWS BETWEEN \d+ PRECEDING AND CURRENT ROW",
                f"ROWS BETWEEN {adjusted_window} PRECEDING AND "
                + f"{adjusted_window - original_window_size} PRECEDING",
                original_sql,
            )
            time_conditions.append(f"{condition}\n{modified_sql}")

        return (
            "\n".join(time_conditions)
            + f"\n{{% else %}}\n{original_sql}\n{{% endif %}}"
        )

    def _create_custom_window_period_sql(self, original_sql: str, period: int) -> str:
        """
        Create period-over-period SQL for custom rolling average measures.

        Because the window size is a runtime Looker parameter, Python arithmetic
        cannot be used. Instead, Liquid {% assign %} tags compute the shifted
        window boundaries at query time.

        For date granularity (divisor=1):
            preceding = window_size - 1
            start = preceding + period
            end = period
            → ROWS BETWEEN {{ preceding | plus: period }} PRECEDING AND period PRECEDING

        For coarser granularities (divisor D):
            adjusted = (preceding + period) / D  (integer division)
            adjusted_end = adjusted - preceding
            → ROWS BETWEEN {{ adjusted }} PRECEDING AND {{ adjusted_end }} PRECEDING
        """
        # match the parametric "{{ ... }} PRECEDING AND CURRENT ROW" frame
        # emitted by the custom-window measure in get_measures
        rows_pattern = r"ROWS BETWEEN\s*\{\{[^}]+\}\}\s*PRECEDING AND CURRENT ROW"
        if not re.search(rows_pattern, original_sql, re.DOTALL):
            return original_sql

        time_conditions = []
        for unit, divisor in self.TIME_UNITS:
            condition = (
                f"{{% {'if' if unit == 'date' else 'elsif'} "
                + f"{self.name}.submission_{unit}._is_selected %}}"
            )

            if unit == "date":
                rows_replacement = (
                    f"{{% assign preceding = rolling_average_window_size._parameter_value | minus: 1 %}}\n"
                    f"ROWS BETWEEN {{{{ preceding | plus: {period} }}}} PRECEDING AND {period} PRECEDING"
                )
            else:
                rows_replacement = (
                    f"{{% assign preceding = rolling_average_window_size._parameter_value | minus: 1 %}}\n"
                    f"{{% assign adjusted = preceding | plus: {period} | divided_by: {divisor} %}}\n"
                    f"{{% assign adjusted_end = adjusted | minus: preceding %}}\n"
                    f"ROWS BETWEEN {{{{ adjusted }}}} PRECEDING AND {{{{ adjusted_end }}}} PRECEDING"
                )

            modified_sql = re.sub(
                rows_pattern, rows_replacement, original_sql, flags=re.DOTALL
            )
            time_conditions.append(f"{condition}\n{modified_sql}")

        return (
            "\n".join(time_conditions)
            + f"\n{{% else %}}\n{original_sql}\n{{% endif %}}"
        )

    def _create_lag_period_sql(self, matching_measure: dict, period: int) -> str:
        """
        Create period-over-period SQL using LAG function for standard measures.

        LAG function looks back N periods to get historical values. The period is adjusted
        based on the selected time granularity (daily, weekly, monthly, etc.).
        """
        time_conditions = []

        for unit, divisor in self.TIME_UNITS:
            # calculate adjusted period for this time granularity
            adjusted_period = period // divisor if unit != "date" else period

            order_by = (
                f"${{submission_{unit}}}" if unit != "date" else "${submission_date}"
            )

            condition = (
                f"{{% {'if' if unit == 'date' else 'elsif'} "
                + f"{self.name}.submission_{unit}._is_selected %}}"
            )

            lag_sql = f"""LAG(${{{matching_measure['name']}}}, {adjusted_period}) OVER (
                {{% if date_groupby_position._parameter_value != "" %}}
                ORDER BY {{% parameter date_groupby_position %}}
                {{% else %}}
                ORDER BY {order_by}
                {{% endif %}}
            )"""
            time_conditions.append(f"{condition}\n{lag_sql}")

        # NOTE(review): the fallback below references the measure name without
        # the ${...} wrapper, unlike the branches above — confirm whether
        # intended before changing.
        return (
            "\n".join(time_conditions)
            + f"\n{{% else %}}\nLAG({matching_measure['name']}, {period}) "
            + "OVER (ORDER BY ${submission_date})\n{% endif %}"
        )
A view for metric-hub metrics that come from the same data source.
MetricDefinitionsView(namespace: str, name: str, tables: List[Dict[str, str]])
29 def __init__(self, namespace: str, name: str, tables: List[Dict[str, str]]): 30 """Get an instance of an MetricDefinitionsView.""" 31 super().__init__(namespace, name, MetricDefinitionsView.type, tables)
Get an instance of a MetricDefinitionsView.
@classmethod
def
from_db_views( klass, namespace: str, is_glean: bool, channels: List[Dict[str, str]], db_views: dict) -> Iterator[MetricDefinitionsView]:
33 @classmethod 34 def from_db_views( 35 klass, 36 namespace: str, 37 is_glean: bool, 38 channels: List[Dict[str, str]], 39 db_views: dict, 40 ) -> Iterator[MetricDefinitionsView]: 41 """Get Metric Definition Views from db views and app variants.""" 42 return iter(())
Get Metric Definition Views from db views and app variants.
@classmethod
def
from_dict( klass, namespace: str, name: str, definition: generator.views.view.ViewDict) -> MetricDefinitionsView:
44 @classmethod 45 def from_dict( 46 klass, namespace: str, name: str, definition: ViewDict 47 ) -> MetricDefinitionsView: 48 """Get a MetricDefinitionsView from a dict representation.""" 49 return klass(namespace, name, definition.get("tables", []))
Get a MetricDefinitionsView from a dict representation.
def
to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]:
    def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]:
        """Get this view as LookML.

        Builds a single view backed by a derived table whose SQL selects the
        rendered metric-hub metric expressions plus "base field" columns,
        grouped per client and per analysis period. Dimensions, dimension
        groups, measures, sets, parameters and filters are then attached.

        :param v1_name: unused here; part of the generic ``View`` interface.
        :param dryrun: dryrun helper forwarded to
            ``lookml_utils._generate_dimensions_from_query``.
        :return: ``{"views": [view_defn]}`` or ``{}`` when the namespace,
            data source, or metric list is empty.
        """
        namespace_definitions = MetricsConfigLoader.configs.get_platform_definitions(
            self.namespace
        )
        if namespace_definitions is None:
            return {}

        # get all metric definitions that depend on the data source represented by this view
        data_source_name = re.sub("^metric_definitions_", "", self.name)
        data_source_definition = MetricsConfigLoader.configs.get_data_source_definition(
            data_source_name, self.namespace
        )

        if data_source_definition is None:
            return {}

        # todo: hide deprecated metrics?
        # Render each metric's select expression through the metric-hub
        # templating environment; one "<expr> AS <slug>,\n" SQL fragment each.
        metric_definitions = [
            f"""{
                MetricsConfigLoader.configs.get_env().from_string(metric.select_expression).render()
            } AS {metric_slug},\n"""
            for metric_slug, metric in namespace_definitions.metrics.definitions.items()
            if metric.select_expression
            and metric.data_source.name == data_source_name
            and metric.type != "histogram"
        ]

        if metric_definitions == []:
            return {}

        # Metric definitions are intended to be aggregated by client per date.
        # A derived table is needed to do these aggregations, instead of defining them as measures
        # we want to have them available as dimensions (which don't allow aggregations in their definitions)
        # to allow for custom measures to be later defined in Looker that aggregate these per client metrics.
        view_defn: Dict[str, Any] = {"name": self.name}

        # Columns that must not be exposed as "Base Fields": identifier/date
        # columns plus every metric slug (metrics get their own dimensions).
        ignore_base_fields = [
            "client_id",
            "submission_date",
            "submission",
            "first_run",
        ] + [
            metric_slug
            for metric_slug, metric in namespace_definitions.metrics.definitions.items()
            if metric.select_expression
            and metric.data_source.name == data_source_name
            and metric.type != "histogram"
        ]

        base_view_dimensions = {}
        joined_data_sources = []

        # check if the metric data source has joins
        # joined data sources are generally used for creating the "Base Fields"
        if data_source_definition.joins:
            # determine the dimensions selected by the joined data sources
            for joined_data_source_slug, join in data_source_definition.joins.items():
                joined_data_source = (
                    MetricsConfigLoader.configs.get_data_source_definition(
                        joined_data_source_slug, self.namespace
                    )
                )

                if joined_data_source.columns_as_dimensions:
                    joined_data_sources.append(joined_data_source)

                    # A fixed date literal keeps the dryrun query cheap; the
                    # sentinel string "NULL" means "no date column at all".
                    date_filter = None
                    if joined_data_source.submission_date_column != "NULL":
                        date_filter = (
                            None
                            if joined_data_source.submission_date_column is None
                            or joined_data_source.submission_date_column == "NULL"
                            else f"{joined_data_source.submission_date_column} = '2023-01-01'"
                        )

                    # create Looker dimensions by doing a dryrun
                    query = MetricsConfigLoader.configs.get_data_source_sql(
                        joined_data_source_slug,
                        self.namespace,
                        where=date_filter,
                    ).format(dataset=self.namespace)

                    base_view_dimensions[joined_data_source_slug] = (
                        lookml_utils._generate_dimensions_from_query(
                            query, dryrun=dryrun
                        )
                    )

        if (
            data_source_definition.client_id_column == "NULL"
            and not base_view_dimensions
        ) or data_source_definition.columns_as_dimensions:
            # if the metrics data source doesn't have any joins then use the dimensions
            # of the data source itself as base fields
            date_filter = None
            if data_source_definition.submission_date_column != "NULL":
                date_filter = (
                    "submission_date = '2023-01-01'"
                    if data_source_definition.submission_date_column is None
                    else f"{data_source_definition.submission_date_column} = '2023-01-01'"
                )

            query = MetricsConfigLoader.configs.get_data_source_sql(
                data_source_definition.name,
                self.namespace,
                where=date_filter,
                ignore_joins=True,
            ).format(dataset=self.namespace)

            base_view_dimensions[data_source_definition.name] = (
                lookml_utils._generate_dimensions_from_query(query, dryrun)
            )

        # to prevent duplicate dimensions, especially when working with time dimensions
        # where names are modified potentially causing naming collisions
        seen_dimensions = set()
        # prepare base field data for query
        base_view_fields = []
        for data_source, dimensions in base_view_dimensions.items():
            for dimension in dimensions:
                if (
                    dimension["name"] not in ignore_base_fields
                    and dimension["name"] not in seen_dimensions
                    and "hidden" not in dimension
                ):
                    # Looker dimension names use "__" where the source column
                    # uses "." (nested fields); translate back for raw SQL.
                    sql = (
                        f"{data_source}.{dimension['name'].replace('__', '.')} AS"
                        + f" {data_source}_{dimension['name']},\n"
                    )
                    # date/time/timestamp suffixes are removed when generating lookml dimensions, however we
                    # need the original field name for the derived view SQL
                    if dimension["type"] == "time" and not dimension["sql"].endswith(
                        dimension["name"]
                    ):
                        suffix = dimension["sql"].split(
                            dimension["name"].replace("__", ".")
                        )[-1]
                        sql = (
                            f"{data_source}.{(dimension['name']+suffix).replace('__', '.')} AS"
                            + f" {data_source}_{dimension['name']},\n"
                        )

                    base_view_fields.append(
                        {
                            "name": f"{data_source}_{dimension['name']}",
                            "select_sql": f"{data_source}_{dimension['name']},\n",
                            "sql": sql,
                        }
                    )
                    seen_dimensions.add(dimension["name"])

        client_id_field = (
            "NULL"
            if data_source_definition.client_id_column == "NULL"
            else f'{data_source_definition.client_id_column or "client_id"}'
        )

        # filters for date ranges
        # One BETWEEN clause per data source that has a date column; the
        # liquid template prefers the analysis_period filter when set and
        # falls back to submission_date, extending the lower bound by the
        # lookback_days parameter in both cases.
        where_sql = " AND ".join(
            [
                f"""
                    {data_source.name}.{data_source.submission_date_column or "submission_date"}
                    {{% if analysis_period._is_filtered %}}
                    BETWEEN
                    DATE_SUB(
                        COALESCE(
                            SAFE_CAST(
                                {{% date_start analysis_period %}} AS DATE
                            ), CURRENT_DATE()),
                        INTERVAL {{% parameter lookback_days %}} DAY
                    ) AND
                    COALESCE(
                        SAFE_CAST(
                            {{% date_end analysis_period %}} AS DATE
                        ), CURRENT_DATE())
                    {{% else %}}
                    BETWEEN
                    DATE_SUB(
                        COALESCE(
                            SAFE_CAST(
                                {{% date_start submission_date %}} AS DATE
                            ), CURRENT_DATE()),
                        INTERVAL {{% parameter lookback_days %}} DAY
                    ) AND
                    COALESCE(
                        SAFE_CAST(
                            {{% date_end submission_date %}} AS DATE
                        ), CURRENT_DATE())
                    {{% endif %}}
                """
                for data_source in [data_source_definition] + joined_data_sources
                if data_source.submission_date_column != "NULL"
            ]
        )

        # filter on sample_id if such a field exists
        for field in base_view_fields:
            if field["name"].endswith("_sample_id"):
                where_sql += f"""
                    AND
                        {field['name'].split('_sample_id')[0]}.sample_id < {{% parameter sampling %}}
                """
                break

        # The derived table: inner query joins the data source with its base
        # fields; outer query groups per client and per analysis_basis, whose
        # granularity is chosen at query time via aggregate_metrics_by.
        view_defn["derived_table"] = {
            "sql": f"""
            SELECT
                {"".join(metric_definitions)}
                {"".join([field['select_sql'] for field in base_view_fields])}
                {client_id_field} AS client_id,
                {{% if aggregate_metrics_by._parameter_value == 'day' %}}
                {data_source_definition.submission_date_column or "submission_date"} AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'week' %}}
                (FORMAT_DATE(
                    '%F',
                    DATE_TRUNC({data_source_definition.submission_date_column or "submission_date"},
                    WEEK(MONDAY)))
                ) AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'month' %}}
                (FORMAT_DATE(
                    '%Y-%m',
                    {data_source_definition.submission_date_column or "submission_date"})
                ) AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'quarter' %}}
                (FORMAT_DATE(
                    '%Y-%m',
                    DATE_TRUNC({data_source_definition.submission_date_column or "submission_date"},
                    QUARTER))
                ) AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'year' %}}
                (EXTRACT(
                    YEAR FROM {data_source_definition.submission_date_column or "submission_date"})
                ) AS analysis_basis
                {{% else %}}
                NULL as analysis_basis
                {{% endif %}}
            FROM
                (
                    SELECT
                        {data_source_name}.*,
                        {"".join([field['sql'] for field in base_view_fields])}
                    FROM
                    {
                        MetricsConfigLoader.configs.get_data_source_sql(
                            data_source_name,
                            self.namespace,
                            select_fields=False
                        ).format(dataset=self.namespace)
                    }
                    WHERE {where_sql}
                )
                GROUP BY
                {"".join([field['select_sql'] for field in base_view_fields])}
                client_id,
                analysis_basis
            """
        }

        view_defn["dimensions"] = self.get_dimensions()
        view_defn["dimension_groups"] = self.get_dimension_groups()

        # add the Looker dimensions
        for data_source, dimensions in base_view_dimensions.items():
            for dimension in dimensions:
                if dimension["name"] not in ignore_base_fields:
                    dimension["sql"] = (
                        "${TABLE}." + f"{data_source}_{dimension['name']}"
                    )
                    dimension["group_label"] = "Base Fields"
                    if not lookml_utils._is_dimension_group(dimension):
                        view_defn["dimensions"].append(dimension)
                    else:
                        view_defn["dimension_groups"].append(dimension)
                    # avoid duplicate dimensions
                    ignore_base_fields.append(dimension["name"])

        view_defn["measures"] = self.get_measures(
            view_defn["dimensions"],
        )
        view_defn["sets"] = self._get_sets()
        # Collect every rolling-average window size configured across this
        # data source's metrics; used to populate the window-size parameter.
        rolling_average_window_sizes = sorted(
            {
                window_size
                for metric in namespace_definitions.metrics.definitions.values()
                if metric.select_expression
                and metric.data_source.name == data_source_name
                and metric.type != "histogram"
                and metric.statistics
                for stat_slug, stat_conf in metric.statistics.items()
                if stat_slug == "rolling_average"
                for window_size in stat_conf.get("window_sizes", [])
            }
        )
        view_defn["parameters"] = self._get_parameters(
            view_defn["dimensions"], rolling_average_window_sizes
        )
        view_defn["filters"] = self._get_filters()

        return {"views": [view_defn]}
Get this view as LookML.
def
get_dimensions( self, _table=None, _v1_name: Optional[str] = None, _dryrun=None) -> List[Dict[str, Any]]:
352 def get_dimensions( 353 self, 354 _table=None, 355 _v1_name: Optional[str] = None, 356 _dryrun=None, 357 ) -> List[Dict[str, Any]]: 358 """Get the set of dimensions for this view based on the metric definitions in metric-hub.""" 359 namespace_definitions = MetricsConfigLoader.configs.get_platform_definitions( 360 self.namespace 361 ) 362 metric_definitions = namespace_definitions.metrics.definitions 363 data_source_name = re.sub("^metric_definitions_", "", self.name) 364 365 return [ 366 { 367 "name": "client_id", 368 "type": "string", 369 "sql": "SAFE_CAST(${TABLE}.client_id AS STRING)", 370 "label": "Client ID", 371 "primary_key": "yes", 372 "group_label": "Base Fields", 373 "description": "Unique client identifier", 374 }, 375 ] + [ # add a dimension for each metric definition 376 { 377 "name": metric_slug, 378 "group_label": "Metrics", 379 "label": metric.friendly_name 380 or lookml_utils.slug_to_title(metric_slug), 381 "description": metric.description or "", 382 "type": "number", 383 "sql": "${TABLE}." + metric_slug, 384 } 385 for metric_slug, metric in metric_definitions.items() 386 if metric.select_expression 387 and metric.data_source.name == data_source_name 388 and metric.type != "histogram" 389 ]
Get the set of dimensions for this view based on the metric definitions in metric-hub.
def
get_dimension_groups(self) -> List[Dict[str, Any]]:
391 def get_dimension_groups(self) -> List[Dict[str, Any]]: 392 """Get dimension groups for this view.""" 393 return [ 394 { 395 "name": "submission", 396 "type": "time", 397 "datatype": "date", 398 "group_label": "Base Fields", 399 "sql": "${TABLE}.analysis_basis", 400 "label": "Submission", 401 "timeframes": [ 402 "raw", 403 "date", 404 "week", 405 "month", 406 "quarter", 407 "year", 408 ], 409 } 410 ]
Get dimension groups for this view.
def
get_measures( self, dimensions: List[dict]) -> List[Dict[str, Union[str, List[Dict[str, str]]]]]:
509 def get_measures( 510 self, dimensions: List[dict] 511 ) -> List[Dict[str, Union[str, List[Dict[str, str]]]]]: 512 """Get statistics as measures.""" 513 measures = [] 514 sampling = "1" 515 516 for dim in dimensions: 517 if dim["name"] == "sample_id": 518 sampling = "100 / {% parameter sampling %}" 519 break 520 521 for dimension in dimensions: 522 metric = MetricsConfigLoader.configs.get_metric_definition( 523 dimension["name"], self.namespace 524 ) 525 if metric and metric.statistics: 526 for statistic_slug, statistic_conf in metric.statistics.items(): 527 dimension_label = dimension.get("label") or dimension.get("name") 528 if statistic_slug in [ 529 "average", 530 "max", 531 "min", 532 "median", 533 ]: 534 measures.append( 535 { 536 "name": f"{dimension['name']}_{statistic_slug}", 537 "type": statistic_slug, 538 "sql": "${TABLE}." + dimension["name"], 539 "label": f"{dimension_label} {statistic_slug.title()}", 540 "group_label": "Statistics", 541 "description": f"{statistic_slug.title()} of {dimension_label}", 542 } 543 ) 544 elif statistic_slug == "sum": 545 measures.append( 546 { 547 "name": f"{dimension['name']}_{statistic_slug}", 548 "type": "sum", 549 "sql": "${TABLE}." + dimension["name"] + "*" + sampling, 550 "label": f"{dimension_label} Sum", 551 "group_label": "Statistics", 552 "description": f"Sum of {dimension_label}", 553 } 554 ) 555 elif statistic_slug == "client_count": 556 measures.append( 557 { 558 "name": ( 559 f"{dimension['name']}_{statistic_slug}_sampled" 560 if sampling 561 else f"{dimension['name']}_{statistic_slug}" 562 ), 563 "type": "count_distinct", 564 "label": f"{dimension_label} Client Count", 565 "group_label": "Statistics", 566 "sql": "IF(${TABLE}." 
567 + f"{dimension['name']} > 0, " 568 + "${TABLE}.client_id, SAFE_CAST(NULL AS STRING))", 569 "description": f"Number of clients with {dimension_label}", 570 "hidden": "yes" if sampling else "no", 571 } 572 ) 573 574 if sampling: 575 measures.append( 576 { 577 "name": f"{dimension['name']}_{statistic_slug}", 578 "type": "number", 579 "label": f"{dimension_label} Client Count", 580 "group_label": "Statistics", 581 "sql": "${" 582 + f"{dimension['name']}_{statistic_slug}_sampled" 583 + "} *" 584 + sampling, 585 "description": f"Number of clients with {dimension_label}", 586 } 587 ) 588 elif statistic_slug == "dau_proportion": 589 if "numerator" in statistic_conf: 590 [numerator, numerator_stat] = statistic_conf[ 591 "numerator" 592 ].split(".") 593 measures.append( 594 { 595 "name": "DAU_sampled" if sampling else "DAU", 596 "type": "count_distinct", 597 "label": "DAU", 598 "group_label": "Statistics", 599 "sql": "${TABLE}.client_id", 600 "hidden": "yes", 601 } 602 ) 603 604 if sampling: 605 measures.append( 606 { 607 "name": "DAU", 608 "type": "number", 609 "label": "DAU", 610 "group_label": "Statistics", 611 "sql": "${DAU_sampled} *" + sampling, 612 "hidden": "yes", 613 } 614 ) 615 616 measures.append( 617 { 618 "name": f"{dimension['name']}_{statistic_slug}", 619 "type": "number", 620 "label": f"{dimension_label} DAU Proportion", 621 "sql": "SAFE_DIVIDE(${" 622 + f"{numerator}_{numerator_stat}" 623 + "}, ${DAU})", 624 "group_label": "Statistics", 625 "description": f"Proportion of daily active users with {dimension['name']}", 626 } 627 ) 628 elif statistic_slug == "ratio": 629 if ( 630 "numerator" in statistic_conf 631 and "denominator" in statistic_conf 632 ): 633 [numerator, numerator_stat] = statistic_conf[ 634 "numerator" 635 ].split(".") 636 [denominator, denominator_stat] = statistic_conf[ 637 "denominator" 638 ].split(".") 639 640 measures.append( 641 { 642 "name": f"{dimension['name']}_{statistic_slug}", 643 "type": "number", 644 "label": 
f"{dimension_label} Ratio", 645 "sql": "SAFE_DIVIDE(${" 646 + f"{numerator}_{numerator_stat}" 647 + "}, ${" 648 + f"{denominator}_{denominator_stat}" 649 + "})", 650 "group_label": "Statistics", 651 "description": f"""" 652 Ratio between {statistic_conf['numerator']} and 653 {statistic_conf['denominator']}""", 654 } 655 ) 656 elif statistic_slug == "rolling_average": 657 # rolling averages are computed over existing statistics (e.g. sum, ratio) 658 aggregations = statistic_conf.get("aggregations", ["sum"]) 659 660 # Build a dynamic PARTITION BY clause for the window function. 661 # For each base field dimension (non-metric, non-client_id), emit a 662 # liquid conditional so the field is only included in the partition 663 # when it's actually in the query. The trailing constant `1` ensures 664 # the PARTITION BY clause is never empty (when no grouping dimension 665 # is selected `PARTITION BY 1` is equivalent to a global window). 666 partition_by_conditions = "".join( 667 f"{{% if {self.name}.{dim['name']}._is_selected %}}{dim['sql']}," 668 f"{{% endif %}}" 669 for dim in dimensions 670 if dim.get("group_label") == "Base Fields" 671 and dim["name"] != "client_id" 672 ) 673 partition_by_clause = ( 674 f"PARTITION BY {partition_by_conditions} 1" 675 ) 676 677 for aggregation in aggregations: 678 # find measures that match the current dimension and aggregation type 679 matching_measures = [ 680 m 681 for m in measures 682 if m["name"].startswith( 683 f"{dimension['name']}_{aggregation}" 684 ) 685 ] 686 if "window_sizes" in statistic_conf: 687 for window_size in statistic_conf["window_sizes"]: 688 for matching_measure in matching_measures: 689 # these statistics require some time dimension to be selected 690 measures.append( 691 { 692 "name": f"{dimension['name']}_{statistic_slug}_{aggregation}_{window_size}_day", 693 "type": "number", 694 "label": f"{matching_measure['label']} {window_size} Day Rolling Average", 695 "sql": f""" 696 {{% if 
{self.name}.submission_date._is_selected or 697 {self.name}.submission_week._is_selected or 698 {self.name}.submission_month._is_selected or 699 {self.name}.submission_quarter._is_selected or 700 {self.name}.submission_year._is_selected %}} 701 AVG(${{{matching_measure['name']}}}) OVER ( 702 {partition_by_clause} 703 {{% if date_groupby_position._parameter_value != "" %}} 704 ORDER BY {{% parameter date_groupby_position %}} 705 {{% elsif {self.name}.submission_date._is_selected %}} 706 ORDER BY ${{TABLE}}.analysis_basis 707 {{% else %}} 708 ERROR("date_groupby_position needs to be set when using submission_week, 709 submission_month, submission_quarter, or submission_year") 710 {{% endif %}} 711 ROWS BETWEEN {window_size - 1} PRECEDING AND CURRENT ROW 712 ) 713 {{% else %}} 714 ERROR('Please select a "submission_*" field to compute the rolling average') 715 {{% endif %}} 716 """, 717 "group_label": "Statistics", 718 "description": f"{window_size} day rolling average of {dimension_label}", 719 } 720 ) 721 722 # Parametric-window measure: uses the rolling_average_window_size 723 # parameter so the user can set any window size at query time. 724 # The parameter value must be n-1 (preceding rows count) because 725 # SQL does not allow arithmetic in ROWS BETWEEN expressions. 
726 for matching_measure in matching_measures: 727 measures.append( 728 { 729 "name": f"{dimension['name']}_{statistic_slug}_{aggregation}_custom_window", 730 "type": "number", 731 "label": f"{matching_measure['label']} Custom Window Rolling Average", 732 "sql": f""" 733 {{% if {self.name}.submission_date._is_selected or 734 {self.name}.submission_week._is_selected or 735 {self.name}.submission_month._is_selected or 736 {self.name}.submission_quarter._is_selected or 737 {self.name}.submission_year._is_selected %}} 738 AVG(${{{matching_measure['name']}}}) OVER ( 739 {partition_by_clause} 740 {{% if date_groupby_position._parameter_value != "" %}} 741 ORDER BY {{% parameter date_groupby_position %}} 742 {{% elsif {self.name}.submission_date._is_selected %}} 743 ORDER BY ${{TABLE}}.analysis_basis 744 {{% else %}} 745 ERROR("date_groupby_position needs to be set when using submission_week, 746 submission_month, submission_quarter, or submission_year") 747 {{% endif %}} 748 ROWS BETWEEN 749 {{{{ rolling_average_window_size._parameter_value | minus: 1 }}}} 750 PRECEDING AND CURRENT ROW 751 ) 752 {{% else %}} 753 ERROR('Please select a "submission_*" field to compute the rolling average') 754 {{% endif %}} 755 """, 756 "group_label": "Statistics", 757 "description": f"Rolling average of {dimension_label} using a window size " 758 + "controlled by the 'Rolling Average Custom Window Size' parameter.", 759 } 760 ) 761 762 # period-over-period measures compare current values with historical values 763 if "period_over_period" in statistic_conf: 764 # find all statistics that have period-over-period configured 765 matching_measures = [ 766 m 767 for m in measures 768 if m["name"].startswith( 769 f"{dimension['name']}_{statistic_slug}" 770 ) 771 and "_period_over_period_" not in m["name"] 772 ] 773 774 # create period-over-period measures for each configured time period 775 for period in statistic_conf["period_over_period"].get( 776 "periods", [] 777 ): 778 for matching_measure 
in matching_measures: 779 original_sql = matching_measure["sql"] 780 781 # rolling averages need special handling to adjust window sizes 782 # based on the selected time granularity 783 if ( 784 statistic_slug == "rolling_average" 785 and "_custom_window" in matching_measure["name"] 786 ): 787 sql = self._create_custom_window_period_sql( 788 original_sql, period 789 ) 790 elif statistic_slug == "rolling_average": 791 sql = self._create_rolling_average_period_sql( 792 original_sql, period 793 ) 794 else: 795 # standard measures use LAG function with time-adjusted periods 796 sql = self._create_lag_period_sql( 797 matching_measure, period 798 ) 799 800 # generate different types of period-over-period comparisons 801 for kind in statistic_conf["period_over_period"].get( 802 "kinds", ["previous"] 803 ): 804 if kind == "difference": 805 comparison_sql = f"({original_sql}) - ({sql})" 806 elif kind == "relative_change": 807 comparison_sql = f"SAFE_DIVIDE(({original_sql}), NULLIF(({sql}), 0)) - 1" 808 else: 809 comparison_sql = sql 810 811 measures.append( 812 { 813 "name": f"{matching_measure['name']}_{period}_day_period_over_period_{kind}", 814 "type": "number", 815 "label": f"{matching_measure['label']} " 816 + f"{period} Day Period Over Period {kind.capitalize()}", 817 "description": f"Period over period {kind.capitalize()} of " 818 + f"{matching_measure['label']} over {period} days", 819 "group_label": "Statistics", 820 "sql": comparison_sql, 821 } 822 ) 823 824 return measures
Get statistics as measures.