generator.explores.events_stream_explore
An explore for events_stream views.
1"""An explore for `events_stream` views.""" 2 3from __future__ import annotations 4 5from pathlib import Path 6from typing import Any, Iterator, Optional 7 8from ..views import EventsStreamView, View 9from .explore import Explore 10 11 12class EventsStreamExplore(Explore): 13 """An explore for `events_stream` views.""" 14 15 type: str = "events_stream_explore" 16 17 @staticmethod 18 def from_views(views: list[View]) -> Iterator[EventsStreamExplore]: 19 """Where possible, generate EventsStreamExplores for views.""" 20 for view in views: 21 if isinstance(view, EventsStreamView): 22 yield EventsStreamExplore(view.name, {"base_view": view.name}) 23 24 @staticmethod 25 def from_dict(name: str, defn: dict, views_path: Path) -> EventsStreamExplore: 26 """Get an instance of this explore from a dictionary definition.""" 27 return EventsStreamExplore(name, defn["views"], views_path) 28 29 def get_required_filters(self, view_name: str) -> list[dict[str, str]]: 30 """Get required filters for this view.""" 31 # Use 7 days instead of 28 days to avoid "Query Exceeds Data Limit" errors for large event datasets. 
32 return [{"submission_date": "7 days"}] 33 34 def _to_lookml(self, v1_name: Optional[str]) -> list[dict[str, Any]]: 35 lookml: dict[str, Any] = { 36 "name": self.name, 37 "view_name": self.views["base_view"], 38 "joins": self.get_unnested_fields_joins_lookml(), 39 "always_filter": { 40 "filters": self.get_required_filters("base_view"), 41 }, 42 "queries": [ 43 { 44 "name": "recent_event_counts", 45 "description": "Event counts during the past week.", 46 "dimensions": ["event"], 47 "measures": ["event_count"], 48 "filters": [{"submission_date": "7 days"}], 49 }, 50 { 51 "name": "sampled_recent_event_counts", 52 "description": "A 1% sample of event counts during the past week.", 53 "dimensions": ["event"], 54 "measures": ["event_count"], 55 "filters": [ 56 {"submission_date": "7 days"}, 57 {"sample_id": "[0, 0]"}, 58 ], 59 }, 60 ], 61 } 62 63 if datagroup := self.get_datagroup(): 64 lookml["persist_with"] = datagroup 65 66 return [lookml]
class EventsStreamExplore(Explore):
    """Explore built on top of a single `events_stream` view."""

    type: str = "events_stream_explore"

    @staticmethod
    def from_views(views: list[View]) -> Iterator[EventsStreamExplore]:
        """Yield an explore for every `EventsStreamView` found in `views`.

        Views of any other type are skipped. Each explore is named after its
        view and uses that view as its `base_view`.
        """
        stream_views = (v for v in views if isinstance(v, EventsStreamView))
        for stream_view in stream_views:
            yield EventsStreamExplore(
                stream_view.name, {"base_view": stream_view.name}
            )

    @staticmethod
    def from_dict(name: str, defn: dict, views_path: Path) -> EventsStreamExplore:
        """Build an explore from a dictionary definition.

        The value stored under `defn["views"]` is handed to the constructor
        together with `name` and `views_path`; a definition without a
        `"views"` key raises `KeyError`.
        """
        explore_views = defn["views"]
        return EventsStreamExplore(name, explore_views, views_path)

    def get_required_filters(self, view_name: str) -> list[dict[str, str]]:
        """Return the filters that must always be applied to this explore.

        `view_name` is not consulted here; the same filter list is returned
        for any view.
        """
        # Use 7 days instead of 28 days to avoid "Query Exceeds Data Limit"
        # errors for large event datasets.
        submission_window = {"submission_date": "7 days"}
        return [submission_window]

    def _to_lookml(self, v1_name: Optional[str]) -> list[dict[str, Any]]:
        """Return a one-element list holding this explore's LookML dictionary.

        `v1_name` is not used by this implementation.
        """
        # Predefined queries offered with the explore; both cover the past
        # week, and the second additionally filters on `sample_id`.
        suggested_queries = [
            {
                "name": "recent_event_counts",
                "description": "Event counts during the past week.",
                "dimensions": ["event"],
                "measures": ["event_count"],
                "filters": [{"submission_date": "7 days"}],
            },
            {
                "name": "sampled_recent_event_counts",
                "description": "A 1% sample of event counts during the past week.",
                "dimensions": ["event"],
                "measures": ["event_count"],
                "filters": [
                    {"submission_date": "7 days"},
                    {"sample_id": "[0, 0]"},
                ],
            },
        ]

        explore_lookml: dict[str, Any] = {
            "name": self.name,
            "view_name": self.views["base_view"],
            "joins": self.get_unnested_fields_joins_lookml(),
            "always_filter": {"filters": self.get_required_filters("base_view")},
            "queries": suggested_queries,
        }

        # Only emit `persist_with` when a datagroup is available.
        datagroup = self.get_datagroup()
        if datagroup:
            explore_lookml["persist_with"] = datagroup

        return [explore_lookml]
An explore for events_stream views.
@staticmethod
def from_views(views: list[generator.views.view.View]) -> Iterator[EventsStreamExplore]:
@staticmethod
def from_views(views: list[View]) -> Iterator[EventsStreamExplore]:
    """Yield an explore for every `EventsStreamView` found in `views`.

    Views of any other type are skipped. Each explore is named after its
    view and uses that view as its `base_view`.
    """
    stream_views = (v for v in views if isinstance(v, EventsStreamView))
    for stream_view in stream_views:
        yield EventsStreamExplore(stream_view.name, {"base_view": stream_view.name})
Where possible, generate EventsStreamExplores for views.
@staticmethod
def from_dict(name: str, defn: dict, views_path: pathlib.Path) -> EventsStreamExplore:
@staticmethod
def from_dict(name: str, defn: dict, views_path: Path) -> EventsStreamExplore:
    """Build an explore from a dictionary definition.

    The value stored under `defn["views"]` is handed to the constructor
    together with `name` and `views_path`; a definition without a
    `"views"` key raises `KeyError`.
    """
    explore_views = defn["views"]
    return EventsStreamExplore(name, explore_views, views_path)
Get an instance of this explore from a dictionary definition.
def get_required_filters(self, view_name: str) -> list[dict[str, str]]:
def get_required_filters(self, view_name: str) -> list[dict[str, str]]:
    """Return the filters that must always be applied to this explore.

    `view_name` is not consulted here; the same filter list is returned
    for any view.
    """
    # Use 7 days instead of 28 days to avoid "Query Exceeds Data Limit"
    # errors for large event datasets.
    submission_window = {"submission_date": "7 days"}
    return [submission_window]
Get required filters for this view.