generator.views.metric_definitions_view
Class to describe a view with metrics from metric-hub.
1"""Class to describe a view with metrics from metric-hub.""" 2 3from __future__ import annotations 4 5import re 6from typing import Any, Dict, Iterator, List, Optional, Union 7 8from generator.metrics_utils import MetricsConfigLoader 9 10from . import lookml_utils 11from .view import View, ViewDict 12 13 14class MetricDefinitionsView(View): 15 """A view for metric-hub metrics that come from the same data source.""" 16 17 type: str = "metric_definitions_view" 18 19 # Time unit divisors for converting days to different granularities 20 TIME_UNITS = [ 21 ("date", 1), # Daily: no conversion needed 22 ("week", 7), # Weekly: divide by 7 23 ("month", 30), # Monthly: approximate 30 days per month 24 ("quarter", 90), # Quarterly: approximate 90 days per quarter 25 ("year", 365), # Yearly: approximate 365 days per year 26 ] 27 28 def __init__(self, namespace: str, name: str, tables: List[Dict[str, str]]): 29 """Get an instance of an MetricDefinitionsView.""" 30 super().__init__(namespace, name, MetricDefinitionsView.type, tables) 31 32 @classmethod 33 def from_db_views( 34 klass, 35 namespace: str, 36 is_glean: bool, 37 channels: List[Dict[str, str]], 38 db_views: dict, 39 ) -> Iterator[MetricDefinitionsView]: 40 """Get Metric Definition Views from db views and app variants.""" 41 return iter(()) 42 43 @classmethod 44 def from_dict( 45 klass, namespace: str, name: str, definition: ViewDict 46 ) -> MetricDefinitionsView: 47 """Get a MetricDefinitionsView from a dict representation.""" 48 return klass(namespace, name, definition.get("tables", [])) 49 50 def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]: 51 """Get this view as LookML.""" 52 namespace_definitions = MetricsConfigLoader.configs.get_platform_definitions( 53 self.namespace 54 ) 55 if namespace_definitions is None: 56 return {} 57 58 # get all metric definitions that depend on the data source represented by this view 59 data_source_name = re.sub("^metric_definitions_", "", self.name) 60 
data_source_definition = MetricsConfigLoader.configs.get_data_source_definition( 61 data_source_name, self.namespace 62 ) 63 64 if data_source_definition is None: 65 return {} 66 67 # todo: hide deprecated metrics? 68 metric_definitions = [ 69 f"""{ 70 MetricsConfigLoader.configs.get_env().from_string(metric.select_expression).render() 71 } AS {metric_slug},\n""" 72 for metric_slug, metric in namespace_definitions.metrics.definitions.items() 73 if metric.select_expression 74 and metric.data_source.name == data_source_name 75 and metric.type != "histogram" 76 ] 77 78 if metric_definitions == []: 79 return {} 80 81 # Metric definitions are intended to aggregated by client per date. 82 # A derived table is needed to do these aggregations, instead of defining them as measures 83 # we want to have them available as dimensions (which don't allow aggregations in their definitions) 84 # to allow for custom measures to be later defined in Looker that aggregate these per client metrics. 85 view_defn: Dict[str, Any] = {"name": self.name} 86 87 ignore_base_fields = [ 88 "client_id", 89 "submission_date", 90 "submission", 91 "first_run", 92 ] + [ 93 metric_slug 94 for metric_slug, metric in namespace_definitions.metrics.definitions.items() 95 if metric.select_expression 96 and metric.data_source.name == data_source_name 97 and metric.type != "histogram" 98 ] 99 100 base_view_dimensions = {} 101 joined_data_sources = [] 102 103 # check if the metric data source has joins 104 # joined data sources are generally used for creating the "Base Fields" 105 if data_source_definition.joins: 106 # determine the dimensions selected by the joined data sources 107 for joined_data_source_slug, join in data_source_definition.joins.items(): 108 joined_data_source = ( 109 MetricsConfigLoader.configs.get_data_source_definition( 110 joined_data_source_slug, self.namespace 111 ) 112 ) 113 114 if joined_data_source.columns_as_dimensions: 115 joined_data_sources.append(joined_data_source) 116 117 
date_filter = None 118 if joined_data_source.submission_date_column != "NULL": 119 date_filter = ( 120 None 121 if joined_data_source.submission_date_column is None 122 or joined_data_source.submission_date_column == "NULL" 123 else f"{joined_data_source.submission_date_column} = '2023-01-01'" 124 ) 125 126 # create Looker dimensions by doing a dryrun 127 query = MetricsConfigLoader.configs.get_data_source_sql( 128 joined_data_source_slug, 129 self.namespace, 130 where=date_filter, 131 ).format(dataset=self.namespace) 132 133 base_view_dimensions[joined_data_source_slug] = ( 134 lookml_utils._generate_dimensions_from_query( 135 query, dryrun=dryrun 136 ) 137 ) 138 139 if ( 140 data_source_definition.client_id_column == "NULL" 141 or data_source_definition.columns_as_dimensions 142 ): 143 # if the metrics data source doesn't have any joins then use the dimensions 144 # of the data source itself as base fields 145 date_filter = None 146 if data_source_definition.submission_date_column != "NULL": 147 date_filter = ( 148 "submission_date = '2023-01-01'" 149 if data_source_definition.submission_date_column is None 150 else f"{data_source_definition.submission_date_column} = '2023-01-01'" 151 ) 152 153 query = MetricsConfigLoader.configs.get_data_source_sql( 154 data_source_definition.name, 155 self.namespace, 156 where=date_filter, 157 ignore_joins=True, 158 ).format(dataset=self.namespace) 159 160 base_view_dimensions[data_source_definition.name] = ( 161 lookml_utils._generate_dimensions_from_query(query, dryrun) 162 ) 163 164 # to prevent duplicate dimensions, especially when working with time dimensions 165 # where names are modified potentially causing naming collisions 166 seen_dimensions = set() 167 # prepare base field data for query 168 base_view_fields = [] 169 for data_source, dimensions in base_view_dimensions.items(): 170 for dimension in dimensions: 171 if ( 172 dimension["name"] not in ignore_base_fields 173 and dimension["name"] not in seen_dimensions 174 
and "hidden" not in dimension 175 ): 176 sql = ( 177 f"{data_source}.{dimension['name'].replace('__', '.')} AS" 178 + f" {data_source}_{dimension['name']},\n" 179 ) 180 # date/time/timestamp suffixes are removed when generating lookml dimensions, however we 181 # need the original field name for the derived view SQL 182 if dimension["type"] == "time" and not dimension["sql"].endswith( 183 dimension["name"] 184 ): 185 suffix = dimension["sql"].split( 186 dimension["name"].replace("__", ".") 187 )[-1] 188 sql = ( 189 f"{data_source}.{(dimension['name']+suffix).replace('__', '.')} AS" 190 + f" {data_source}_{dimension['name']},\n" 191 ) 192 193 base_view_fields.append( 194 { 195 "name": f"{data_source}_{dimension['name']}", 196 "select_sql": f"{data_source}_{dimension['name']},\n", 197 "sql": sql, 198 } 199 ) 200 seen_dimensions.add(dimension["name"]) 201 202 client_id_field = ( 203 "NULL" 204 if data_source_definition.client_id_column == "NULL" 205 else f'{data_source_definition.client_id_column or "client_id"}' 206 ) 207 208 # filters for date ranges 209 where_sql = " AND ".join( 210 [ 211 f""" 212 {data_source.name}.{data_source.submission_date_column or "submission_date"} 213 {{% if analysis_period._is_filtered %}} 214 BETWEEN 215 DATE_SUB( 216 COALESCE( 217 SAFE_CAST( 218 {{% date_start analysis_period %}} AS DATE 219 ), CURRENT_DATE()), 220 INTERVAL {{% parameter lookback_days %}} DAY 221 ) AND 222 COALESCE( 223 SAFE_CAST( 224 {{% date_end analysis_period %}} AS DATE 225 ), CURRENT_DATE()) 226 {{% else %}} 227 BETWEEN 228 DATE_SUB( 229 COALESCE( 230 SAFE_CAST( 231 {{% date_start submission_date %}} AS DATE 232 ), CURRENT_DATE()), 233 INTERVAL {{% parameter lookback_days %}} DAY 234 ) AND 235 COALESCE( 236 SAFE_CAST( 237 {{% date_end submission_date %}} AS DATE 238 ), CURRENT_DATE()) 239 {{% endif %}} 240 """ 241 for data_source in [data_source_definition] + joined_data_sources 242 if data_source.submission_date_column != "NULL" 243 ] 244 ) 245 246 # filte on 
sample_id if such a field exists 247 for field in base_view_fields: 248 if field["name"].endswith("_sample_id"): 249 where_sql += f""" 250 AND 251 {field['name'].split('_sample_id')[0]}.sample_id < {{% parameter sampling %}} 252 """ 253 break 254 255 view_defn["derived_table"] = { 256 "sql": f""" 257 SELECT 258 {"".join(metric_definitions)} 259 {"".join([field['select_sql'] for field in base_view_fields])} 260 {client_id_field} AS client_id, 261 {{% if aggregate_metrics_by._parameter_value == 'day' %}} 262 {data_source_definition.submission_date_column or "submission_date"} AS analysis_basis 263 {{% elsif aggregate_metrics_by._parameter_value == 'week' %}} 264 (FORMAT_DATE( 265 '%F', 266 DATE_TRUNC({data_source_definition.submission_date_column or "submission_date"}, 267 WEEK(MONDAY))) 268 ) AS analysis_basis 269 {{% elsif aggregate_metrics_by._parameter_value == 'month' %}} 270 (FORMAT_DATE( 271 '%Y-%m', 272 {data_source_definition.submission_date_column or "submission_date"}) 273 ) AS analysis_basis 274 {{% elsif aggregate_metrics_by._parameter_value == 'quarter' %}} 275 (FORMAT_DATE( 276 '%Y-%m', 277 DATE_TRUNC({data_source_definition.submission_date_column or "submission_date"}, 278 QUARTER)) 279 ) AS analysis_basis 280 {{% elsif aggregate_metrics_by._parameter_value == 'year' %}} 281 (EXTRACT( 282 YEAR FROM {data_source_definition.submission_date_column or "submission_date"}) 283 ) AS analysis_basis 284 {{% else %}} 285 NULL as analysis_basis 286 {{% endif %}} 287 FROM 288 ( 289 SELECT 290 {data_source_name}.*, 291 {"".join([field['sql'] for field in base_view_fields])} 292 FROM 293 { 294 MetricsConfigLoader.configs.get_data_source_sql( 295 data_source_name, 296 self.namespace, 297 select_fields=False 298 ).format(dataset=self.namespace) 299 } 300 WHERE {where_sql} 301 ) 302 GROUP BY 303 {"".join([field['select_sql'] for field in base_view_fields])} 304 client_id, 305 analysis_basis 306 """ 307 } 308 309 view_defn["dimensions"] = self.get_dimensions() 310 
view_defn["dimension_groups"] = self.get_dimension_groups() 311 312 # add the Looker dimensions 313 for data_source, dimensions in base_view_dimensions.items(): 314 for dimension in dimensions: 315 if dimension["name"] not in ignore_base_fields: 316 dimension["sql"] = ( 317 "${TABLE}." + f"{data_source}_{dimension['name']}" 318 ) 319 dimension["group_label"] = "Base Fields" 320 if not lookml_utils._is_dimension_group(dimension): 321 view_defn["dimensions"].append(dimension) 322 else: 323 view_defn["dimension_groups"].append(dimension) 324 # avoid duplicate dimensions 325 ignore_base_fields.append(dimension["name"]) 326 327 view_defn["measures"] = self.get_measures( 328 view_defn["dimensions"], 329 ) 330 view_defn["sets"] = self._get_sets() 331 rolling_average_window_sizes = sorted( 332 { 333 window_size 334 for metric in namespace_definitions.metrics.definitions.values() 335 if metric.select_expression 336 and metric.data_source.name == data_source_name 337 and metric.type != "histogram" 338 and metric.statistics 339 for stat_slug, stat_conf in metric.statistics.items() 340 if stat_slug == "rolling_average" 341 for window_size in stat_conf.get("window_sizes", []) 342 } 343 ) 344 view_defn["parameters"] = self._get_parameters( 345 view_defn["dimensions"], rolling_average_window_sizes 346 ) 347 view_defn["filters"] = self._get_filters() 348 349 return {"views": [view_defn]} 350 351 def get_dimensions( 352 self, 353 _table=None, 354 _v1_name: Optional[str] = None, 355 _dryrun=None, 356 ) -> List[Dict[str, Any]]: 357 """Get the set of dimensions for this view based on the metric definitions in metric-hub.""" 358 namespace_definitions = MetricsConfigLoader.configs.get_platform_definitions( 359 self.namespace 360 ) 361 metric_definitions = namespace_definitions.metrics.definitions 362 data_source_name = re.sub("^metric_definitions_", "", self.name) 363 364 return [ 365 { 366 "name": "client_id", 367 "type": "string", 368 "sql": "SAFE_CAST(${TABLE}.client_id AS STRING)", 
369 "label": "Client ID", 370 "primary_key": "yes", 371 "group_label": "Base Fields", 372 "description": "Unique client identifier", 373 }, 374 ] + [ # add a dimension for each metric definition 375 { 376 "name": metric_slug, 377 "group_label": "Metrics", 378 "label": metric.friendly_name 379 or lookml_utils.slug_to_title(metric_slug), 380 "description": metric.description or "", 381 "type": "number", 382 "sql": "${TABLE}." + metric_slug, 383 } 384 for metric_slug, metric in metric_definitions.items() 385 if metric.select_expression 386 and metric.data_source.name == data_source_name 387 and metric.type != "histogram" 388 ] 389 390 def get_dimension_groups(self) -> List[Dict[str, Any]]: 391 """Get dimension groups for this view.""" 392 return [ 393 { 394 "name": "submission", 395 "type": "time", 396 "datatype": "date", 397 "group_label": "Base Fields", 398 "sql": "${TABLE}.analysis_basis", 399 "label": "Submission", 400 "timeframes": [ 401 "raw", 402 "date", 403 "week", 404 "month", 405 "quarter", 406 "year", 407 ], 408 } 409 ] 410 411 def _get_sets(self) -> List[Dict[str, Any]]: 412 """Generate metric sets.""" 413 # group all the metric dimensions into a set 414 dimensions = self.get_dimensions() 415 measures = self.get_measures(dimensions) 416 417 return [ 418 { 419 "name": "metrics", 420 "fields": [ 421 dimension["name"] 422 for dimension in dimensions 423 if dimension["name"] != "client_id" 424 ] 425 + [measure["name"] for measure in measures], 426 } 427 ] 428 429 def _get_parameters( 430 self, dimensions: List[dict], rolling_average_window_sizes: List[int] = [] 431 ): 432 hide_sampling = "yes" 433 434 for dim in dimensions: 435 if dim["name"] == "sample_id": 436 hide_sampling = "no" 437 break 438 439 return [ 440 { 441 "name": "aggregate_metrics_by", 442 "label": "Aggregate Client Metrics Per", 443 "type": "unquoted", 444 "default_value": "day", 445 "allowed_values": [ 446 {"label": "Per Day", "value": "day"}, 447 {"label": "Per Week", "value": "week"}, 448 
{"label": "Per Month", "value": "month"}, 449 {"label": "Per Quarter", "value": "quarter"}, 450 {"label": "Per Year", "value": "year"}, 451 {"label": "Overall", "value": "overall"}, 452 ], 453 }, 454 { 455 "name": "sampling", 456 "label": "Sample of source data in %", 457 "type": "unquoted", 458 "default_value": "100", 459 "hidden": hide_sampling, 460 }, 461 { 462 "name": "lookback_days", 463 "label": "Lookback (Days)", 464 "type": "unquoted", 465 "description": "Number of days added before the filtered date range. " 466 + "Useful for period-over-period comparisons.", 467 "default_value": "0", 468 }, 469 { 470 "name": "date_groupby_position", 471 "label": "Date Group By Position", 472 "type": "unquoted", 473 "description": "Position of the date field in the group by clause. " 474 + "Required when submission_week, submission_month, submission_quarter, submission_year " 475 + "is selected as BigQuery can't correctly resolve the GROUP BY otherwise", 476 "default_value": "", 477 }, 478 ] + ( 479 [ 480 { 481 "name": "rolling_average_window_size", 482 "label": "Rolling Average Custom Window Size (days)", 483 "type": "unquoted", 484 "description": "Number of days for the custom rolling average window.", 485 "default_value": str(rolling_average_window_sizes[0]), 486 "allowed_values": [ 487 {"label": f"{w} days", "value": str(w)} 488 for w in rolling_average_window_sizes 489 ], 490 } 491 ] 492 if rolling_average_window_sizes 493 else [] 494 ) 495 496 def _get_filters(self): 497 return [ 498 { 499 "name": "analysis_period", 500 "type": "date", 501 "label": "Analysis Period (with Lookback)", 502 "description": "Use this filter to define the main analysis period. 
" 503 + "The results will include the selected date range plus any additional " 504 + "days specified by the 'Lookback days' setting.", 505 } 506 ] 507 508 def get_measures( 509 self, dimensions: List[dict] 510 ) -> List[Dict[str, Union[str, List[Dict[str, str]]]]]: 511 """Get statistics as measures.""" 512 measures = [] 513 sampling = "1" 514 515 for dim in dimensions: 516 if dim["name"] == "sample_id": 517 sampling = "100 / {% parameter sampling %}" 518 break 519 520 for dimension in dimensions: 521 metric = MetricsConfigLoader.configs.get_metric_definition( 522 dimension["name"], self.namespace 523 ) 524 if metric and metric.statistics: 525 # Sort statistics so that rolling_average is processed last, 526 # since it depends on measures created by other statistics 527 # (e.g. sum, ratio) to already exist in the measures list. 528 sorted_statistics = sorted( 529 metric.statistics.items(), 530 key=lambda item: item[0] == "rolling_average", 531 ) 532 for statistic_slug, statistic_conf in sorted_statistics: 533 dimension_label = dimension.get("label") or dimension.get("name") 534 if statistic_slug in [ 535 "average", 536 "max", 537 "min", 538 "median", 539 ]: 540 measures.append( 541 { 542 "name": f"{dimension['name']}_{statistic_slug}", 543 "type": statistic_slug, 544 "sql": "${TABLE}." + dimension["name"], 545 "label": f"{dimension_label} {statistic_slug.title()}", 546 "group_label": "Statistics", 547 "description": f"{statistic_slug.title()} of {dimension_label}", 548 } 549 ) 550 elif statistic_slug == "sum": 551 measures.append( 552 { 553 "name": f"{dimension['name']}_{statistic_slug}", 554 "type": "sum", 555 "sql": "${TABLE}." 
+ dimension["name"] + "*" + sampling, 556 "label": f"{dimension_label} Sum", 557 "group_label": "Statistics", 558 "description": f"Sum of {dimension_label}", 559 } 560 ) 561 elif statistic_slug == "client_count": 562 measures.append( 563 { 564 "name": ( 565 f"{dimension['name']}_{statistic_slug}_sampled" 566 if sampling 567 else f"{dimension['name']}_{statistic_slug}" 568 ), 569 "type": "count_distinct", 570 "label": f"{dimension_label} Client Count", 571 "group_label": "Statistics", 572 "sql": "IF(${TABLE}." 573 + f"{dimension['name']} > 0, " 574 + "${TABLE}.client_id, SAFE_CAST(NULL AS STRING))", 575 "description": f"Number of clients with {dimension_label}", 576 "hidden": "yes" if sampling else "no", 577 } 578 ) 579 580 if sampling: 581 measures.append( 582 { 583 "name": f"{dimension['name']}_{statistic_slug}", 584 "type": "number", 585 "label": f"{dimension_label} Client Count", 586 "group_label": "Statistics", 587 "sql": "${" 588 + f"{dimension['name']}_{statistic_slug}_sampled" 589 + "} *" 590 + sampling, 591 "description": f"Number of clients with {dimension_label}", 592 } 593 ) 594 elif statistic_slug == "dau_proportion": 595 if "numerator" in statistic_conf: 596 [numerator, numerator_stat] = statistic_conf[ 597 "numerator" 598 ].split(".") 599 measures.append( 600 { 601 "name": "DAU_sampled" if sampling else "DAU", 602 "type": "count_distinct", 603 "label": "DAU", 604 "group_label": "Statistics", 605 "sql": "${TABLE}.client_id", 606 "hidden": "yes", 607 } 608 ) 609 610 if sampling: 611 measures.append( 612 { 613 "name": "DAU", 614 "type": "number", 615 "label": "DAU", 616 "group_label": "Statistics", 617 "sql": "${DAU_sampled} *" + sampling, 618 "hidden": "yes", 619 } 620 ) 621 622 measures.append( 623 { 624 "name": f"{dimension['name']}_{statistic_slug}", 625 "type": "number", 626 "label": f"{dimension_label} DAU Proportion", 627 "sql": "SAFE_DIVIDE(${" 628 + f"{numerator}_{numerator_stat}" 629 + "}, ${DAU})", 630 "group_label": "Statistics", 631 
"description": f"Proportion of daily active users with {dimension['name']}", 632 } 633 ) 634 elif statistic_slug == "ratio": 635 if ( 636 "numerator" in statistic_conf 637 and "denominator" in statistic_conf 638 ): 639 [numerator, numerator_stat] = statistic_conf[ 640 "numerator" 641 ].split(".") 642 [denominator, denominator_stat] = statistic_conf[ 643 "denominator" 644 ].split(".") 645 646 measures.append( 647 { 648 "name": f"{dimension['name']}_{statistic_slug}", 649 "type": "number", 650 "label": f"{dimension_label} Ratio", 651 "sql": "SAFE_DIVIDE(${" 652 + f"{numerator}_{numerator_stat}" 653 + "}, ${" 654 + f"{denominator}_{denominator_stat}" 655 + "})", 656 "group_label": "Statistics", 657 "description": f"""" 658 Ratio between {statistic_conf['numerator']} and 659 {statistic_conf['denominator']}""", 660 } 661 ) 662 elif statistic_slug == "rolling_average": 663 # rolling averages are computed over existing statistics (e.g. sum, ratio) 664 aggregations = statistic_conf.get("aggregations", ["sum"]) 665 666 # Build a dynamic PARTITION BY clause for the window function. 667 # For each base field dimension (non-metric, non-client_id), emit a 668 # liquid conditional so the field is only included in the partition 669 # when it's actually in the query. The trailing constant `1` ensures 670 # the PARTITION BY clause is never empty (when no grouping dimension 671 # is selected `PARTITION BY 1` is equivalent to a global window). 
672 partition_by_conditions = "".join( 673 f"{{% if {self.name}.{dim['name']}._is_selected %}}{dim['sql']}," 674 f"{{% endif %}}" 675 for dim in dimensions 676 if dim.get("group_label") == "Base Fields" 677 and dim["name"] != "client_id" 678 ) 679 partition_by_clause = ( 680 f"PARTITION BY {partition_by_conditions} 1" 681 ) 682 683 for aggregation in aggregations: 684 # find measures that match the current dimension and aggregation type 685 matching_measures = [ 686 m 687 for m in measures 688 if m["name"].startswith( 689 f"{dimension['name']}_{aggregation}" 690 ) 691 ] 692 if "window_sizes" in statistic_conf: 693 for window_size in statistic_conf["window_sizes"]: 694 for matching_measure in matching_measures: 695 # these statistics require some time dimension to be selected 696 measures.append( 697 { 698 "name": f"{dimension['name']}_{statistic_slug}_{aggregation}_{window_size}_day", 699 "type": "number", 700 "label": f"{matching_measure['label']} {window_size} Day Rolling Average", 701 "sql": f""" 702 {{% if {self.name}.submission_date._is_selected or 703 {self.name}.submission_week._is_selected or 704 {self.name}.submission_month._is_selected or 705 {self.name}.submission_quarter._is_selected or 706 {self.name}.submission_year._is_selected %}} 707 AVG(${{{matching_measure['name']}}}) OVER ( 708 {partition_by_clause} 709 {{% if date_groupby_position._parameter_value != "" %}} 710 ORDER BY {{% parameter date_groupby_position %}} 711 {{% elsif {self.name}.submission_date._is_selected %}} 712 ORDER BY ${{TABLE}}.analysis_basis 713 {{% else %}} 714 ERROR("date_groupby_position needs to be set when using submission_week, 715 submission_month, submission_quarter, or submission_year") 716 {{% endif %}} 717 ROWS BETWEEN {window_size - 1} PRECEDING AND CURRENT ROW 718 ) 719 {{% else %}} 720 ERROR('Please select a "submission_*" field to compute the rolling average') 721 {{% endif %}} 722 """, 723 "group_label": "Statistics", 724 "description": f"{window_size} day rolling 
average of {dimension_label}", 725 } 726 ) 727 728 # Parametric-window measure: uses the rolling_average_window_size 729 # parameter so the user can set any window size at query time. 730 # The parameter value must be n-1 (preceding rows count) because 731 # SQL does not allow arithmetic in ROWS BETWEEN expressions. 732 for matching_measure in matching_measures: 733 measures.append( 734 { 735 "name": f"{dimension['name']}_{statistic_slug}_{aggregation}_custom_window", 736 "type": "number", 737 "label": f"{matching_measure['label']} Custom Window Rolling Average", 738 "sql": f""" 739 {{% if {self.name}.submission_date._is_selected or 740 {self.name}.submission_week._is_selected or 741 {self.name}.submission_month._is_selected or 742 {self.name}.submission_quarter._is_selected or 743 {self.name}.submission_year._is_selected %}} 744 AVG(${{{matching_measure['name']}}}) OVER ( 745 {partition_by_clause} 746 {{% if date_groupby_position._parameter_value != "" %}} 747 ORDER BY {{% parameter date_groupby_position %}} 748 {{% elsif {self.name}.submission_date._is_selected %}} 749 ORDER BY ${{TABLE}}.analysis_basis 750 {{% else %}} 751 ERROR("date_groupby_position needs to be set when using submission_week, 752 submission_month, submission_quarter, or submission_year") 753 {{% endif %}} 754 ROWS BETWEEN 755 {{{{ rolling_average_window_size._parameter_value | minus: 1 }}}} 756 PRECEDING AND CURRENT ROW 757 ) 758 {{% else %}} 759 ERROR('Please select a "submission_*" field to compute the rolling average') 760 {{% endif %}} 761 """, 762 "group_label": "Statistics", 763 "description": f"Rolling average of {dimension_label} using a window size " 764 + "controlled by the 'Rolling Average Custom Window Size' parameter.", 765 } 766 ) 767 768 # period-over-period measures compare current values with historical values 769 if "period_over_period" in statistic_conf: 770 # find all statistics that have period-over-period configured 771 matching_measures = [ 772 m 773 for m in measures 
774 if m["name"].startswith( 775 f"{dimension['name']}_{statistic_slug}" 776 ) 777 and "_period_over_period_" not in m["name"] 778 ] 779 780 # create period-over-period measures for each configured time period 781 for period in statistic_conf["period_over_period"].get( 782 "periods", [] 783 ): 784 for matching_measure in matching_measures: 785 original_sql = matching_measure["sql"] 786 787 # rolling averages need special handling to adjust window sizes 788 # based on the selected time granularity 789 if ( 790 statistic_slug == "rolling_average" 791 and "_custom_window" in matching_measure["name"] 792 ): 793 sql = self._create_custom_window_period_sql( 794 original_sql, period 795 ) 796 elif statistic_slug == "rolling_average": 797 sql = self._create_rolling_average_period_sql( 798 original_sql, period 799 ) 800 else: 801 # standard measures use LAG function with time-adjusted periods 802 sql = self._create_lag_period_sql( 803 matching_measure, period 804 ) 805 806 # generate different types of period-over-period comparisons 807 for kind in statistic_conf["period_over_period"].get( 808 "kinds", ["previous"] 809 ): 810 if kind == "difference": 811 comparison_sql = f"({original_sql}) - ({sql})" 812 elif kind == "relative_change": 813 comparison_sql = f"SAFE_DIVIDE(({original_sql}), NULLIF(({sql}), 0)) - 1" 814 else: 815 comparison_sql = sql 816 817 measures.append( 818 { 819 "name": f"{matching_measure['name']}_{period}_day_period_over_period_{kind}", 820 "type": "number", 821 "label": f"{matching_measure['label']} " 822 + f"{period} Day Period Over Period {kind.capitalize()}", 823 "description": f"Period over period {kind.capitalize()} of " 824 + f"{matching_measure['label']} over {period} days", 825 "group_label": "Statistics", 826 "sql": comparison_sql, 827 } 828 ) 829 830 return measures 831 832 def _create_rolling_average_period_sql(self, original_sql: str, period: int) -> str: 833 """ 834 Create period-over-period SQL for rolling average measures. 
835 836 Rolling averages require adjusting the window size based on the selected time granularity. 837 """ 838 rows_match = re.search( 839 r"ROWS BETWEEN (\d+) PRECEDING AND CURRENT ROW", 840 original_sql, 841 ) 842 843 if not rows_match: 844 return original_sql 845 846 original_window_size = int(rows_match.group(1)) 847 time_conditions = [] 848 849 for unit, divisor in self.TIME_UNITS: 850 # calculate adjusted window size for this time granularity 851 adjusted_window = ( 852 (original_window_size + period) // divisor 853 if unit != "date" 854 else original_window_size + period 855 ) 856 857 condition = ( 858 f"{{% {'if' if unit == 'date' else 'elsif'} " 859 + f"{self.name}.submission_{unit}._is_selected %}}" 860 ) 861 862 # modify the ROWS clause to extend the window by the period 863 modified_sql = re.sub( 864 r"ROWS BETWEEN \d+ PRECEDING AND CURRENT ROW", 865 f"ROWS BETWEEN {adjusted_window} PRECEDING AND " 866 + f"{adjusted_window - original_window_size} PRECEDING", 867 original_sql, 868 ) 869 time_conditions.append(f"{condition}\n{modified_sql}") 870 871 return ( 872 "\n".join(time_conditions) 873 + f"\n{{% else %}}\n{original_sql}\n{{% endif %}}" 874 ) 875 876 def _create_custom_window_period_sql(self, original_sql: str, period: int) -> str: 877 """ 878 Create period-over-period SQL for custom rolling average measures. 879 880 Because the window size is a runtime Looker parameter, Python arithmetic 881 cannot be used. Instead, Liquid {% assign %} tags compute the shifted 882 window boundaries at query time. 
883 884 For date granularity (divisor=1): 885 preceding = window_size - 1 886 start = preceding + period 887 end = period 888 → ROWS BETWEEN {{ preceding | plus: period }} PRECEDING AND period PRECEDING 889 890 For coarser granularities (divisor D): 891 adjusted = (preceding + period) / D (integer division) 892 adjusted_end = adjusted - preceding 893 → ROWS BETWEEN {{ adjusted }} PRECEDING AND {{ adjusted_end }} PRECEDING 894 """ 895 rows_pattern = r"ROWS BETWEEN\s*\{\{[^}]+\}\}\s*PRECEDING AND CURRENT ROW" 896 if not re.search(rows_pattern, original_sql, re.DOTALL): 897 return original_sql 898 899 time_conditions = [] 900 for unit, divisor in self.TIME_UNITS: 901 condition = ( 902 f"{{% {'if' if unit == 'date' else 'elsif'} " 903 + f"{self.name}.submission_{unit}._is_selected %}}" 904 ) 905 906 if unit == "date": 907 rows_replacement = ( 908 f"{{% assign preceding = rolling_average_window_size._parameter_value | minus: 1 %}}\n" 909 f"ROWS BETWEEN {{{{ preceding | plus: {period} }}}} PRECEDING AND {period} PRECEDING" 910 ) 911 else: 912 rows_replacement = ( 913 f"{{% assign preceding = rolling_average_window_size._parameter_value | minus: 1 %}}\n" 914 f"{{% assign adjusted = preceding | plus: {period} | divided_by: {divisor} %}}\n" 915 f"{{% assign adjusted_end = adjusted | minus: preceding %}}\n" 916 f"ROWS BETWEEN {{{{ adjusted }}}} PRECEDING AND {{{{ adjusted_end }}}} PRECEDING" 917 ) 918 919 modified_sql = re.sub( 920 rows_pattern, rows_replacement, original_sql, flags=re.DOTALL 921 ) 922 time_conditions.append(f"{condition}\n{modified_sql}") 923 924 return ( 925 "\n".join(time_conditions) 926 + f"\n{{% else %}}\n{original_sql}\n{{% endif %}}" 927 ) 928 929 def _create_lag_period_sql(self, matching_measure: dict, period: int) -> str: 930 """ 931 Create period-over-period SQL using LAG function for standard measures. 932 933 LAG function looks back N periods to get historical values. 
The period is adjusted 934 based on the selected time granularity (daily, weekly, monthly, etc.). 935 """ 936 time_conditions = [] 937 938 for unit, divisor in self.TIME_UNITS: 939 # calculate adjusted period for this time granularity 940 adjusted_period = period // divisor if unit != "date" else period 941 942 order_by = ( 943 f"${{submission_{unit}}}" if unit != "date" else "${submission_date}" 944 ) 945 946 condition = ( 947 f"{{% {'if' if unit == 'date' else 'elsif'} " 948 + f"{self.name}.submission_{unit}._is_selected %}}" 949 ) 950 951 lag_sql = f"""LAG(${{{matching_measure['name']}}}, {adjusted_period}) OVER ( 952 {{% if date_groupby_position._parameter_value != "" %}} 953 ORDER BY {{% parameter date_groupby_position %}} 954 {{% else %}} 955 ORDER BY {order_by} 956 {{% endif %}} 957 )""" 958 time_conditions.append(f"{condition}\n{lag_sql}") 959 960 return ( 961 "\n".join(time_conditions) 962 + f"\n{{% else %}}\nLAG({matching_measure['name']}, {period}) " 963 + "OVER (ORDER BY ${submission_date})\n{% endif %}" 964 )
class MetricDefinitionsView(View):
    """A view for metric-hub metrics that come from the same data source."""

    type: str = "metric_definitions_view"

    # Time unit divisors for converting days to different granularities.
    # Used by the period-over-period helpers to translate a day-based period
    # into an equivalent number of rows at coarser granularities.
    TIME_UNITS = [
        ("date", 1),  # Daily: no conversion needed
        ("week", 7),  # Weekly: divide by 7
        ("month", 30),  # Monthly: approximate 30 days per month
        ("quarter", 90),  # Quarterly: approximate 90 days per quarter
        ("year", 365),  # Yearly: approximate 365 days per year
    ]

    def __init__(self, namespace: str, name: str, tables: List[Dict[str, str]]):
        """Get an instance of a MetricDefinitionsView."""
        super().__init__(namespace, name, MetricDefinitionsView.type, tables)

    @classmethod
    def from_db_views(
        klass,
        namespace: str,
        is_glean: bool,
        channels: List[Dict[str, str]],
        db_views: dict,
    ) -> Iterator[MetricDefinitionsView]:
        """Get Metric Definition Views from db views and app variants.

        Metric definition views are not derived from db views, so this
        intentionally yields nothing.
        """
        return iter(())

    @classmethod
    def from_dict(
        klass, namespace: str, name: str, definition: ViewDict
    ) -> MetricDefinitionsView:
        """Get a MetricDefinitionsView from a dict representation."""
        return klass(namespace, name, definition.get("tables", []))

    def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]:
        """Get this view as LookML.

        Returns a dict with a single "views" entry, or an empty dict when the
        namespace or data source has no usable metric definitions.
        """
        namespace_definitions = MetricsConfigLoader.configs.get_platform_definitions(
            self.namespace
        )
        if namespace_definitions is None:
            return {}

        # get all metric definitions that depend on the data source represented by this view
        data_source_name = re.sub("^metric_definitions_", "", self.name)
        data_source_definition = MetricsConfigLoader.configs.get_data_source_definition(
            data_source_name, self.namespace
        )

        if data_source_definition is None:
            return {}

        # todo: hide deprecated metrics?
        # Render each metric's select expression through the metric-hub Jinja
        # environment into a `<expr> AS <slug>,` fragment for the derived table.
        metric_definitions = [
            f"""{
                MetricsConfigLoader.configs.get_env().from_string(metric.select_expression).render()
            } AS {metric_slug},\n"""
            for metric_slug, metric in namespace_definitions.metrics.definitions.items()
            if metric.select_expression
            and metric.data_source.name == data_source_name
            and metric.type != "histogram"
        ]

        if metric_definitions == []:
            return {}

        # Metric definitions are intended to be aggregated by client per date.
        # A derived table is needed to do these aggregations, instead of defining them as measures
        # we want to have them available as dimensions (which don't allow aggregations in their definitions)
        # to allow for custom measures to be later defined in Looker that aggregate these per client metrics.
        view_defn: Dict[str, Any] = {"name": self.name}

        # Fields that must never be emitted as "Base Fields": core identifiers
        # plus every metric slug (metrics get their own dimensions).
        ignore_base_fields = [
            "client_id",
            "submission_date",
            "submission",
            "first_run",
        ] + [
            metric_slug
            for metric_slug, metric in namespace_definitions.metrics.definitions.items()
            if metric.select_expression
            and metric.data_source.name == data_source_name
            and metric.type != "histogram"
        ]

        base_view_dimensions = {}
        joined_data_sources = []

        # check if the metric data source has joins
        # joined data sources are generally used for creating the "Base Fields"
        if data_source_definition.joins:
            # determine the dimensions selected by the joined data sources
            for joined_data_source_slug, join in data_source_definition.joins.items():
                joined_data_source = (
                    MetricsConfigLoader.configs.get_data_source_definition(
                        joined_data_source_slug, self.namespace
                    )
                )

                if joined_data_source.columns_as_dimensions:
                    joined_data_sources.append(joined_data_source)

                    # NOTE(review): the outer != "NULL" check and the inner
                    # == "NULL" branch are redundant with each other; kept as-is.
                    date_filter = None
                    if joined_data_source.submission_date_column != "NULL":
                        date_filter = (
                            None
                            if joined_data_source.submission_date_column is None
                            or joined_data_source.submission_date_column == "NULL"
                            else f"{joined_data_source.submission_date_column} = '2023-01-01'"
                        )

                    # create Looker dimensions by doing a dryrun
                    query = MetricsConfigLoader.configs.get_data_source_sql(
                        joined_data_source_slug,
                        self.namespace,
                        where=date_filter,
                    ).format(dataset=self.namespace)

                    base_view_dimensions[joined_data_source_slug] = (
                        lookml_utils._generate_dimensions_from_query(
                            query, dryrun=dryrun
                        )
                    )

        if (
            data_source_definition.client_id_column == "NULL"
            or data_source_definition.columns_as_dimensions
        ):
            # if the metrics data source doesn't have any joins then use the dimensions
            # of the data source itself as base fields
            date_filter = None
            if data_source_definition.submission_date_column != "NULL":
                date_filter = (
                    "submission_date = '2023-01-01'"
                    if data_source_definition.submission_date_column is None
                    else f"{data_source_definition.submission_date_column} = '2023-01-01'"
                )

            query = MetricsConfigLoader.configs.get_data_source_sql(
                data_source_definition.name,
                self.namespace,
                where=date_filter,
                ignore_joins=True,
            ).format(dataset=self.namespace)

            base_view_dimensions[data_source_definition.name] = (
                lookml_utils._generate_dimensions_from_query(query, dryrun)
            )

        # to prevent duplicate dimensions, especially when working with time dimensions
        # where names are modified potentially causing naming collisions
        seen_dimensions = set()
        # prepare base field data for query
        base_view_fields = []
        for data_source, dimensions in base_view_dimensions.items():
            for dimension in dimensions:
                if (
                    dimension["name"] not in ignore_base_fields
                    and dimension["name"] not in seen_dimensions
                    and "hidden" not in dimension
                ):
                    # `__` in dryrun-derived names encodes nesting; restore `.`
                    # for the SQL reference, and alias with the data source prefix.
                    sql = (
                        f"{data_source}.{dimension['name'].replace('__', '.')} AS"
                        + f" {data_source}_{dimension['name']},\n"
                    )
                    # date/time/timestamp suffixes are removed when generating lookml dimensions, however we
                    # need the original field name for the derived view SQL
                    if dimension["type"] == "time" and not dimension["sql"].endswith(
                        dimension["name"]
                    ):
                        suffix = dimension["sql"].split(
                            dimension["name"].replace("__", ".")
                        )[-1]
                        sql = (
                            f"{data_source}.{(dimension['name']+suffix).replace('__', '.')} AS"
                            + f" {data_source}_{dimension['name']},\n"
                        )

                    base_view_fields.append(
                        {
                            "name": f"{data_source}_{dimension['name']}",
                            "select_sql": f"{data_source}_{dimension['name']},\n",
                            "sql": sql,
                        }
                    )
                    seen_dimensions.add(dimension["name"])

        client_id_field = (
            "NULL"
            if data_source_definition.client_id_column == "NULL"
            else f'{data_source_definition.client_id_column or "client_id"}'
        )

        # filters for date ranges
        # One BETWEEN clause per data source that has a submission date column;
        # the analysis_period filter takes precedence over submission_date, and
        # lookback_days extends the range backwards for period-over-period use.
        where_sql = " AND ".join(
            [
                f"""
                {data_source.name}.{data_source.submission_date_column or "submission_date"}
                {{% if analysis_period._is_filtered %}}
                BETWEEN
                DATE_SUB(
                    COALESCE(
                        SAFE_CAST(
                            {{% date_start analysis_period %}} AS DATE
                        ), CURRENT_DATE()),
                    INTERVAL {{% parameter lookback_days %}} DAY
                ) AND
                COALESCE(
                    SAFE_CAST(
                        {{% date_end analysis_period %}} AS DATE
                    ), CURRENT_DATE())
                {{% else %}}
                BETWEEN
                DATE_SUB(
                    COALESCE(
                        SAFE_CAST(
                            {{% date_start submission_date %}} AS DATE
                        ), CURRENT_DATE()),
                    INTERVAL {{% parameter lookback_days %}} DAY
                ) AND
                COALESCE(
                    SAFE_CAST(
                        {{% date_end submission_date %}} AS DATE
                    ), CURRENT_DATE())
                {{% endif %}}
                """
                for data_source in [data_source_definition] + joined_data_sources
                if data_source.submission_date_column != "NULL"
            ]
        )

        # filter on sample_id if such a field exists
        for field in base_view_fields:
            if field["name"].endswith("_sample_id"):
                where_sql += f"""
                AND
                    {field['name'].split('_sample_id')[0]}.sample_id < {{% parameter sampling %}}
                """
                break

        view_defn["derived_table"] = {
            "sql": f"""
            SELECT
                {"".join(metric_definitions)}
                {"".join([field['select_sql'] for field in base_view_fields])}
                {client_id_field} AS client_id,
                {{% if aggregate_metrics_by._parameter_value == 'day' %}}
                {data_source_definition.submission_date_column or "submission_date"} AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'week' %}}
                (FORMAT_DATE(
                    '%F',
                    DATE_TRUNC({data_source_definition.submission_date_column or "submission_date"},
                    WEEK(MONDAY)))
                ) AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'month' %}}
                (FORMAT_DATE(
                    '%Y-%m',
                    {data_source_definition.submission_date_column or "submission_date"})
                ) AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'quarter' %}}
                (FORMAT_DATE(
                    '%Y-%m',
                    DATE_TRUNC({data_source_definition.submission_date_column or "submission_date"},
                    QUARTER))
                ) AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'year' %}}
                (EXTRACT(
                    YEAR FROM {data_source_definition.submission_date_column or "submission_date"})
                ) AS analysis_basis
                {{% else %}}
                NULL as analysis_basis
                {{% endif %}}
            FROM
                (
                    SELECT
                        {data_source_name}.*,
                        {"".join([field['sql'] for field in base_view_fields])}
                    FROM
                    {
                        MetricsConfigLoader.configs.get_data_source_sql(
                            data_source_name,
                            self.namespace,
                            select_fields=False
                        ).format(dataset=self.namespace)
                    }
                    WHERE {where_sql}
                )
            GROUP BY
                {"".join([field['select_sql'] for field in base_view_fields])}
                client_id,
                analysis_basis
            """
        }

        view_defn["dimensions"] = self.get_dimensions()
        view_defn["dimension_groups"] = self.get_dimension_groups()

        # add the Looker dimensions
        for data_source, dimensions in base_view_dimensions.items():
            for dimension in dimensions:
                if dimension["name"] not in ignore_base_fields:
                    dimension["sql"] = (
                        "${TABLE}." + f"{data_source}_{dimension['name']}"
                    )
                    dimension["group_label"] = "Base Fields"
                    if not lookml_utils._is_dimension_group(dimension):
                        view_defn["dimensions"].append(dimension)
                    else:
                        view_defn["dimension_groups"].append(dimension)
                    # avoid duplicate dimensions
                    ignore_base_fields.append(dimension["name"])

        view_defn["measures"] = self.get_measures(
            view_defn["dimensions"],
        )
        view_defn["sets"] = self._get_sets()
        # Collect the distinct rolling-average window sizes configured on any
        # metric of this data source; they feed the custom-window parameter.
        rolling_average_window_sizes = sorted(
            {
                window_size
                for metric in namespace_definitions.metrics.definitions.values()
                if metric.select_expression
                and metric.data_source.name == data_source_name
                and metric.type != "histogram"
                and metric.statistics
                for stat_slug, stat_conf in metric.statistics.items()
                if stat_slug == "rolling_average"
                for window_size in stat_conf.get("window_sizes", [])
            }
        )
        view_defn["parameters"] = self._get_parameters(
            view_defn["dimensions"], rolling_average_window_sizes
        )
        view_defn["filters"] = self._get_filters()

        return {"views": [view_defn]}

    def get_dimensions(
        self,
        _table=None,
        _v1_name: Optional[str] = None,
        _dryrun=None,
    ) -> List[Dict[str, Any]]:
        """Get the set of dimensions for this view based on the metric definitions in metric-hub."""
        namespace_definitions = MetricsConfigLoader.configs.get_platform_definitions(
            self.namespace
        )
        metric_definitions = namespace_definitions.metrics.definitions
        data_source_name = re.sub("^metric_definitions_", "", self.name)

        return [
            {
                "name": "client_id",
                "type": "string",
                "sql": "SAFE_CAST(${TABLE}.client_id AS STRING)",
                "label": "Client ID",
                "primary_key": "yes",
                "group_label": "Base Fields",
                "description": "Unique client identifier",
            },
        ] + [  # add a dimension for each metric definition
            {
                "name": metric_slug,
                "group_label": "Metrics",
                "label": metric.friendly_name
                or lookml_utils.slug_to_title(metric_slug),
                "description": metric.description or "",
                "type": "number",
                "sql": "${TABLE}." + metric_slug,
            }
            for metric_slug, metric in metric_definitions.items()
            if metric.select_expression
            and metric.data_source.name == data_source_name
            and metric.type != "histogram"
        ]

    def get_dimension_groups(self) -> List[Dict[str, Any]]:
        """Get dimension groups for this view.

        A single "submission" time dimension group over the derived table's
        analysis_basis column.
        """
        return [
            {
                "name": "submission",
                "type": "time",
                "datatype": "date",
                "group_label": "Base Fields",
                "sql": "${TABLE}.analysis_basis",
                "label": "Submission",
                "timeframes": [
                    "raw",
                    "date",
                    "week",
                    "month",
                    "quarter",
                    "year",
                ],
            }
        ]

    def _get_sets(self) -> List[Dict[str, Any]]:
        """Generate metric sets."""
        # group all the metric dimensions into a set
        dimensions = self.get_dimensions()
        measures = self.get_measures(dimensions)

        return [
            {
                "name": "metrics",
                "fields": [
                    dimension["name"]
                    for dimension in dimensions
                    if dimension["name"] != "client_id"
                ]
                + [measure["name"] for measure in measures],
            }
        ]

    def _get_parameters(
        self, dimensions: List[dict], rolling_average_window_sizes: List[int] = []
    ):
        """Get the LookML parameter definitions for this view.

        The sampling parameter is hidden unless a sample_id dimension exists;
        the rolling-average window-size parameter is only emitted when any
        metric configures rolling-average window sizes.
        """
        hide_sampling = "yes"

        for dim in dimensions:
            if dim["name"] == "sample_id":
                hide_sampling = "no"
                break

        return [
            {
                "name": "aggregate_metrics_by",
                "label": "Aggregate Client Metrics Per",
                "type": "unquoted",
                "default_value": "day",
                "allowed_values": [
                    {"label": "Per Day", "value": "day"},
                    {"label": "Per Week", "value": "week"},
                    {"label": "Per Month", "value": "month"},
                    {"label": "Per Quarter", "value": "quarter"},
                    {"label": "Per Year", "value": "year"},
                    {"label": "Overall", "value": "overall"},
                ],
            },
            {
                "name": "sampling",
                "label": "Sample of source data in %",
                "type": "unquoted",
                "default_value": "100",
                "hidden": hide_sampling,
            },
            {
                "name": "lookback_days",
                "label": "Lookback (Days)",
                "type": "unquoted",
                "description": "Number of days added before the filtered date range. "
                + "Useful for period-over-period comparisons.",
                "default_value": "0",
            },
            {
                "name": "date_groupby_position",
                "label": "Date Group By Position",
                "type": "unquoted",
                "description": "Position of the date field in the group by clause. "
                + "Required when submission_week, submission_month, submission_quarter, submission_year "
                + "is selected as BigQuery can't correctly resolve the GROUP BY otherwise",
                "default_value": "",
            },
        ] + (
            [
                {
                    "name": "rolling_average_window_size",
                    "label": "Rolling Average Custom Window Size (days)",
                    "type": "unquoted",
                    "description": "Number of days for the custom rolling average window.",
                    "default_value": str(rolling_average_window_sizes[0]),
                    "allowed_values": [
                        {"label": f"{w} days", "value": str(w)}
                        for w in rolling_average_window_sizes
                    ],
                }
            ]
            if rolling_average_window_sizes
            else []
        )

    def _get_filters(self):
        """Get the LookML filter definitions for this view."""
        return [
            {
                "name": "analysis_period",
                "type": "date",
                "label": "Analysis Period (with Lookback)",
                "description": "Use this filter to define the main analysis period. "
                + "The results will include the selected date range plus any additional "
                + "days specified by the 'Lookback days' setting.",
            }
        ]

    def get_measures(
        self, dimensions: List[dict]
    ) -> List[Dict[str, Union[str, List[Dict[str, str]]]]]:
        """Get statistics as measures.

        Builds one or more measures per metric dimension from the metric's
        configured statistics; rolling averages and period-over-period measures
        are derived from measures created earlier in the same pass.
        """
        measures = []
        # NOTE(review): sampling is always a non-empty (truthy) string, so the
        # `if sampling` / `"yes" if sampling` branches below always take the
        # sampled path — presumably intentional, but worth confirming.
        sampling = "1"

        for dim in dimensions:
            if dim["name"] == "sample_id":
                sampling = "100 / {% parameter sampling %}"
                break

        for dimension in dimensions:
            metric = MetricsConfigLoader.configs.get_metric_definition(
                dimension["name"], self.namespace
            )
            if metric and metric.statistics:
                # Sort statistics so that rolling_average is processed last,
                # since it depends on measures created by other statistics
                # (e.g. sum, ratio) to already exist in the measures list.
                sorted_statistics = sorted(
                    metric.statistics.items(),
                    key=lambda item: item[0] == "rolling_average",
                )
                for statistic_slug, statistic_conf in sorted_statistics:
                    dimension_label = dimension.get("label") or dimension.get("name")
                    if statistic_slug in [
                        "average",
                        "max",
                        "min",
                        "median",
                    ]:
                        measures.append(
                            {
                                "name": f"{dimension['name']}_{statistic_slug}",
                                "type": statistic_slug,
                                "sql": "${TABLE}." + dimension["name"],
                                "label": f"{dimension_label} {statistic_slug.title()}",
                                "group_label": "Statistics",
                                "description": f"{statistic_slug.title()} of {dimension_label}",
                            }
                        )
                    elif statistic_slug == "sum":
                        # scale sums back up when sampling is applied
                        measures.append(
                            {
                                "name": f"{dimension['name']}_{statistic_slug}",
                                "type": "sum",
                                "sql": "${TABLE}." + dimension["name"] + "*" + sampling,
                                "label": f"{dimension_label} Sum",
                                "group_label": "Statistics",
                                "description": f"Sum of {dimension_label}",
                            }
                        )
                    elif statistic_slug == "client_count":
                        # hidden sampled count_distinct plus a visible measure
                        # that scales the sampled count back up
                        measures.append(
                            {
                                "name": (
                                    f"{dimension['name']}_{statistic_slug}_sampled"
                                    if sampling
                                    else f"{dimension['name']}_{statistic_slug}"
                                ),
                                "type": "count_distinct",
                                "label": f"{dimension_label} Client Count",
                                "group_label": "Statistics",
                                "sql": "IF(${TABLE}."
                                + f"{dimension['name']} > 0, "
                                + "${TABLE}.client_id, SAFE_CAST(NULL AS STRING))",
                                "description": f"Number of clients with {dimension_label}",
                                "hidden": "yes" if sampling else "no",
                            }
                        )

                        if sampling:
                            measures.append(
                                {
                                    "name": f"{dimension['name']}_{statistic_slug}",
                                    "type": "number",
                                    "label": f"{dimension_label} Client Count",
                                    "group_label": "Statistics",
                                    "sql": "${"
                                    + f"{dimension['name']}_{statistic_slug}_sampled"
                                    + "} *"
                                    + sampling,
                                    "description": f"Number of clients with {dimension_label}",
                                }
                            )
                    elif statistic_slug == "dau_proportion":
                        if "numerator" in statistic_conf:
                            [numerator, numerator_stat] = statistic_conf[
                                "numerator"
                            ].split(".")
                            measures.append(
                                {
                                    "name": "DAU_sampled" if sampling else "DAU",
                                    "type": "count_distinct",
                                    "label": "DAU",
                                    "group_label": "Statistics",
                                    "sql": "${TABLE}.client_id",
                                    "hidden": "yes",
                                }
                            )

                            if sampling:
                                measures.append(
                                    {
                                        "name": "DAU",
                                        "type": "number",
                                        "label": "DAU",
                                        "group_label": "Statistics",
                                        "sql": "${DAU_sampled} *" + sampling,
                                        "hidden": "yes",
                                    }
                                )

                            measures.append(
                                {
                                    "name": f"{dimension['name']}_{statistic_slug}",
                                    "type": "number",
                                    "label": f"{dimension_label} DAU Proportion",
                                    "sql": "SAFE_DIVIDE(${"
                                    + f"{numerator}_{numerator_stat}"
                                    + "}, ${DAU})",
                                    "group_label": "Statistics",
                                    "description": f"Proportion of daily active users with {dimension['name']}",
                                }
                            )
                    elif statistic_slug == "ratio":
                        if (
                            "numerator" in statistic_conf
                            and "denominator" in statistic_conf
                        ):
                            [numerator, numerator_stat] = statistic_conf[
                                "numerator"
                            ].split(".")
                            [denominator, denominator_stat] = statistic_conf[
                                "denominator"
                            ].split(".")

                            measures.append(
                                {
                                    "name": f"{dimension['name']}_{statistic_slug}",
                                    "type": "number",
                                    "label": f"{dimension_label} Ratio",
                                    "sql": "SAFE_DIVIDE(${"
                                    + f"{numerator}_{numerator_stat}"
                                    + "}, ${"
                                    + f"{denominator}_{denominator_stat}"
                                    + "})",
                                    "group_label": "Statistics",
                                    # NOTE(review): the f"""" below embeds a
                                    # stray leading double quote in the rendered
                                    # description — looks like a typo; this is
                                    # runtime text, confirm before changing.
                                    "description": f""""
                                    Ratio between {statistic_conf['numerator']} and
                                    {statistic_conf['denominator']}""",
                                }
                            )
                    elif statistic_slug == "rolling_average":
                        # rolling averages are computed over existing statistics (e.g. sum, ratio)
                        aggregations = statistic_conf.get("aggregations", ["sum"])

                        # Build a dynamic PARTITION BY clause for the window function.
                        # For each base field dimension (non-metric, non-client_id), emit a
                        # liquid conditional so the field is only included in the partition
                        # when it's actually in the query. The trailing constant `1` ensures
                        # the PARTITION BY clause is never empty (when no grouping dimension
                        # is selected `PARTITION BY 1` is equivalent to a global window).
                        partition_by_conditions = "".join(
                            f"{{% if {self.name}.{dim['name']}._is_selected %}}{dim['sql']},"
                            f"{{% endif %}}"
                            for dim in dimensions
                            if dim.get("group_label") == "Base Fields"
                            and dim["name"] != "client_id"
                        )
                        partition_by_clause = (
                            f"PARTITION BY {partition_by_conditions} 1"
                        )

                        for aggregation in aggregations:
                            # find measures that match the current dimension and aggregation type
                            matching_measures = [
                                m
                                for m in measures
                                if m["name"].startswith(
                                    f"{dimension['name']}_{aggregation}"
                                )
                            ]
                            if "window_sizes" in statistic_conf:
                                for window_size in statistic_conf["window_sizes"]:
                                    for matching_measure in matching_measures:
                                        # these statistics require some time dimension to be selected
                                        measures.append(
                                            {
                                                "name": f"{dimension['name']}_{statistic_slug}_{aggregation}_{window_size}_day",
                                                "type": "number",
                                                "label": f"{matching_measure['label']} {window_size} Day Rolling Average",
                                                "sql": f"""
                                                {{% if {self.name}.submission_date._is_selected or
                                                    {self.name}.submission_week._is_selected or
                                                    {self.name}.submission_month._is_selected or
                                                    {self.name}.submission_quarter._is_selected or
                                                    {self.name}.submission_year._is_selected %}}
                                                AVG(${{{matching_measure['name']}}}) OVER (
                                                    {partition_by_clause}
                                                    {{% if date_groupby_position._parameter_value != "" %}}
                                                    ORDER BY {{% parameter date_groupby_position %}}
                                                    {{% elsif {self.name}.submission_date._is_selected %}}
                                                    ORDER BY ${{TABLE}}.analysis_basis
                                                    {{% else %}}
                                                    ERROR("date_groupby_position needs to be set when using submission_week,
                                                    submission_month, submission_quarter, or submission_year")
                                                    {{% endif %}}
                                                    ROWS BETWEEN {window_size - 1} PRECEDING AND CURRENT ROW
                                                )
                                                {{% else %}}
                                                ERROR('Please select a "submission_*" field to compute the rolling average')
                                                {{% endif %}}
                                                """,
                                                "group_label": "Statistics",
                                                "description": f"{window_size} day rolling average of {dimension_label}",
                                            }
                                        )

                            # Parametric-window measure: uses the rolling_average_window_size
                            # parameter so the user can set any window size at query time.
                            # The parameter value must be n-1 (preceding rows count) because
                            # SQL does not allow arithmetic in ROWS BETWEEN expressions.
                            for matching_measure in matching_measures:
                                measures.append(
                                    {
                                        "name": f"{dimension['name']}_{statistic_slug}_{aggregation}_custom_window",
                                        "type": "number",
                                        "label": f"{matching_measure['label']} Custom Window Rolling Average",
                                        "sql": f"""
                                        {{% if {self.name}.submission_date._is_selected or
                                            {self.name}.submission_week._is_selected or
                                            {self.name}.submission_month._is_selected or
                                            {self.name}.submission_quarter._is_selected or
                                            {self.name}.submission_year._is_selected %}}
                                        AVG(${{{matching_measure['name']}}}) OVER (
                                            {partition_by_clause}
                                            {{% if date_groupby_position._parameter_value != "" %}}
                                            ORDER BY {{% parameter date_groupby_position %}}
                                            {{% elsif {self.name}.submission_date._is_selected %}}
                                            ORDER BY ${{TABLE}}.analysis_basis
                                            {{% else %}}
                                            ERROR("date_groupby_position needs to be set when using submission_week,
                                            submission_month, submission_quarter, or submission_year")
                                            {{% endif %}}
                                            ROWS BETWEEN
                                            {{{{ rolling_average_window_size._parameter_value | minus: 1 }}}}
                                            PRECEDING AND CURRENT ROW
                                        )
                                        {{% else %}}
                                        ERROR('Please select a "submission_*" field to compute the rolling average')
                                        {{% endif %}}
                                        """,
                                        "group_label": "Statistics",
                                        "description": f"Rolling average of {dimension_label} using a window size "
                                        + "controlled by the 'Rolling Average Custom Window Size' parameter.",
                                    }
                                )

                    # period-over-period measures compare current values with historical values
                    if "period_over_period" in statistic_conf:
                        # find all statistics that have period-over-period configured
                        matching_measures = [
                            m
                            for m in measures
                            if m["name"].startswith(
                                f"{dimension['name']}_{statistic_slug}"
                            )
                            and "_period_over_period_" not in m["name"]
                        ]

                        # create period-over-period measures for each configured time period
                        for period in statistic_conf["period_over_period"].get(
                            "periods", []
                        ):
                            for matching_measure in matching_measures:
                                original_sql = matching_measure["sql"]

                                # rolling averages need special handling to adjust window sizes
                                # based on the selected time granularity
                                if (
                                    statistic_slug == "rolling_average"
                                    and "_custom_window" in matching_measure["name"]
                                ):
                                    sql = self._create_custom_window_period_sql(
                                        original_sql, period
                                    )
                                elif statistic_slug == "rolling_average":
                                    sql = self._create_rolling_average_period_sql(
                                        original_sql, period
                                    )
                                else:
                                    # standard measures use LAG function with time-adjusted periods
                                    sql = self._create_lag_period_sql(
                                        matching_measure, period
                                    )

                                # generate different types of period-over-period comparisons
                                for kind in statistic_conf["period_over_period"].get(
                                    "kinds", ["previous"]
                                ):
                                    if kind == "difference":
                                        comparison_sql = f"({original_sql}) - ({sql})"
                                    elif kind == "relative_change":
                                        comparison_sql = f"SAFE_DIVIDE(({original_sql}), NULLIF(({sql}), 0)) - 1"
                                    else:
                                        comparison_sql = sql

                                    measures.append(
                                        {
                                            "name": f"{matching_measure['name']}_{period}_day_period_over_period_{kind}",
                                            "type": "number",
                                            "label": f"{matching_measure['label']} "
                                            + f"{period} Day Period Over Period {kind.capitalize()}",
                                            "description": f"Period over period {kind.capitalize()} of "
                                            + f"{matching_measure['label']} over {period} days",
                                            "group_label": "Statistics",
                                            "sql": comparison_sql,
                                        }
                                    )

        return measures

    def _create_rolling_average_period_sql(self, original_sql: str, period: int) -> str:
        """
        Create period-over-period SQL for rolling average measures.

        Rolling averages require adjusting the window size based on the selected time granularity.
        """
        rows_match = re.search(
            r"ROWS BETWEEN (\d+) PRECEDING AND CURRENT ROW",
            original_sql,
        )

        # no recognizable window clause: return the measure SQL unchanged
        if not rows_match:
            return original_sql

        original_window_size = int(rows_match.group(1))
        time_conditions = []

        for unit, divisor in self.TIME_UNITS:
            # calculate adjusted window size for this time granularity
            adjusted_window = (
                (original_window_size + period) // divisor
                if unit != "date"
                else original_window_size + period
            )

            condition = (
                f"{{% {'if' if unit == 'date' else 'elsif'} "
                + f"{self.name}.submission_{unit}._is_selected %}}"
            )

            # modify the ROWS clause to extend the window by the period
            modified_sql = re.sub(
                r"ROWS BETWEEN \d+ PRECEDING AND CURRENT ROW",
                f"ROWS BETWEEN {adjusted_window} PRECEDING AND "
                + f"{adjusted_window - original_window_size} PRECEDING",
                original_sql,
            )
            time_conditions.append(f"{condition}\n{modified_sql}")

        return (
            "\n".join(time_conditions)
            + f"\n{{% else %}}\n{original_sql}\n{{% endif %}}"
        )

    def _create_custom_window_period_sql(self, original_sql: str, period: int) -> str:
        """
        Create period-over-period SQL for custom rolling average measures.

        Because the window size is a runtime Looker parameter, Python arithmetic
        cannot be used. Instead, Liquid {% assign %} tags compute the shifted
        window boundaries at query time.

        For date granularity (divisor=1):
            preceding = window_size - 1
            start = preceding + period
            end = period
            → ROWS BETWEEN {{ preceding | plus: period }} PRECEDING AND period PRECEDING

        For coarser granularities (divisor D):
            adjusted = (preceding + period) / D  (integer division)
            adjusted_end = adjusted - preceding
            → ROWS BETWEEN {{ adjusted }} PRECEDING AND {{ adjusted_end }} PRECEDING
        """
        rows_pattern = r"ROWS BETWEEN\s*\{\{[^}]+\}\}\s*PRECEDING AND CURRENT ROW"
        # no recognizable parametric window clause: return unchanged
        if not re.search(rows_pattern, original_sql, re.DOTALL):
            return original_sql

        time_conditions = []
        for unit, divisor in self.TIME_UNITS:
            condition = (
                f"{{% {'if' if unit == 'date' else 'elsif'} "
                + f"{self.name}.submission_{unit}._is_selected %}}"
            )

            if unit == "date":
                rows_replacement = (
                    f"{{% assign preceding = rolling_average_window_size._parameter_value | minus: 1 %}}\n"
                    f"ROWS BETWEEN {{{{ preceding | plus: {period} }}}} PRECEDING AND {period} PRECEDING"
                )
            else:
                rows_replacement = (
                    f"{{% assign preceding = rolling_average_window_size._parameter_value | minus: 1 %}}\n"
                    f"{{% assign adjusted = preceding | plus: {period} | divided_by: {divisor} %}}\n"
                    f"{{% assign adjusted_end = adjusted | minus: preceding %}}\n"
                    f"ROWS BETWEEN {{{{ adjusted }}}} PRECEDING AND {{{{ adjusted_end }}}} PRECEDING"
                )

            modified_sql = re.sub(
                rows_pattern, rows_replacement, original_sql, flags=re.DOTALL
            )
            time_conditions.append(f"{condition}\n{modified_sql}")

        return (
            "\n".join(time_conditions)
            + f"\n{{% else %}}\n{original_sql}\n{{% endif %}}"
        )

    def _create_lag_period_sql(self, matching_measure: dict, period: int) -> str:
        """
        Create period-over-period SQL using LAG function for standard measures.

        LAG function looks back N periods to get historical values. The period is adjusted
        based on the selected time granularity (daily, weekly, monthly, etc.).
        """
        time_conditions = []

        for unit, divisor in self.TIME_UNITS:
            # calculate adjusted period for this time granularity
            adjusted_period = period // divisor if unit != "date" else period

            order_by = (
                f"${{submission_{unit}}}" if unit != "date" else "${submission_date}"
            )

            condition = (
                f"{{% {'if' if unit == 'date' else 'elsif'} "
                + f"{self.name}.submission_{unit}._is_selected %}}"
            )

            lag_sql = f"""LAG(${{{matching_measure['name']}}}, {adjusted_period}) OVER (
                {{% if date_groupby_position._parameter_value != "" %}}
                ORDER BY {{% parameter date_groupby_position %}}
                {{% else %}}
                ORDER BY {order_by}
                {{% endif %}}
            )"""
            time_conditions.append(f"{condition}\n{lag_sql}")

        # fallback when no submission_* timeframe is selected
        return (
            "\n".join(time_conditions)
            + f"\n{{% else %}}\nLAG({matching_measure['name']}, {period}) "
            + "OVER (ORDER BY ${submission_date})\n{% endif %}"
        )
