generator.dryrun
Dry-run helpers for retrieving BigQuery metadata (query schemas, referenced tables, and table metadata) without executing queries.
"""Dry Run method to get BigQuery metadata."""

import json
import os
import time
from enum import Enum
from functools import cached_property
from typing import Optional
from urllib.error import URLError
from urllib.request import Request, urlopen

import google.auth
from google.auth.transport.requests import Request as GoogleAuthRequest
from google.cloud import bigquery
from google.oauth2.id_token import fetch_id_token

# Endpoint of the dry-run Cloud Function used when use_cloud_function=True.
DRY_RUN_URL = (
    "https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun"
)


def urlopen_with_retry(request, max_retries=5, initial_delay=2.0, timeout=30):
    """
    Perform urlopen with exponential backoff retry logic.

    Args:
        request: The urllib Request object
        max_retries: Maximum number of retry attempts (default: 5)
        initial_delay: Initial delay in seconds before first retry (default: 2.0)
        timeout: Timeout in seconds for each request (default: 30)

    Returns:
        The response from urlopen

    Raises:
        URLError: If all retry attempts fail
    """
    last_error = None
    delay = initial_delay

    for attempt in range(max_retries + 1):
        try:
            return urlopen(request, timeout=timeout)
        except URLError as e:
            last_error = e
            if attempt < max_retries:
                print(
                    f"Network request failed (attempt {attempt + 1}/{max_retries + 1}): {e}"
                )
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
                delay *= 2  # Exponential backoff
            else:
                print(f"All {max_retries + 1} attempts failed")

    raise last_error


def credentials(auth_req: Optional[GoogleAuthRequest] = None):
    """Get refreshed application-default GCP credentials."""
    auth_req = auth_req or GoogleAuthRequest()
    creds, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    creds.refresh(auth_req)
    return creds


def id_token():
    """Get token to authenticate against Cloud Function."""
    # Prefer a token created by the GitHub Actions workflow, if present.
    token = os.environ.get("GOOGLE_GHA_ID_TOKEN")
    if token:
        return token

    auth_req = GoogleAuthRequest()
    creds = credentials(auth_req)
    if hasattr(creds, "id_token"):
        # Default credentials created via the Cloud SDK carry an ID token.
        return creds.id_token
    # If GOOGLE_APPLICATION_CREDENTIALS points at a service account JSON
    # file, mint an ID token for the dry-run Cloud Function audience.
    return fetch_id_token(auth_req, DRY_RUN_URL)


class DryRunError(Exception):
    """Exception raised on dry run errors."""

    def __init__(self, message, error, use_cloud_function, table_id):
        """Initialize DryRunError with context about the failed dry run."""
        super().__init__(message)
        self.error = error
        self.use_cloud_function = use_cloud_function
        self.table_id = table_id

    def __reduce__(self):
        """
        Override to ensure that all parameters are being passed when pickling.

        Pickling happens when passing exception between processes (e.g. via
        multiprocessing).
        """
        return (
            self.__class__,
            self.args + (self.error, self.use_cloud_function, self.table_id),
        )


class Errors(Enum):
    """DryRun errors that require special handling."""

    READ_ONLY = 1
    DATE_FILTER_NEEDED = 2
    DATE_FILTER_NEEDED_AND_SYNTAX = 3
    PERMISSION_DENIED = 4


class DryRunContext:
    """DryRun builder class holding shared configuration."""

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Initialize dry run context."""
        self.use_cloud_function = use_cloud_function
        self.dry_run_url = dry_run_url
        self.id_token = id_token
        self.credentials = credentials

    def create(
        self,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
    ):
        """Initialize a DryRun instance using this context's configuration."""
        return DryRun(
            use_cloud_function=self.use_cloud_function,
            id_token=self.id_token,
            credentials=self.credentials,
            sql=sql,
            project=project,
            dataset=dataset,
            table=table,
            dry_run_url=self.dry_run_url,
        )