A view for metric-hub metrics that come from the same data source.
MetricDefinitionsView(namespace: str, name: str, tables: List[Dict[str, str]])
29 def __init__(self, namespace: str, name: str, tables: List[Dict[str, str]]): 30 """Get an instance of an MetricDefinitionsView.""" 31 super().__init__(namespace, name, MetricDefinitionsView.type, tables)
Get an instance of a MetricDefinitionsView.
@classmethod
def
from_db_views( klass, namespace: str, is_glean: bool, channels: List[Dict[str, str]], db_views: dict) -> Iterator[MetricDefinitionsView]:
33 @classmethod 34 def from_db_views( 35 klass, 36 namespace: str, 37 is_glean: bool, 38 channels: List[Dict[str, str]], 39 db_views: dict, 40 ) -> Iterator[MetricDefinitionsView]: 41 """Get Metric Definition Views from db views and app variants.""" 42 return iter(())
Get Metric Definition Views from db views and app variants.
@classmethod
def
from_dict( klass, namespace: str, name: str, definition: generator.views.view.ViewDict) -> MetricDefinitionsView:
44 @classmethod 45 def from_dict( 46 klass, namespace: str, name: str, definition: ViewDict 47 ) -> MetricDefinitionsView: 48 """Get a MetricDefinitionsView from a dict representation.""" 49 return klass(namespace, name, definition.get("tables", []))
Get a MetricDefinitionsView from a dict representation.
def
to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]:
    def to_lookml(self, v1_name: Optional[str], dryrun) -> Dict[str, Any]:
        """Get this view as LookML.

        Builds one derived-table view whose SQL selects every metric-hub metric
        defined on this view's data source, grouped per client and per analysis
        period, plus "Base Fields" dimensions derived (via dryrun queries) from
        the data source itself and/or its joined data sources.

        :param v1_name: Unused by this view type; part of the shared
            ``View.to_lookml`` interface.
        :param dryrun: Dry-run query client forwarded to
            ``lookml_utils._generate_dimensions_from_query``.
        :return: ``{"views": [view_defn]}``, or ``{}`` when the namespace has no
            platform definitions, the data source is unknown, or no metric of
            this data source applies.
        """
        namespace_definitions = MetricsConfigLoader.configs.get_platform_definitions(
            self.namespace
        )
        if namespace_definitions is None:
            return {}

        # get all metric definitions that depend on the data source represented by this view;
        # the view name is "metric_definitions_<data source slug>"
        data_source_name = re.sub("^metric_definitions_", "", self.name)
        data_source_definition = MetricsConfigLoader.configs.get_data_source_definition(
            data_source_name, self.namespace
        )

        if data_source_definition is None:
            return {}

        # todo: hide deprecated metrics?
        # one "<select expression> AS <slug>,\n" SQL fragment per metric;
        # histogram metrics are excluded (they cannot be exposed as numeric dimensions)
        metric_definitions = [
            f"""{
                MetricsConfigLoader.configs.get_env().from_string(metric.select_expression).render()
            } AS {metric_slug},\n"""
            for metric_slug, metric in namespace_definitions.metrics.definitions.items()
            if metric.select_expression
            and metric.data_source.name == data_source_name
            and metric.type != "histogram"
        ]

        if metric_definitions == []:
            return {}

        # Metric definitions are intended to be aggregated by client per date.
        # A derived table is needed to do these aggregations, instead of defining them as measures
        # we want to have them available as dimensions (which don't allow aggregations in their definitions)
        # to allow for custom measures to be later defined in Looker that aggregate these per client metrics.
        view_defn: Dict[str, Any] = {"name": self.name}

        # fields that must not be duplicated as "Base Fields" dimensions:
        # identifiers/dates plus the metric slugs themselves
        ignore_base_fields = [
            "client_id",
            "submission_date",
            "submission",
            "first_run",
        ] + [
            metric_slug
            for metric_slug, metric in namespace_definitions.metrics.definitions.items()
            if metric.select_expression
            and metric.data_source.name == data_source_name
            and metric.type != "histogram"
        ]

        base_view_dimensions = {}
        joined_data_sources = []

        # check if the metric data source has joins
        # joined data sources are generally used for creating the "Base Fields"
        if data_source_definition.joins:
            # determine the dimensions selected by the joined data sources
            for joined_data_source_slug, join in data_source_definition.joins.items():
                joined_data_source = (
                    MetricsConfigLoader.configs.get_data_source_definition(
                        joined_data_source_slug, self.namespace
                    )
                )

                if joined_data_source.columns_as_dimensions:
                    joined_data_sources.append(joined_data_source)

                    # a fixed date keeps the dryrun cheap; "NULL" means the data
                    # source has no submission date column to filter on
                    date_filter = None
                    if joined_data_source.submission_date_column != "NULL":
                        date_filter = (
                            None
                            if joined_data_source.submission_date_column is None
                            or joined_data_source.submission_date_column == "NULL"
                            else f"{joined_data_source.submission_date_column} = '2023-01-01'"
                        )

                    # create Looker dimensions by doing a dryrun
                    query = MetricsConfigLoader.configs.get_data_source_sql(
                        joined_data_source_slug,
                        self.namespace,
                        where=date_filter,
                    ).format(dataset=self.namespace)

                    base_view_dimensions[joined_data_source_slug] = (
                        lookml_utils._generate_dimensions_from_query(
                            query, dryrun=dryrun
                        )
                    )

        if (
            data_source_definition.client_id_column == "NULL"
            or data_source_definition.columns_as_dimensions
        ):
            # if the metrics data source doesn't have any joins then use the dimensions
            # of the data source itself as base fields
            date_filter = None
            if data_source_definition.submission_date_column != "NULL":
                date_filter = (
                    "submission_date = '2023-01-01'"
                    if data_source_definition.submission_date_column is None
                    else f"{data_source_definition.submission_date_column} = '2023-01-01'"
                )

            query = MetricsConfigLoader.configs.get_data_source_sql(
                data_source_definition.name,
                self.namespace,
                where=date_filter,
                ignore_joins=True,
            ).format(dataset=self.namespace)

            base_view_dimensions[data_source_definition.name] = (
                lookml_utils._generate_dimensions_from_query(query, dryrun)
            )

        # to prevent duplicate dimensions, especially when working with time dimensions
        # where names are modified potentially causing naming collisions
        seen_dimensions = set()
        # prepare base field data for query
        base_view_fields = []
        for data_source, dimensions in base_view_dimensions.items():
            for dimension in dimensions:
                if (
                    dimension["name"] not in ignore_base_fields
                    and dimension["name"] not in seen_dimensions
                    and "hidden" not in dimension
                ):
                    # "__" in dimension names encodes nesting; "." restores the
                    # original column path for the SQL select
                    sql = (
                        f"{data_source}.{dimension['name'].replace('__', '.')} AS"
                        + f" {data_source}_{dimension['name']},\n"
                    )
                    # date/time/timestamp suffixes are removed when generating lookml dimensions, however we
                    # need the original field name for the derived view SQL
                    if dimension["type"] == "time" and not dimension["sql"].endswith(
                        dimension["name"]
                    ):
                        suffix = dimension["sql"].split(
                            dimension["name"].replace("__", ".")
                        )[-1]
                        sql = (
                            f"{data_source}.{(dimension['name']+suffix).replace('__', '.')} AS"
                            + f" {data_source}_{dimension['name']},\n"
                        )

                    base_view_fields.append(
                        {
                            "name": f"{data_source}_{dimension['name']}",
                            "select_sql": f"{data_source}_{dimension['name']},\n",
                            "sql": sql,
                        }
                    )
                    seen_dimensions.add(dimension["name"])

        client_id_field = (
            "NULL"
            if data_source_definition.client_id_column == "NULL"
            else f'{data_source_definition.client_id_column or "client_id"}'
        )

        # filters for date ranges; uses the `analysis_period` filter when set,
        # otherwise falls back to the `submission_date` filter, in both cases
        # extending the range backwards by `lookback_days`
        where_sql = " AND ".join(
            [
                f"""
                {data_source.name}.{data_source.submission_date_column or "submission_date"}
                {{% if analysis_period._is_filtered %}}
                BETWEEN
                DATE_SUB(
                    COALESCE(
                        SAFE_CAST(
                            {{% date_start analysis_period %}} AS DATE
                        ), CURRENT_DATE()),
                    INTERVAL {{% parameter lookback_days %}} DAY
                ) AND
                COALESCE(
                    SAFE_CAST(
                        {{% date_end analysis_period %}} AS DATE
                    ), CURRENT_DATE())
                {{% else %}}
                BETWEEN
                DATE_SUB(
                    COALESCE(
                        SAFE_CAST(
                            {{% date_start submission_date %}} AS DATE
                        ), CURRENT_DATE()),
                    INTERVAL {{% parameter lookback_days %}} DAY
                ) AND
                COALESCE(
                    SAFE_CAST(
                        {{% date_end submission_date %}} AS DATE
                    ), CURRENT_DATE())
                {{% endif %}}
                """
                for data_source in [data_source_definition] + joined_data_sources
                if data_source.submission_date_column != "NULL"
            ]
        )

        # filter on sample_id if such a field exists
        for field in base_view_fields:
            if field["name"].endswith("_sample_id"):
                where_sql += f"""
                AND
                    {field['name'].split('_sample_id')[0]}.sample_id < {{% parameter sampling %}}
                """
                break

        # the derived table groups metrics per client and per analysis_basis;
        # analysis_basis depends on the `aggregate_metrics_by` parameter
        view_defn["derived_table"] = {
            "sql": f"""
            SELECT
                {"".join(metric_definitions)}
                {"".join([field['select_sql'] for field in base_view_fields])}
                {client_id_field} AS client_id,
                {{% if aggregate_metrics_by._parameter_value == 'day' %}}
                {data_source_definition.submission_date_column or "submission_date"} AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'week' %}}
                (FORMAT_DATE(
                    '%F',
                    DATE_TRUNC({data_source_definition.submission_date_column or "submission_date"},
                    WEEK(MONDAY)))
                ) AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'month' %}}
                (FORMAT_DATE(
                    '%Y-%m',
                    {data_source_definition.submission_date_column or "submission_date"})
                ) AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'quarter' %}}
                (FORMAT_DATE(
                    '%Y-%m',
                    DATE_TRUNC({data_source_definition.submission_date_column or "submission_date"},
                    QUARTER))
                ) AS analysis_basis
                {{% elsif aggregate_metrics_by._parameter_value == 'year' %}}
                (EXTRACT(
                    YEAR FROM {data_source_definition.submission_date_column or "submission_date"})
                ) AS analysis_basis
                {{% else %}}
                NULL as analysis_basis
                {{% endif %}}
            FROM
                (
                    SELECT
                        {data_source_name}.*,
                        {"".join([field['sql'] for field in base_view_fields])}
                    FROM
                    {
                        MetricsConfigLoader.configs.get_data_source_sql(
                            data_source_name,
                            self.namespace,
                            select_fields=False
                        ).format(dataset=self.namespace)
                    }
                    WHERE {where_sql}
                )
            GROUP BY
                {"".join([field['select_sql'] for field in base_view_fields])}
                client_id,
                analysis_basis
            """
        }

        view_defn["dimensions"] = self.get_dimensions()
        view_defn["dimension_groups"] = self.get_dimension_groups()

        # add the Looker dimensions
        for data_source, dimensions in base_view_dimensions.items():
            for dimension in dimensions:
                if dimension["name"] not in ignore_base_fields:
                    dimension["sql"] = (
                        "${TABLE}." + f"{data_source}_{dimension['name']}"
                    )
                    dimension["group_label"] = "Base Fields"
                    if not lookml_utils._is_dimension_group(dimension):
                        view_defn["dimensions"].append(dimension)
                    else:
                        view_defn["dimension_groups"].append(dimension)
                    # avoid duplicate dimensions
                    ignore_base_fields.append(dimension["name"])

        view_defn["measures"] = self.get_measures(
            view_defn["dimensions"],
        )
        view_defn["sets"] = self._get_sets()
        # distinct rolling-average window sizes across all applicable metrics,
        # used to build the window-size parameter options
        rolling_average_window_sizes = sorted(
            {
                window_size
                for metric in namespace_definitions.metrics.definitions.values()
                if metric.select_expression
                and metric.data_source.name == data_source_name
                and metric.type != "histogram"
                and metric.statistics
                for stat_slug, stat_conf in metric.statistics.items()
                if stat_slug == "rolling_average"
                for window_size in stat_conf.get("window_sizes", [])
            }
        )
        view_defn["parameters"] = self._get_parameters(
            view_defn["dimensions"], rolling_average_window_sizes
        )
        view_defn["filters"] = self._get_filters()

        return {"views": [view_defn]}
def
get_dimensions( self, _table=None, _v1_name: Optional[str] = None, _dryrun=None) -> List[Dict[str, Any]]:
352 def get_dimensions( 353 self, 354 _table=None, 355 _v1_name: Optional[str] = None, 356 _dryrun=None, 357 ) -> List[Dict[str, Any]]: 358 """Get the set of dimensions for this view based on the metric definitions in metric-hub.""" 359 namespace_definitions = MetricsConfigLoader.configs.get_platform_definitions( 360 self.namespace 361 ) 362 metric_definitions = namespace_definitions.metrics.definitions 363 data_source_name = re.sub("^metric_definitions_", "", self.name) 364 365 return [ 366 { 367 "name": "client_id", 368 "type": "string", 369 "sql": "SAFE_CAST(${TABLE}.client_id AS STRING)", 370 "label": "Client ID", 371 "primary_key": "yes", 372 "group_label": "Base Fields", 373 "description": "Unique client identifier", 374 }, 375 ] + [ # add a dimension for each metric definition 376 { 377 "name": metric_slug, 378 "group_label": "Metrics", 379 "label": metric.friendly_name 380 or lookml_utils.slug_to_title(metric_slug), 381 "description": metric.description or "", 382 "type": "number", 383 "sql": "${TABLE}." + metric_slug, 384 } 385 for metric_slug, metric in metric_definitions.items() 386 if metric.select_expression 387 and metric.data_source.name == data_source_name 388 and metric.type != "histogram" 389 ]
Get the set of dimensions for this view based on the metric definitions in metric-hub.
def
get_dimension_groups(self) -> List[Dict[str, Any]]:
391 def get_dimension_groups(self) -> List[Dict[str, Any]]: 392 """Get dimension groups for this view.""" 393 return [ 394 { 395 "name": "submission", 396 "type": "time", 397 "datatype": "date", 398 "group_label": "Base Fields", 399 "sql": "${TABLE}.analysis_basis", 400 "label": "Submission", 401 "timeframes": [ 402 "raw", 403 "date", 404 "week", 405 "month", 406 "quarter", 407 "year", 408 ], 409 } 410 ]
Get dimension groups for this view.
def
get_measures( self, dimensions: List[dict]) -> List[Dict[str, Union[str, List[Dict[str, str]]]]]:
509 def get_measures( 510 self, dimensions: List[dict] 511 ) -> List[Dict[str, Union[str, List[Dict[str, str]]]]]: 512 """Get statistics as measures.""" 513 measures = [] 514 sampling = "1" 515 516 for dim in dimensions: 517 if dim["name"] == "sample_id": 518 sampling = "100 / {% parameter sampling %}" 519 break 520 521 for dimension in dimensions: 522 metric = MetricsConfigLoader.configs.get_metric_definition( 523 dimension["name"], self.namespace 524 ) 525 if metric and metric.statistics: 526 # Sort statistics so that rolling_average is processed last, 527 # since it depends on measures created by other statistics 528 # (e.g. sum, ratio) to already exist in the measures list. 529 sorted_statistics = sorted( 530 metric.statistics.items(), 531 key=lambda item: item[0] == "rolling_average", 532 ) 533 for statistic_slug, statistic_conf in sorted_statistics: 534 dimension_label = dimension.get("label") or dimension.get("name") 535 if statistic_slug in [ 536 "average", 537 "max", 538 "min", 539 "median", 540 ]: 541 measures.append( 542 { 543 "name": f"{dimension['name']}_{statistic_slug}", 544 "type": statistic_slug, 545 "sql": "${TABLE}." + dimension["name"], 546 "label": f"{dimension_label} {statistic_slug.title()}", 547 "group_label": "Statistics", 548 "description": f"{statistic_slug.title()} of {dimension_label}", 549 } 550 ) 551 elif statistic_slug == "sum": 552 measures.append( 553 { 554 "name": f"{dimension['name']}_{statistic_slug}", 555 "type": "sum", 556 "sql": "${TABLE}." 
+ dimension["name"] + "*" + sampling, 557 "label": f"{dimension_label} Sum", 558 "group_label": "Statistics", 559 "description": f"Sum of {dimension_label}", 560 } 561 ) 562 elif statistic_slug == "client_count": 563 measures.append( 564 { 565 "name": ( 566 f"{dimension['name']}_{statistic_slug}_sampled" 567 if sampling 568 else f"{dimension['name']}_{statistic_slug}" 569 ), 570 "type": "count_distinct", 571 "label": f"{dimension_label} Client Count", 572 "group_label": "Statistics", 573 "sql": "IF(${TABLE}." 574 + f"{dimension['name']} > 0, " 575 + "${TABLE}.client_id, SAFE_CAST(NULL AS STRING))", 576 "description": f"Number of clients with {dimension_label}", 577 "hidden": "yes" if sampling else "no", 578 } 579 ) 580 581 if sampling: 582 measures.append( 583 { 584 "name": f"{dimension['name']}_{statistic_slug}", 585 "type": "number", 586 "label": f"{dimension_label} Client Count", 587 "group_label": "Statistics", 588 "sql": "${" 589 + f"{dimension['name']}_{statistic_slug}_sampled" 590 + "} *" 591 + sampling, 592 "description": f"Number of clients with {dimension_label}", 593 } 594 ) 595 elif statistic_slug == "dau_proportion": 596 if "numerator" in statistic_conf: 597 [numerator, numerator_stat] = statistic_conf[ 598 "numerator" 599 ].split(".") 600 measures.append( 601 { 602 "name": "DAU_sampled" if sampling else "DAU", 603 "type": "count_distinct", 604 "label": "DAU", 605 "group_label": "Statistics", 606 "sql": "${TABLE}.client_id", 607 "hidden": "yes", 608 } 609 ) 610 611 if sampling: 612 measures.append( 613 { 614 "name": "DAU", 615 "type": "number", 616 "label": "DAU", 617 "group_label": "Statistics", 618 "sql": "${DAU_sampled} *" + sampling, 619 "hidden": "yes", 620 } 621 ) 622 623 measures.append( 624 { 625 "name": f"{dimension['name']}_{statistic_slug}", 626 "type": "number", 627 "label": f"{dimension_label} DAU Proportion", 628 "sql": "SAFE_DIVIDE(${" 629 + f"{numerator}_{numerator_stat}" 630 + "}, ${DAU})", 631 "group_label": "Statistics", 632 
"description": f"Proportion of daily active users with {dimension['name']}", 633 } 634 ) 635 elif statistic_slug == "ratio": 636 if ( 637 "numerator" in statistic_conf 638 and "denominator" in statistic_conf 639 ): 640 [numerator, numerator_stat] = statistic_conf[ 641 "numerator" 642 ].split(".") 643 [denominator, denominator_stat] = statistic_conf[ 644 "denominator" 645 ].split(".") 646 647 measures.append( 648 { 649 "name": f"{dimension['name']}_{statistic_slug}", 650 "type": "number", 651 "label": f"{dimension_label} Ratio", 652 "sql": "SAFE_DIVIDE(${" 653 + f"{numerator}_{numerator_stat}" 654 + "}, ${" 655 + f"{denominator}_{denominator_stat}" 656 + "})", 657 "group_label": "Statistics", 658 "description": f"""" 659 Ratio between {statistic_conf['numerator']} and 660 {statistic_conf['denominator']}""", 661 } 662 ) 663 elif statistic_slug == "rolling_average": 664 # rolling averages are computed over existing statistics (e.g. sum, ratio) 665 aggregations = statistic_conf.get("aggregations", ["sum"]) 666 667 # Build a dynamic PARTITION BY clause for the window function. 668 # For each base field dimension (non-metric, non-client_id), emit a 669 # liquid conditional so the field is only included in the partition 670 # when it's actually in the query. The trailing constant `1` ensures 671 # the PARTITION BY clause is never empty (when no grouping dimension 672 # is selected `PARTITION BY 1` is equivalent to a global window). 
673 partition_by_conditions = "".join( 674 f"{{% if {self.name}.{dim['name']}._is_selected %}}{dim['sql']}," 675 f"{{% endif %}}" 676 for dim in dimensions 677 if dim.get("group_label") == "Base Fields" 678 and dim["name"] != "client_id" 679 ) 680 partition_by_clause = ( 681 f"PARTITION BY {partition_by_conditions} 1" 682 ) 683 684 for aggregation in aggregations: 685 # find measures that match the current dimension and aggregation type 686 matching_measures = [ 687 m 688 for m in measures 689 if m["name"].startswith( 690 f"{dimension['name']}_{aggregation}" 691 ) 692 ] 693 if "window_sizes" in statistic_conf: 694 for window_size in statistic_conf["window_sizes"]: 695 for matching_measure in matching_measures: 696 # these statistics require some time dimension to be selected 697 measures.append( 698 { 699 "name": f"{dimension['name']}_{statistic_slug}_{aggregation}_{window_size}_day", 700 "type": "number", 701 "label": f"{matching_measure['label']} {window_size} Day Rolling Average", 702 "sql": f""" 703 {{% if {self.name}.submission_date._is_selected or 704 {self.name}.submission_week._is_selected or 705 {self.name}.submission_month._is_selected or 706 {self.name}.submission_quarter._is_selected or 707 {self.name}.submission_year._is_selected %}} 708 AVG(${{{matching_measure['name']}}}) OVER ( 709 {partition_by_clause} 710 {{% if date_groupby_position._parameter_value != "" %}} 711 ORDER BY {{% parameter date_groupby_position %}} 712 {{% elsif {self.name}.submission_date._is_selected %}} 713 ORDER BY ${{TABLE}}.analysis_basis 714 {{% else %}} 715 ERROR("date_groupby_position needs to be set when using submission_week, 716 submission_month, submission_quarter, or submission_year") 717 {{% endif %}} 718 ROWS BETWEEN {window_size - 1} PRECEDING AND CURRENT ROW 719 ) 720 {{% else %}} 721 ERROR('Please select a "submission_*" field to compute the rolling average') 722 {{% endif %}} 723 """, 724 "group_label": "Statistics", 725 "description": f"{window_size} day rolling 
average of {dimension_label}", 726 } 727 ) 728 729 # Parametric-window measure: uses the rolling_average_window_size 730 # parameter so the user can set any window size at query time. 731 # The parameter value must be n-1 (preceding rows count) because 732 # SQL does not allow arithmetic in ROWS BETWEEN expressions. 733 for matching_measure in matching_measures: 734 measures.append( 735 { 736 "name": f"{dimension['name']}_{statistic_slug}_{aggregation}_custom_window", 737 "type": "number", 738 "label": f"{matching_measure['label']} Custom Window Rolling Average", 739 "sql": f""" 740 {{% if {self.name}.submission_date._is_selected or 741 {self.name}.submission_week._is_selected or 742 {self.name}.submission_month._is_selected or 743 {self.name}.submission_quarter._is_selected or 744 {self.name}.submission_year._is_selected %}} 745 AVG(${{{matching_measure['name']}}}) OVER ( 746 {partition_by_clause} 747 {{% if date_groupby_position._parameter_value != "" %}} 748 ORDER BY {{% parameter date_groupby_position %}} 749 {{% elsif {self.name}.submission_date._is_selected %}} 750 ORDER BY ${{TABLE}}.analysis_basis 751 {{% else %}} 752 ERROR("date_groupby_position needs to be set when using submission_week, 753 submission_month, submission_quarter, or submission_year") 754 {{% endif %}} 755 ROWS BETWEEN 756 {{{{ rolling_average_window_size._parameter_value | minus: 1 }}}} 757 PRECEDING AND CURRENT ROW 758 ) 759 {{% else %}} 760 ERROR('Please select a "submission_*" field to compute the rolling average') 761 {{% endif %}} 762 """, 763 "group_label": "Statistics", 764 "description": f"Rolling average of {dimension_label} using a window size " 765 + "controlled by the 'Rolling Average Custom Window Size' parameter.", 766 } 767 ) 768 769 # period-over-period measures compare current values with historical values 770 if "period_over_period" in statistic_conf: 771 # find all statistics that have period-over-period configured 772 matching_measures = [ 773 m 774 for m in measures 
775 if m["name"].startswith( 776 f"{dimension['name']}_{statistic_slug}" 777 ) 778 and "_period_over_period_" not in m["name"] 779 ] 780 781 # create period-over-period measures for each configured time period 782 for period in statistic_conf["period_over_period"].get( 783 "periods", [] 784 ): 785 for matching_measure in matching_measures: 786 original_sql = matching_measure["sql"] 787 788 # rolling averages need special handling to adjust window sizes 789 # based on the selected time granularity 790 if ( 791 statistic_slug == "rolling_average" 792 and "_custom_window" in matching_measure["name"] 793 ): 794 sql = self._create_custom_window_period_sql( 795 original_sql, period 796 ) 797 elif statistic_slug == "rolling_average": 798 sql = self._create_rolling_average_period_sql( 799 original_sql, period 800 ) 801 else: 802 # standard measures use LAG function with time-adjusted periods 803 sql = self._create_lag_period_sql( 804 matching_measure, period 805 ) 806 807 # generate different types of period-over-period comparisons 808 for kind in statistic_conf["period_over_period"].get( 809 "kinds", ["previous"] 810 ): 811 if kind == "difference": 812 comparison_sql = f"({original_sql}) - ({sql})" 813 elif kind == "relative_change": 814 comparison_sql = f"SAFE_DIVIDE(({original_sql}), NULLIF(({sql}), 0)) - 1" 815 else: 816 comparison_sql = sql 817 818 measures.append( 819 { 820 "name": f"{matching_measure['name']}_{period}_day_period_over_period_{kind}", 821 "type": "number", 822 "label": f"{matching_measure['label']} " 823 + f"{period} Day Period Over Period {kind.capitalize()}", 824 "description": f"Period over period {kind.capitalize()} of " 825 + f"{matching_measure['label']} over {period} days", 826 "group_label": "Statistics", 827 "sql": comparison_sql, 828 } 829 ) 830 831 return measures
Get statistics as measures.