class DryRun:
    """Dry run SQL via the dry-run Cloud Function or a BigQuery client."""

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Initialize dry run instance."""
        self.sql = sql
        self.use_cloud_function = use_cloud_function
        self.project = project
        self.dataset = dataset
        self.table = table
        self.dry_run_url = dry_run_url
        self.id_token = id_token
        self.credentials = credentials

    @cached_property
    def client(self):
        """Get BigQuery client instance (created lazily, then cached)."""
        return bigquery.Client(credentials=self.credentials)

    @cached_property
    def dry_run_result(self):
        """Return the dry run result, or None if the dry run failed."""
        try:
            if self.use_cloud_function:
                json_data = {
                    "query": self.sql or "SELECT 1",
                    "project": self.project,
                    "dataset": self.dataset or "telemetry",
                }

                if self.table:
                    json_data["table"] = self.table

                request = Request(
                    self.dry_run_url,
                    headers={
                        "Content-Type": "application/json",
                        "Authorization": f"Bearer {self.id_token}",
                    },
                    data=json.dumps(json_data).encode("utf8"),
                    method="POST",
                )
                r = urlopen_with_retry(request)
                return json.load(r)
            else:
                query_schema = None
                referenced_tables = []
                table_metadata = None

                if self.sql:
                    job_config = bigquery.QueryJobConfig(
                        dry_run=True,
                        use_query_cache=False,
                        query_parameters=[
                            bigquery.ScalarQueryParameter(
                                "submission_date", "DATE", "2019-01-01"
                            )
                        ],
                    )

                    if self.project:
                        job_config.connection_properties = [
                            bigquery.ConnectionProperty(
                                "dataset_project_id", self.project
                            )
                        ]

                    job = self.client.query(self.sql, job_config=job_config)
                    query_schema = (
                        job._properties.get("statistics", {})
                        .get("query", {})
                        .get("schema", {})
                    )
                    referenced_tables = [
                        ref.to_api_repr() for ref in job.referenced_tables
                    ]

                if (
                    self.project is not None
                    and self.table is not None
                    and self.dataset is not None
                ):
                    table = self.client.get_table(
                        f"{self.project}.{self.dataset}.{self.table}"
                    )
                    table_metadata = {
                        "tableType": table.table_type,
                        "friendlyName": table.friendly_name,
                        "schema": {
                            "fields": [field.to_api_repr() for field in table.schema]
                        },
                    }

                return {
                    "valid": True,
                    "referencedTables": referenced_tables,
                    "schema": query_schema,
                    "tableMetadata": table_metadata,
                }
        except Exception as e:
            print(f"ERROR {e}")
            return None

    def get_schema(self):
        """Return the query schema fields, or [] when unavailable."""
        self.validate()

        if self.dry_run_result and self.dry_run_result["valid"]:
            # "schema" may be present but None or {} (e.g. no SQL was dry
            # run); guard against indexing into a non-dict value.
            schema = self.dry_run_result.get("schema") or {}
            return schema.get("fields", [])

        return []

    def get_table_schema(self):
        """Return the schema fields of the provided table, or []."""
        self.validate()

        if self.dry_run_result and self.dry_run_result["valid"]:
            # "tableMetadata" is None when no table was configured.
            metadata = self.dry_run_result.get("tableMetadata") or {}
            return metadata.get("schema", {}).get("fields", [])

        return []

    def get_table_metadata(self):
        """Return table metadata, or {} when unavailable."""
        self.validate()

        if self.dry_run_result and self.dry_run_result["valid"]:
            return self.dry_run_result.get("tableMetadata") or {}

        return {}

    def validate(self):
        """Dry run the provided SQL file and check if valid.

        Returns True when the dry run succeeded or failed only with an
        expected error class; raises DryRunError otherwise.
        """
        if self.dry_run_result is None:
            raise DryRunError(
                "Error when dry running SQL",
                self.get_error(),
                self.use_cloud_function,
                self.table,
            )

        if self.dry_run_result["valid"]:
            return True

        error = self.get_error()
        if error == Errors.READ_ONLY:
            # We want the dryrun service to only have read permissions, so
            # we expect CREATE VIEW and CREATE TABLE to throw specific
            # exceptions.
            return True
        if error == Errors.DATE_FILTER_NEEDED:
            # With strip_dml flag, some queries require a partition filter
            # (submission_date, submission_timestamp, etc.) to run
            return True

        # Use errors() instead of ["errors"] so a malformed result without
        # an "errors" key still reaches the raise below.
        print("ERROR\n", self.errors())
        raise DryRunError(
            "Error when dry running SQL",
            error,
            self.use_cloud_function,
            self.table,
        )

    def errors(self):
        """Dry run the provided SQL file and return errors."""
        if self.dry_run_result is None:
            return []
        return self.dry_run_result.get("errors", [])

    def get_error(self) -> Optional[Errors]:
        """Get specific errors for edge case handling."""
        errors = self.errors()
        if len(errors) != 1:
            return None

        error = errors[0]
        if error and error.get("code") in [400, 403]:
            error_message = error.get("message", "")
            if (
                "does not have bigquery.tables.create permission for dataset"
                in error_message
                or "Permission bigquery.tables.create denied" in error_message
                or "Permission bigquery.datasets.update denied" in error_message
            ):
                return Errors.READ_ONLY
            if "without a filter over column(s)" in error_message:
                return Errors.DATE_FILTER_NEEDED
            if (
                "Syntax error: Expected end of input but got keyword WHERE"
                in error_message
            ):
                return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
            if (
                "Permission bigquery.tables.get denied on table" in error_message
                or "User does not have permission to query table" in error_message
            ):
                return Errors.PERMISSION_DENIED
        return None
def urlopen_with_retry(request, max_retries=5, initial_delay=2.0, timeout=30):
    """Open *request* via urlopen, retrying failures with exponential backoff.

    Args:
        request: urllib Request object to open.
        max_retries: number of retries after the initial attempt (default 5).
        initial_delay: seconds to wait before the first retry (default 2.0).
        timeout: per-attempt timeout in seconds (default 30).

    Returns:
        The response object from urlopen.

    Raises:
        URLError: when every attempt fails.
    """
    attempts = max_retries + 1
    delay = initial_delay
    last_error = None

    for attempt in range(attempts):
        try:
            return urlopen(request, timeout=timeout)
        except URLError as err:
            last_error = err
            if attempt + 1 == attempts:
                print(f"All {attempts} attempts failed")
            else:
                print(
                    f"Network request failed (attempt {attempt + 1}/{attempts}): {err}"
                )
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
                delay *= 2  # double the wait each round

    raise last_error
Perform urlopen with exponential backoff retry logic.
Args: request: The urllib Request object max_retries: Maximum number of retry attempts (default: 5) initial_delay: Initial delay in seconds before first retry (default: 2.0) timeout: Timeout in seconds for each request (default: 30)
Returns: The response from urlopen
Raises: URLError: If all retry attempts fail
def credentials(auth_req: Optional[GoogleAuthRequest] = None):
    """Return refreshed application-default GCP credentials."""
    if auth_req is None:
        auth_req = GoogleAuthRequest()
    scopes = ["https://www.googleapis.com/auth/cloud-platform"]
    creds, _ = google.auth.default(scopes=scopes)
    creds.refresh(auth_req)
    return creds
Get GCP credentials.
def id_token():
    """Get token to authenticate against Cloud Function."""
    # Prefer a token provisioned by the GitHub Actions workflow.
    token = os.environ.get("GOOGLE_GHA_ID_TOKEN")
    if token:
        return token

    auth_req = GoogleAuthRequest()
    creds = credentials(auth_req)
    if hasattr(creds, "id_token"):
        # Default credentials created via the Cloud SDK carry an ID token.
        return creds.id_token
    # If GOOGLE_APPLICATION_CREDENTIALS is set to a service account JSON
    # file, acquire an ID token from those service account credentials.
    return fetch_id_token(auth_req, DRY_RUN_URL)
Get token to authenticate against Cloud Function.
class DryRunError(Exception):
    """Raised when a dry run fails."""

    def __init__(self, message, error, use_cloud_function, table_id):
        """Store dry-run context alongside the exception message."""
        super().__init__(message)
        self.error = error
        self.use_cloud_function = use_cloud_function
        self.table_id = table_id

    def __reduce__(self):
        """
        Support pickling with all constructor arguments.

        Required when the exception crosses process boundaries (e.g. via
        multiprocessing).
        """
        extra = (self.error, self.use_cloud_function, self.table_id)
        return (self.__class__, self.args + extra)
Exception raised on dry run errors.
91 def __init__(self, message, error, use_cloud_function, table_id): 92 """Initialize DryRunError.""" 93 super().__init__(message) 94 self.error = error 95 self.use_cloud_function = use_cloud_function 96 self.table_id = table_id
Initialize DryRunError.
class Errors(Enum):
    """Dry-run error categories that callers treat specially."""

    READ_ONLY = 1
    DATE_FILTER_NEEDED = 2
    DATE_FILTER_NEEDED_AND_SYNTAX = 3
    PERMISSION_DENIED = 4
DryRun errors that require special handling.
class DryRunContext:
    """Factory that builds DryRun instances sharing one configuration."""

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Initialize the shared dry-run configuration."""
        self.use_cloud_function = use_cloud_function
        self.id_token = id_token
        self.credentials = credentials
        self.dry_run_url = dry_run_url

    def create(
        self,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
    ):
        """Build a DryRun instance from this context plus the given target."""
        return DryRun(
            use_cloud_function=self.use_cloud_function,
            id_token=self.id_token,
            credentials=self.credentials,
            sql=sql,
            project=project,
            dataset=dataset,
            table=table,
            dry_run_url=self.dry_run_url,
        )
DryRun builder class.
122 def __init__( 123 self, 124 use_cloud_function=False, 125 id_token=None, 126 credentials=None, 127 dry_run_url=DRY_RUN_URL, 128 ): 129 """Initialize dry run instance.""" 130 self.use_cloud_function = use_cloud_function 131 self.dry_run_url = dry_run_url 132 self.id_token = id_token 133 self.credentials = credentials
Initialize dry run instance.
135 def create( 136 self, 137 sql=None, 138 project="moz-fx-data-shared-prod", 139 dataset=None, 140 table=None, 141 ): 142 """Initialize a DryRun instance.""" 143 return DryRun( 144 use_cloud_function=self.use_cloud_function, 145 id_token=self.id_token, 146 credentials=self.credentials, 147 sql=sql, 148 project=project, 149 dataset=dataset, 150 table=table, 151 dry_run_url=self.dry_run_url, 152 )
Initialize a DryRun instance.
class DryRun:
    """Dry run SQL via the dry-run Cloud Function or a BigQuery client."""

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Initialize dry run instance."""
        self.sql = sql
        self.use_cloud_function = use_cloud_function
        self.project = project
        self.dataset = dataset
        self.table = table
        self.dry_run_url = dry_run_url
        self.id_token = id_token
        self.credentials = credentials

    @cached_property
    def client(self):
        """Get BigQuery client instance (created lazily, then cached)."""
        return bigquery.Client(credentials=self.credentials)

    @cached_property
    def dry_run_result(self):
        """Return the dry run result, or None if the dry run failed."""
        try:
            if self.use_cloud_function:
                json_data = {
                    "query": self.sql or "SELECT 1",
                    "project": self.project,
                    "dataset": self.dataset or "telemetry",
                }

                if self.table:
                    json_data["table"] = self.table

                request = Request(
                    self.dry_run_url,
                    headers={
                        "Content-Type": "application/json",
                        "Authorization": f"Bearer {self.id_token}",
                    },
                    data=json.dumps(json_data).encode("utf8"),
                    method="POST",
                )
                r = urlopen_with_retry(request)
                return json.load(r)
            else:
                query_schema = None
                referenced_tables = []
                table_metadata = None

                if self.sql:
                    job_config = bigquery.QueryJobConfig(
                        dry_run=True,
                        use_query_cache=False,
                        query_parameters=[
                            bigquery.ScalarQueryParameter(
                                "submission_date", "DATE", "2019-01-01"
                            )
                        ],
                    )

                    if self.project:
                        job_config.connection_properties = [
                            bigquery.ConnectionProperty(
                                "dataset_project_id", self.project
                            )
                        ]

                    job = self.client.query(self.sql, job_config=job_config)
                    query_schema = (
                        job._properties.get("statistics", {})
                        .get("query", {})
                        .get("schema", {})
                    )
                    referenced_tables = [
                        ref.to_api_repr() for ref in job.referenced_tables
                    ]

                if (
                    self.project is not None
                    and self.table is not None
                    and self.dataset is not None
                ):
                    table = self.client.get_table(
                        f"{self.project}.{self.dataset}.{self.table}"
                    )
                    table_metadata = {
                        "tableType": table.table_type,
                        "friendlyName": table.friendly_name,
                        "schema": {
                            "fields": [field.to_api_repr() for field in table.schema]
                        },
                    }

                return {
                    "valid": True,
                    "referencedTables": referenced_tables,
                    "schema": query_schema,
                    "tableMetadata": table_metadata,
                }
        except Exception as e:
            print(f"ERROR {e}")
            return None

    def get_schema(self):
        """Return the query schema fields, or [] when unavailable."""
        self.validate()

        if self.dry_run_result and self.dry_run_result["valid"]:
            # "schema" may be present but None or {} (e.g. no SQL was dry
            # run); guard against indexing into a non-dict value.
            schema = self.dry_run_result.get("schema") or {}
            return schema.get("fields", [])

        return []

    def get_table_schema(self):
        """Return the schema fields of the provided table, or []."""
        self.validate()

        if self.dry_run_result and self.dry_run_result["valid"]:
            # "tableMetadata" is None when no table was configured.
            metadata = self.dry_run_result.get("tableMetadata") or {}
            return metadata.get("schema", {}).get("fields", [])

        return []

    def get_table_metadata(self):
        """Return table metadata, or {} when unavailable."""
        self.validate()

        if self.dry_run_result and self.dry_run_result["valid"]:
            return self.dry_run_result.get("tableMetadata") or {}

        return {}

    def validate(self):
        """Dry run the provided SQL file and check if valid.

        Returns True when the dry run succeeded or failed only with an
        expected error class; raises DryRunError otherwise.
        """
        if self.dry_run_result is None:
            raise DryRunError(
                "Error when dry running SQL",
                self.get_error(),
                self.use_cloud_function,
                self.table,
            )

        if self.dry_run_result["valid"]:
            return True

        error = self.get_error()
        if error == Errors.READ_ONLY:
            # We want the dryrun service to only have read permissions, so
            # we expect CREATE VIEW and CREATE TABLE to throw specific
            # exceptions.
            return True
        if error == Errors.DATE_FILTER_NEEDED:
            # With strip_dml flag, some queries require a partition filter
            # (submission_date, submission_timestamp, etc.) to run
            return True

        # Use errors() instead of ["errors"] so a malformed result without
        # an "errors" key still reaches the raise below.
        print("ERROR\n", self.errors())
        raise DryRunError(
            "Error when dry running SQL",
            error,
            self.use_cloud_function,
            self.table,
        )

    def errors(self):
        """Dry run the provided SQL file and return errors."""
        if self.dry_run_result is None:
            return []
        return self.dry_run_result.get("errors", [])

    def get_error(self) -> Optional[Errors]:
        """Get specific errors for edge case handling."""
        errors = self.errors()
        if len(errors) != 1:
            return None

        error = errors[0]
        if error and error.get("code") in [400, 403]:
            error_message = error.get("message", "")
            if (
                "does not have bigquery.tables.create permission for dataset"
                in error_message
                or "Permission bigquery.tables.create denied" in error_message
                or "Permission bigquery.datasets.update denied" in error_message
            ):
                return Errors.READ_ONLY
            if "without a filter over column(s)" in error_message:
                return Errors.DATE_FILTER_NEEDED
            if (
                "Syntax error: Expected end of input but got keyword WHERE"
                in error_message
            ):
                return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
            if (
                "Permission bigquery.tables.get denied on table" in error_message
                or "User does not have permission to query table" in error_message
            ):
                return Errors.PERMISSION_DENIED
        return None
Dry run SQL.
158 def __init__( 159 self, 160 use_cloud_function=False, 161 id_token=None, 162 credentials=None, 163 sql=None, 164 project="moz-fx-data-shared-prod", 165 dataset=None, 166 table=None, 167 dry_run_url=DRY_RUN_URL, 168 ): 169 """Initialize dry run instance.""" 170 self.sql = sql 171 self.use_cloud_function = use_cloud_function 172 self.project = project 173 self.dataset = dataset 174 self.table = table 175 self.dry_run_url = dry_run_url 176 self.id_token = id_token 177 self.credentials = credentials
Initialize dry run instance.
179 @cached_property 180 def client(self): 181 """Get BigQuery client instance.""" 182 return bigquery.Client(credentials=self.credentials)
Get BigQuery client instance.
184 @cached_property 185 def dry_run_result(self): 186 """Return the dry run result.""" 187 try: 188 if self.use_cloud_function: 189 json_data = { 190 "query": self.sql or "SELECT 1", 191 "project": self.project, 192 "dataset": self.dataset or "telemetry", 193 } 194 195 if self.table: 196 json_data["table"] = self.table 197 198 request = Request( 199 self.dry_run_url, 200 headers={ 201 "Content-Type": "application/json", 202 "Authorization": f"Bearer {self.id_token}", 203 }, 204 data=json.dumps(json_data).encode("utf8"), 205 method="POST", 206 ) 207 r = urlopen_with_retry(request) 208 return json.load(r) 209 else: 210 query_schema = None 211 referenced_tables = [] 212 table_metadata = None 213 214 if self.sql: 215 job_config = bigquery.QueryJobConfig( 216 dry_run=True, 217 use_query_cache=False, 218 query_parameters=[ 219 bigquery.ScalarQueryParameter( 220 "submission_date", "DATE", "2019-01-01" 221 ) 222 ], 223 ) 224 225 if self.project: 226 job_config.connection_properties = [ 227 bigquery.ConnectionProperty( 228 "dataset_project_id", self.project 229 ) 230 ] 231 232 job = self.client.query(self.sql, job_config=job_config) 233 query_schema = ( 234 job._properties.get("statistics", {}) 235 .get("query", {}) 236 .get("schema", {}) 237 ) 238 referenced_tables = [ 239 ref.to_api_repr() for ref in job.referenced_tables 240 ] 241 242 if ( 243 self.project is not None 244 and self.table is not None 245 and self.dataset is not None 246 ): 247 table = self.client.get_table( 248 f"{self.project}.{self.dataset}.{self.table}" 249 ) 250 table_metadata = { 251 "tableType": table.table_type, 252 "friendlyName": table.friendly_name, 253 "schema": { 254 "fields": [field.to_api_repr() for field in table.schema] 255 }, 256 } 257 258 return { 259 "valid": True, 260 "referencedTables": referenced_tables, 261 "schema": query_schema, 262 "tableMetadata": table_metadata, 263 } 264 except Exception as e: 265 print(f"ERROR {e}") 266 return None
Return the dry run result.
268 def get_schema(self): 269 """Return the query schema by dry running the SQL file.""" 270 self.validate() 271 272 if ( 273 self.dry_run_result 274 and self.dry_run_result["valid"] 275 and "schema" in self.dry_run_result 276 ): 277 return self.dry_run_result["schema"]["fields"] 278 279 return []
Return the query schema by dry running the SQL file.
281 def get_table_schema(self): 282 """Return the schema of the provided table.""" 283 self.validate() 284 285 if ( 286 self.dry_run_result 287 and self.dry_run_result["valid"] 288 and "tableMetadata" in self.dry_run_result 289 ): 290 return self.dry_run_result["tableMetadata"]["schema"]["fields"] 291 292 return []
Return the schema of the provided table.
294 def get_table_metadata(self): 295 """Return table metadata.""" 296 self.validate() 297 298 if ( 299 self.dry_run_result 300 and self.dry_run_result["valid"] 301 and "tableMetadata" in self.dry_run_result 302 ): 303 return self.dry_run_result["tableMetadata"] 304 305 return {}
Return table metadata.
307 def validate(self): 308 """Dry run the provided SQL file and check if valid.""" 309 dry_run_error = DryRunError( 310 "Error when dry running SQL", 311 self.get_error(), 312 self.use_cloud_function, 313 self.table, 314 ) 315 316 if self.dry_run_result is None: 317 raise dry_run_error 318 319 if self.dry_run_result["valid"]: 320 return True 321 elif self.get_error() == Errors.READ_ONLY: 322 # We want the dryrun service to only have read permissions, so 323 # we expect CREATE VIEW and CREATE TABLE to throw specific 324 # exceptions. 325 return True 326 elif self.get_error() == Errors.DATE_FILTER_NEEDED: 327 # With strip_dml flag, some queries require a partition filter 328 # (submission_date, submission_timestamp, etc.) to run 329 return True 330 else: 331 print("ERROR\n", self.dry_run_result["errors"]) 332 raise dry_run_error
Dry run the provided SQL file and check if valid.
334 def errors(self): 335 """Dry run the provided SQL file and return errors.""" 336 if self.dry_run_result is None: 337 return [] 338 return self.dry_run_result.get("errors", [])
Dry run the provided SQL file and return errors.
340 def get_error(self) -> Optional[Errors]: 341 """Get specific errors for edge case handling.""" 342 errors = self.errors() 343 if len(errors) != 1: 344 return None 345 346 error = errors[0] 347 if error and error.get("code") in [400, 403]: 348 error_message = error.get("message", "") 349 if ( 350 "does not have bigquery.tables.create permission for dataset" 351 in error_message 352 or "Permission bigquery.tables.create denied" in error_message 353 or "Permission bigquery.datasets.update denied" in error_message 354 ): 355 return Errors.READ_ONLY 356 if "without a filter over column(s)" in error_message: 357 return Errors.DATE_FILTER_NEEDED 358 if ( 359 "Syntax error: Expected end of input but got keyword WHERE" 360 in error_message 361 ): 362 return Errors.DATE_FILTER_NEEDED_AND_SYNTAX 363 if ( 364 "Permission bigquery.tables.get denied on table" in error_message 365 or "User does not have permission to query table" in error_message 366 ): 367 return Errors.PERMISSION_DENIED 368 return None
Get specific errors for edge case handling.