generator.dryrun
Dry Run method to get BigQuery metadata.
"""Dry Run method to get BigQuery metadata."""

import json
import os
import time
from enum import Enum
from functools import cached_property
from typing import Optional
from urllib.error import URLError
from urllib.request import Request, urlopen

import google.auth
from google.auth.transport.requests import Request as GoogleAuthRequest
from google.cloud import bigquery
from google.oauth2.id_token import fetch_id_token

DRY_RUN_URL = "https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/dryrun"


def urlopen_with_retry(request, max_retries=5, initial_delay=2.0, timeout=30):
    """
    Perform urlopen with exponential backoff retry logic.

    Args:
        request: The urllib Request object
        max_retries: Maximum number of retry attempts after the first try
            (default: 5). Negative values are treated as 0, so the request
            is always attempted at least once.
        initial_delay: Delay in seconds before the first retry; it is doubled
            after every failed attempt (default: 2.0)
        timeout: Timeout in seconds for each request (default: 30)

    Returns:
        The response from urlopen

    Raises:
        URLError: If all attempts fail (the error of the last attempt)
    """
    # Guard against a negative retry count: without it the loop would never
    # run and the function would end in `raise None`, i.e. a TypeError.
    max_retries = max(0, max_retries)
    total_attempts = max_retries + 1
    delay = initial_delay

    for attempt in range(1, total_attempts + 1):
        try:
            return urlopen(request, timeout=timeout)
        except URLError as e:
            if attempt == total_attempts:
                print(f"All {total_attempts} attempts failed")
                raise
            print(
                f"Network request failed (attempt {attempt}/{total_attempts}): {e}"
            )
            print(f"Retrying in {delay} seconds...")
            time.sleep(delay)
            delay *= 2  # Exponential backoff


def credentials(auth_req: Optional[GoogleAuthRequest] = None):
    """
    Get GCP credentials.

    Args:
        auth_req: Transport request used to refresh the credentials; a new
            GoogleAuthRequest is created when omitted.

    Returns:
        The refreshed default credentials with the cloud-platform scope.
    """
    auth_req = auth_req or GoogleAuthRequest()
    creds, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    creds.refresh(auth_req)
    return creds


def id_token():
    """
    Get token to authenticate against Cloud Function.

    Returns:
        The value of the GOOGLE_GHA_ID_TOKEN environment variable when it is
        set and non-empty; otherwise a token taken from, or fetched with, the
        default credentials.
    """
    # look for token created by the GitHub Actions workflow
    token = os.environ.get("GOOGLE_GHA_ID_TOKEN")
    if token:
        return token

    auth_req = GoogleAuthRequest()
    creds = credentials(auth_req)
    if hasattr(creds, "id_token"):
        # Get token from default credentials for the current environment
        # created via Cloud SDK run
        return creds.id_token

    # If the environment variable GOOGLE_APPLICATION_CREDENTIALS is set to
    # service account JSON file, then ID token is acquired using this
    # service account credentials.
    return fetch_id_token(auth_req, DRY_RUN_URL)


class DryRunError(Exception):
    """Exception raised on dry run errors."""

    def __init__(self, message, error, use_cloud_function, table_id):
        """
        Initialize DryRunError.

        Args:
            message: Text passed to the base Exception.
            error: Value stored on ``self.error``.
            use_cloud_function: Value stored on ``self.use_cloud_function``.
            table_id: Value stored on ``self.table_id``.
        """
        super().__init__(message)
        self.error = error
        self.use_cloud_function = use_cloud_function
        self.table_id = table_id

    def __reduce__(self):
        """
        Override to ensure that all parameters are being passed when pickling.

        Pickling happens when passing exception between processes (e.g. via
        multiprocessing)
        """
        return (
            self.__class__,
            self.args + (self.error, self.use_cloud_function, self.table_id),
        )


class Errors(Enum):
    """DryRun errors that require special handling."""

    READ_ONLY = 1
    DATE_FILTER_NEEDED = 2
    DATE_FILTER_NEEDED_AND_SYNTAX = 3
    PERMISSION_DENIED = 4


class DryRunContext:
    """DryRun builder class."""

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Initialize the context with settings shared by created DryRuns."""
        self.use_cloud_function = use_cloud_function
        self.dry_run_url = dry_run_url
        self.id_token = id_token
        self.credentials = credentials

    def create(
        self,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
    ):
        """Create a DryRun instance that uses this context's settings."""
        return DryRun(
            use_cloud_function=self.use_cloud_function,
            id_token=self.id_token,
            credentials=self.credentials,
            sql=sql,
            project=project,
            dataset=dataset,
            table=table,
            dry_run_url=self.dry_run_url,
        )


class DryRun:
    """Dry run SQL."""

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """
        Initialize dry run instance.

        Args:
            use_cloud_function: When true the dry run is POSTed to
                ``dry_run_url``; otherwise a BigQuery client is used.
            id_token: Bearer token sent to the Cloud Function.
            credentials: Credentials passed to the BigQuery client.
            sql: Query to dry run.
            project: Project of the query / table.
            dataset: Dataset of the table.
            table: Table whose metadata should be returned.
            dry_run_url: URL of the dry run Cloud Function.
        """
        self.sql = sql
        self.use_cloud_function = use_cloud_function
        self.project = project
        self.dataset = dataset
        self.table = table
        self.dry_run_url = dry_run_url
        self.id_token = id_token
        self.credentials = credentials

    @cached_property
    def client(self):
        """Get BigQuery client instance."""
        return bigquery.Client(credentials=self.credentials)

    @cached_property
    def dry_run_result(self):
        """
        Return the dry run result.

        Returns:
            The decoded JSON response of the Cloud Function, or a dict with
            the keys ``valid``, ``referencedTables``, ``schema`` and
            ``tableMetadata`` built from the BigQuery client. ``None`` when
            any exception occurred (the exception is printed).
        """
        try:
            if self.use_cloud_function:
                json_data = {
                    "query": self.sql or "SELECT 1",
                    "project": self.project,
                    "dataset": self.dataset or "telemetry",
                }

                if self.table:
                    json_data["table"] = self.table

                request = Request(
                    self.dry_run_url,
                    headers={
                        "Content-Type": "application/json",
                        "Authorization": f"Bearer {self.id_token}",
                    },
                    data=json.dumps(json_data).encode("utf8"),
                    method="POST",
                )
                response = urlopen_with_retry(request)
                try:
                    return json.load(response)
                finally:
                    # Release the HTTP connection once the body is parsed.
                    response.close()
            else:
                query_schema = None
                referenced_tables = []
                table_metadata = None

                if self.sql:
                    job_config = bigquery.QueryJobConfig(
                        dry_run=True,
                        use_query_cache=False,
                        query_parameters=[
                            bigquery.ScalarQueryParameter(
                                "submission_date", "DATE", "2019-01-01"
                            )
                        ],
                    )

                    if self.project:
                        job_config.connection_properties = [
                            bigquery.ConnectionProperty(
                                "dataset_project_id", self.project
                            )
                        ]

                    job = self.client.query(self.sql, job_config=job_config)
                    query_schema = (
                        job._properties.get("statistics", {})
                        .get("query", {})
                        .get("schema", {})
                    )
                    referenced_tables = [
                        ref.to_api_repr() for ref in job.referenced_tables
                    ]

                if (
                    self.project is not None
                    and self.table is not None
                    and self.dataset is not None
                ):
                    table = self.client.get_table(
                        f"{self.project}.{self.dataset}.{self.table}"
                    )
                    table_metadata = {
                        "tableType": table.table_type,
                        "friendlyName": table.friendly_name,
                        "schema": {
                            "fields": [field.to_api_repr() for field in table.schema]
                        },
                    }

                return {
                    "valid": True,
                    "referencedTables": referenced_tables,
                    "schema": query_schema,
                    "tableMetadata": table_metadata,
                }
        except Exception as e:
            print(f"ERROR {e}")
            return None

    def get_schema(self):
        """
        Return the query schema by dry running the SQL file.

        Returns:
            The list of schema fields, or ``[]`` when the result is not valid
            or carries no schema.

        Raises:
            DryRunError: If validation of the dry run result fails.
        """
        self.validate()

        result = self.dry_run_result
        if result and result.get("valid"):
            # "schema" is None when no SQL was dry-run and may lack "fields".
            return (result.get("schema") or {}).get("fields", [])

        return []

    def get_table_schema(self):
        """
        Return the schema of the provided table.

        Returns:
            The list of table schema fields, or ``[]`` when the result is not
            valid or carries no table metadata.

        Raises:
            DryRunError: If validation of the dry run result fails.
        """
        self.validate()

        result = self.dry_run_result
        if result and result.get("valid"):
            # "tableMetadata" is None unless project, dataset and table are set.
            metadata = result.get("tableMetadata") or {}
            return (metadata.get("schema") or {}).get("fields", [])

        return []

    def get_table_metadata(self):
        """
        Return table metadata.

        Returns:
            The table metadata dict, or ``{}`` when the result is not valid
            or carries no table metadata.

        Raises:
            DryRunError: If validation of the dry run result fails.
        """
        self.validate()

        result = self.dry_run_result
        if result and result.get("valid"):
            return result.get("tableMetadata") or {}

        return {}

    def validate(self):
        """
        Dry run the provided SQL file and check if valid.

        Returns:
            True when the result is valid or only failed with an expected
            error (READ_ONLY or DATE_FILTER_NEEDED).

        Raises:
            DryRunError: If there is no dry run result or it is invalid.
        """
        error = self.get_error()
        dry_run_error = DryRunError(
            "Error when dry running SQL",
            error,
            self.use_cloud_function,
            self.table,
        )

        result = self.dry_run_result
        if result is None:
            raise dry_run_error

        if result.get("valid"):
            return True
        if error == Errors.READ_ONLY:
            # We want the dryrun service to only have read permissions, so
            # we expect CREATE VIEW and CREATE TABLE to throw specific
            # exceptions.
            return True
        if error == Errors.DATE_FILTER_NEEDED:
            # With strip_dml flag, some queries require a partition filter
            # (submission_date, submission_timestamp, etc.) to run
            return True

        # errors() tolerates a response without an "errors" key, so the
        # DryRunError below is not masked by a KeyError.
        print("ERROR\n", self.errors())
        raise dry_run_error

    def errors(self):
        """Dry run the provided SQL file and return errors."""
        if self.dry_run_result is None:
            return []
        return self.dry_run_result.get("errors", [])

    def get_error(self) -> Optional[Errors]:
        """
        Get specific errors for edge case handling.

        Returns:
            The matching Errors member when exactly one error with code 400
            or 403 was reported and its message is recognised, else None.
        """
        errors = self.errors()
        if len(errors) != 1:
            return None

        error = errors[0]
        if error and error.get("code") in [400, 403]:
            error_message = error.get("message", "")
            if (
                "does not have bigquery.tables.create permission for dataset"
                in error_message
                or "Permission bigquery.tables.create denied" in error_message
                or "Permission bigquery.datasets.update denied" in error_message
            ):
                return Errors.READ_ONLY
            if "without a filter over column(s)" in error_message:
                return Errors.DATE_FILTER_NEEDED
            if (
                "Syntax error: Expected end of input but got keyword WHERE"
                in error_message
            ):
                return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
            if (
                "Permission bigquery.tables.get denied on table" in error_message
                or "User does not have permission to query table" in error_message
            ):
                return Errors.PERMISSION_DENIED
        return None
def urlopen_with_retry(request, max_retries=5, initial_delay=2.0, timeout=30):
    """
    Perform urlopen with exponential backoff retry logic.

    Args:
        request: The urllib Request object
        max_retries: Maximum number of retry attempts after the first try
            (default: 5). Negative values are treated as 0, so the request
            is always attempted at least once.
        initial_delay: Delay in seconds before the first retry; it is doubled
            after every failed attempt (default: 2.0)
        timeout: Timeout in seconds for each request (default: 30)

    Returns:
        The response from urlopen

    Raises:
        URLError: If all attempts fail (the error of the last attempt)
    """
    # Guard against a negative retry count: without it the loop would never
    # run and the function would end in `raise None`, i.e. a TypeError.
    max_retries = max(0, max_retries)
    total_attempts = max_retries + 1
    delay = initial_delay

    for attempt in range(1, total_attempts + 1):
        try:
            return urlopen(request, timeout=timeout)
        except URLError as e:
            if attempt == total_attempts:
                print(f"All {total_attempts} attempts failed")
                raise
            print(
                f"Network request failed (attempt {attempt}/{total_attempts}): {e}"
            )
            print(f"Retrying in {delay} seconds...")
            time.sleep(delay)
            delay *= 2  # Exponential backoff
Perform urlopen with exponential backoff retry logic.
Args: `request` — the urllib Request object; `max_retries` — maximum number of retry attempts (default: 5); `initial_delay` — initial delay in seconds before the first retry (default: 2.0); `timeout` — timeout in seconds for each request (default: 30).
Returns: The response from urlopen
Raises: URLError: If all retry attempts fail
def credentials(auth_req: Optional[GoogleAuthRequest] = None):
    """
    Get GCP credentials.

    Args:
        auth_req: Transport request used to refresh the credentials; a new
            GoogleAuthRequest is created when omitted.

    Returns:
        The refreshed default credentials with the cloud-platform scope.
    """
    transport = auth_req if auth_req else GoogleAuthRequest()
    cloud_platform_scope = "https://www.googleapis.com/auth/cloud-platform"
    default_creds, _project = google.auth.default(scopes=[cloud_platform_scope])
    default_creds.refresh(transport)
    return default_creds
Get GCP credentials.
def id_token():
    """
    Get token to authenticate against Cloud Function.

    Returns:
        The value of the GOOGLE_GHA_ID_TOKEN environment variable when it is
        set and non-empty; otherwise a token taken from, or fetched with, the
        default credentials.
    """
    # A token created by the GitHub Actions workflow takes precedence.
    env_token = os.environ.get("GOOGLE_GHA_ID_TOKEN")
    if env_token:
        return env_token

    transport = GoogleAuthRequest()
    default_creds = credentials(transport)
    if not hasattr(default_creds, "id_token"):
        # If the environment variable GOOGLE_APPLICATION_CREDENTIALS is set to
        # service account JSON file, then ID token is acquired using this
        # service account credentials.
        return fetch_id_token(transport, DRY_RUN_URL)

    # Token from default credentials for the current environment created via
    # Cloud SDK run.
    return default_creds.id_token
Get token to authenticate against Cloud Function.
class DryRunError(Exception):
    """Exception raised on dry run errors.

    Besides the message it carries the ``error``, ``use_cloud_function`` and
    ``table_id`` values given to the constructor.
    """

    def __init__(self, message, error, use_cloud_function, table_id):
        """Store the message on the base exception and keep the extra values."""
        super().__init__(message)
        self.error, self.use_cloud_function, self.table_id = (
            error,
            use_cloud_function,
            table_id,
        )

    def __reduce__(self):
        """
        Return the class and the full constructor arguments for pickling.

        Exceptions are pickled when they cross process boundaries (e.g. via
        multiprocessing); the extra attributes are appended to ``args`` so
        that all constructor parameters are passed again on unpickling.
        """
        extra_state = (self.error, self.use_cloud_function, self.table_id)
        return (self.__class__, (*self.args, *extra_state))
Exception raised on dry run errors.
89 def __init__(self, message, error, use_cloud_function, table_id): 90 """Initialize DryRunError.""" 91 super().__init__(message) 92 self.error = error 93 self.use_cloud_function = use_cloud_function 94 self.table_id = table_id
Initialize DryRunError.
class Errors(Enum):
    """DryRun errors that require special handling."""

    # Numeric values are kept stable; members are compared by identity/equality.
    READ_ONLY = 1
    DATE_FILTER_NEEDED = 2
    DATE_FILTER_NEEDED_AND_SYNTAX = 3
    PERMISSION_DENIED = 4
DryRun errors that require special handling.
class DryRunContext:
    """DryRun builder class.

    Holds the settings that are shared by every DryRun created from it.
    """

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Store the shared dry run settings."""
        self.use_cloud_function = use_cloud_function
        self.dry_run_url = dry_run_url
        self.id_token = id_token
        self.credentials = credentials

    def create(
        self,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
    ):
        """Create a DryRun for the given query/table with the shared settings."""
        shared_settings = {
            "use_cloud_function": self.use_cloud_function,
            "id_token": self.id_token,
            "credentials": self.credentials,
            "dry_run_url": self.dry_run_url,
        }
        return DryRun(
            sql=sql,
            project=project,
            dataset=dataset,
            table=table,
            **shared_settings,
        )
DryRun builder class.
def __init__(
    self,
    use_cloud_function=False,
    id_token=None,
    credentials=None,
    dry_run_url=DRY_RUN_URL,
):
    """
    Initialize the dry run context.

    Args:
        use_cloud_function: Stored on ``self.use_cloud_function``.
        id_token: Stored on ``self.id_token``.
        credentials: Stored on ``self.credentials``.
        dry_run_url: Stored on ``self.dry_run_url``.
    """
    self.use_cloud_function, self.dry_run_url = use_cloud_function, dry_run_url
    self.id_token, self.credentials = id_token, credentials
Initialize the dry run context with the settings shared by every DryRun it creates.
def create(
    self,
    sql=None,
    project="moz-fx-data-shared-prod",
    dataset=None,
    table=None,
):
    """
    Create a DryRun instance that uses this context's settings.

    Args:
        sql: Query passed on to the DryRun.
        project: Project passed on to the DryRun.
        dataset: Dataset passed on to the DryRun.
        table: Table passed on to the DryRun.

    Returns:
        A new DryRun instance.
    """
    shared_settings = {
        "use_cloud_function": self.use_cloud_function,
        "id_token": self.id_token,
        "credentials": self.credentials,
        "dry_run_url": self.dry_run_url,
    }
    return DryRun(
        sql=sql,
        project=project,
        dataset=dataset,
        table=table,
        **shared_settings,
    )
Create a DryRun instance for the given SQL or table, using this context's settings.
class DryRun:
    """Dry run SQL."""

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """
        Initialize dry run instance.

        Args:
            use_cloud_function: When true the dry run is POSTed to
                ``dry_run_url``; otherwise a BigQuery client is used.
            id_token: Bearer token sent to the Cloud Function.
            credentials: Credentials passed to the BigQuery client.
            sql: Query to dry run.
            project: Project of the query / table.
            dataset: Dataset of the table.
            table: Table whose metadata should be returned.
            dry_run_url: URL of the dry run Cloud Function.
        """
        self.sql = sql
        self.use_cloud_function = use_cloud_function
        self.project = project
        self.dataset = dataset
        self.table = table
        self.dry_run_url = dry_run_url
        self.id_token = id_token
        self.credentials = credentials

    @cached_property
    def client(self):
        """Get BigQuery client instance."""
        return bigquery.Client(credentials=self.credentials)

    @cached_property
    def dry_run_result(self):
        """
        Return the dry run result.

        Returns:
            The decoded JSON response of the Cloud Function, or a dict with
            the keys ``valid``, ``referencedTables``, ``schema`` and
            ``tableMetadata`` built from the BigQuery client. ``None`` when
            any exception occurred (the exception is printed).
        """
        try:
            if self.use_cloud_function:
                json_data = {
                    "query": self.sql or "SELECT 1",
                    "project": self.project,
                    "dataset": self.dataset or "telemetry",
                }

                if self.table:
                    json_data["table"] = self.table

                request = Request(
                    self.dry_run_url,
                    headers={
                        "Content-Type": "application/json",
                        "Authorization": f"Bearer {self.id_token}",
                    },
                    data=json.dumps(json_data).encode("utf8"),
                    method="POST",
                )
                response = urlopen_with_retry(request)
                try:
                    return json.load(response)
                finally:
                    # Release the HTTP connection once the body is parsed.
                    response.close()
            else:
                query_schema = None
                referenced_tables = []
                table_metadata = None

                if self.sql:
                    job_config = bigquery.QueryJobConfig(
                        dry_run=True,
                        use_query_cache=False,
                        query_parameters=[
                            bigquery.ScalarQueryParameter(
                                "submission_date", "DATE", "2019-01-01"
                            )
                        ],
                    )

                    if self.project:
                        job_config.connection_properties = [
                            bigquery.ConnectionProperty(
                                "dataset_project_id", self.project
                            )
                        ]

                    job = self.client.query(self.sql, job_config=job_config)
                    query_schema = (
                        job._properties.get("statistics", {})
                        .get("query", {})
                        .get("schema", {})
                    )
                    referenced_tables = [
                        ref.to_api_repr() for ref in job.referenced_tables
                    ]

                if (
                    self.project is not None
                    and self.table is not None
                    and self.dataset is not None
                ):
                    table = self.client.get_table(
                        f"{self.project}.{self.dataset}.{self.table}"
                    )
                    table_metadata = {
                        "tableType": table.table_type,
                        "friendlyName": table.friendly_name,
                        "schema": {
                            "fields": [field.to_api_repr() for field in table.schema]
                        },
                    }

                return {
                    "valid": True,
                    "referencedTables": referenced_tables,
                    "schema": query_schema,
                    "tableMetadata": table_metadata,
                }
        except Exception as e:
            print(f"ERROR {e}")
            return None

    def get_schema(self):
        """
        Return the query schema by dry running the SQL file.

        Returns:
            The list of schema fields, or ``[]`` when the result is not valid
            or carries no schema.

        Raises:
            DryRunError: If validation of the dry run result fails.
        """
        self.validate()

        result = self.dry_run_result
        if result and result.get("valid"):
            # "schema" is None when no SQL was dry-run and may lack "fields".
            return (result.get("schema") or {}).get("fields", [])

        return []

    def get_table_schema(self):
        """
        Return the schema of the provided table.

        Returns:
            The list of table schema fields, or ``[]`` when the result is not
            valid or carries no table metadata.

        Raises:
            DryRunError: If validation of the dry run result fails.
        """
        self.validate()

        result = self.dry_run_result
        if result and result.get("valid"):
            # "tableMetadata" is None unless project, dataset and table are set.
            metadata = result.get("tableMetadata") or {}
            return (metadata.get("schema") or {}).get("fields", [])

        return []

    def get_table_metadata(self):
        """
        Return table metadata.

        Returns:
            The table metadata dict, or ``{}`` when the result is not valid
            or carries no table metadata.

        Raises:
            DryRunError: If validation of the dry run result fails.
        """
        self.validate()

        result = self.dry_run_result
        if result and result.get("valid"):
            return result.get("tableMetadata") or {}

        return {}

    def validate(self):
        """
        Dry run the provided SQL file and check if valid.

        Returns:
            True when the result is valid or only failed with an expected
            error (READ_ONLY or DATE_FILTER_NEEDED).

        Raises:
            DryRunError: If there is no dry run result or it is invalid.
        """
        error = self.get_error()
        dry_run_error = DryRunError(
            "Error when dry running SQL",
            error,
            self.use_cloud_function,
            self.table,
        )

        result = self.dry_run_result
        if result is None:
            raise dry_run_error

        if result.get("valid"):
            return True
        if error == Errors.READ_ONLY:
            # We want the dryrun service to only have read permissions, so
            # we expect CREATE VIEW and CREATE TABLE to throw specific
            # exceptions.
            return True
        if error == Errors.DATE_FILTER_NEEDED:
            # With strip_dml flag, some queries require a partition filter
            # (submission_date, submission_timestamp, etc.) to run
            return True

        # errors() tolerates a response without an "errors" key, so the
        # DryRunError below is not masked by a KeyError.
        print("ERROR\n", self.errors())
        raise dry_run_error

    def errors(self):
        """Dry run the provided SQL file and return errors."""
        if self.dry_run_result is None:
            return []
        return self.dry_run_result.get("errors", [])

    def get_error(self) -> Optional[Errors]:
        """
        Get specific errors for edge case handling.

        Returns:
            The matching Errors member when exactly one error with code 400
            or 403 was reported and its message is recognised, else None.
        """
        errors = self.errors()
        if len(errors) != 1:
            return None

        error = errors[0]
        if error and error.get("code") in [400, 403]:
            error_message = error.get("message", "")
            if (
                "does not have bigquery.tables.create permission for dataset"
                in error_message
                or "Permission bigquery.tables.create denied" in error_message
                or "Permission bigquery.datasets.update denied" in error_message
            ):
                return Errors.READ_ONLY
            if "without a filter over column(s)" in error_message:
                return Errors.DATE_FILTER_NEEDED
            if (
                "Syntax error: Expected end of input but got keyword WHERE"
                in error_message
            ):
                return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
            if (
                "Permission bigquery.tables.get denied on table" in error_message
                or "User does not have permission to query table" in error_message
            ):
                return Errors.PERMISSION_DENIED
        return None
Dry run SQL.
def __init__(
    self,
    use_cloud_function=False,
    id_token=None,
    credentials=None,
    sql=None,
    project="moz-fx-data-shared-prod",
    dataset=None,
    table=None,
    dry_run_url=DRY_RUN_URL,
):
    """
    Initialize dry run instance.

    Every argument is stored unchanged on the attribute of the same name.
    """
    # What to dry run.
    self.sql = sql
    self.project, self.dataset, self.table = project, dataset, table
    # How to dry run it.
    self.use_cloud_function = use_cloud_function
    self.dry_run_url = dry_run_url
    self.id_token, self.credentials = id_token, credentials
Initialize dry run instance.
@cached_property
def client(self):
    """Build the BigQuery client from ``self.credentials`` (cached per instance)."""
    client_credentials = self.credentials
    return bigquery.Client(credentials=client_credentials)
Get BigQuery client instance.
@cached_property
def dry_run_result(self):
    """
    Return the dry run result.

    Returns:
        The decoded JSON response of the Cloud Function, or a dict with the
        keys ``valid``, ``referencedTables``, ``schema`` and
        ``tableMetadata`` built from the BigQuery client. ``None`` when any
        exception occurred (the exception is printed).
    """
    try:
        if self.use_cloud_function:
            json_data = {
                "query": self.sql or "SELECT 1",
                "project": self.project,
                "dataset": self.dataset or "telemetry",
            }

            if self.table:
                json_data["table"] = self.table

            request = Request(
                self.dry_run_url,
                headers={
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {self.id_token}",
                },
                data=json.dumps(json_data).encode("utf8"),
                method="POST",
            )
            response = urlopen_with_retry(request)
            try:
                return json.load(response)
            finally:
                # Release the HTTP connection once the body is parsed.
                response.close()
        else:
            query_schema = None
            referenced_tables = []
            table_metadata = None

            if self.sql:
                job_config = bigquery.QueryJobConfig(
                    dry_run=True,
                    use_query_cache=False,
                    query_parameters=[
                        bigquery.ScalarQueryParameter(
                            "submission_date", "DATE", "2019-01-01"
                        )
                    ],
                )

                if self.project:
                    job_config.connection_properties = [
                        bigquery.ConnectionProperty(
                            "dataset_project_id", self.project
                        )
                    ]

                job = self.client.query(self.sql, job_config=job_config)
                query_schema = (
                    job._properties.get("statistics", {})
                    .get("query", {})
                    .get("schema", {})
                )
                referenced_tables = [
                    ref.to_api_repr() for ref in job.referenced_tables
                ]

            if (
                self.project is not None
                and self.table is not None
                and self.dataset is not None
            ):
                table = self.client.get_table(
                    f"{self.project}.{self.dataset}.{self.table}"
                )
                table_metadata = {
                    "tableType": table.table_type,
                    "friendlyName": table.friendly_name,
                    "schema": {
                        "fields": [field.to_api_repr() for field in table.schema]
                    },
                }

            return {
                "valid": True,
                "referencedTables": referenced_tables,
                "schema": query_schema,
                "tableMetadata": table_metadata,
            }
    except Exception as e:
        print(f"ERROR {e}")
        return None
Return the dry run result.
def get_schema(self):
    """
    Return the query schema by dry running the SQL file.

    Returns:
        The list of schema fields, or ``[]`` when the result is not valid or
        carries no schema.

    Raises:
        Whatever ``self.validate()`` raises for an invalid dry run.
    """
    self.validate()

    result = self.dry_run_result
    if result and result.get("valid"):
        # "schema" can be None or lack "fields"; subscripting it directly
        # would raise TypeError/KeyError instead of returning [].
        return (result.get("schema") or {}).get("fields", [])

    return []
Return the query schema by dry running the SQL file.
def get_table_schema(self):
    """
    Return the schema of the provided table.

    Returns:
        The list of table schema fields, or ``[]`` when the result is not
        valid or carries no table metadata.

    Raises:
        Whatever ``self.validate()`` raises for an invalid dry run.
    """
    self.validate()

    result = self.dry_run_result
    if result and result.get("valid"):
        # "tableMetadata" can be present but None; subscripting it directly
        # would raise TypeError instead of returning [].
        metadata = result.get("tableMetadata") or {}
        return (metadata.get("schema") or {}).get("fields", [])

    return []
Return the schema of the provided table.
def get_table_metadata(self):
    """
    Return table metadata.

    Returns:
        The table metadata dict, or ``{}`` when the result is not valid or
        carries no table metadata.

    Raises:
        Whatever ``self.validate()`` raises for an invalid dry run.
    """
    self.validate()

    result = self.dry_run_result
    if result and result.get("valid"):
        # "tableMetadata" can be present but None; return {} in that case so
        # callers always get a dict.
        return result.get("tableMetadata") or {}

    return {}
Return table metadata.
def validate(self):
    """
    Dry run the provided SQL file and check if valid.

    Returns:
        True when the result is valid or only failed with an expected error
        (READ_ONLY or DATE_FILTER_NEEDED).

    Raises:
        DryRunError: If there is no dry run result or it is invalid.
    """
    error = self.get_error()
    dry_run_error = DryRunError(
        "Error when dry running SQL",
        error,
        self.use_cloud_function,
        self.table,
    )

    result = self.dry_run_result
    if result is None:
        raise dry_run_error

    if result.get("valid"):
        return True
    if error == Errors.READ_ONLY:
        # We want the dryrun service to only have read permissions, so
        # we expect CREATE VIEW and CREATE TABLE to throw specific
        # exceptions.
        return True
    if error == Errors.DATE_FILTER_NEEDED:
        # With strip_dml flag, some queries require a partition filter
        # (submission_date, submission_timestamp, etc.) to run
        return True

    # errors() tolerates a response without an "errors" key, so the
    # DryRunError below is not masked by a KeyError.
    print("ERROR\n", self.errors())
    raise dry_run_error
Dry run the provided SQL file and check if valid.
def errors(self):
    """
    Return the errors reported by the dry run.

    Returns:
        The ``errors`` entry of the dry run result, or ``[]`` when there is
        no result or no such entry.
    """
    result = self.dry_run_result
    return [] if result is None else result.get("errors", [])
Dry run the provided SQL file and return errors.
def get_error(self) -> Optional[Errors]:
    """
    Get specific errors for edge case handling.

    Returns:
        The matching Errors member when exactly one error with code 400 or
        403 was reported and its message contains a known marker, else None.
    """
    reported = self.errors()
    if len(reported) != 1:
        return None

    error = reported[0]
    if not error or error.get("code") not in [400, 403]:
        return None

    message = error.get("message", "")
    # Checked in order; the first category with a matching marker wins.
    markers_by_kind = (
        (
            Errors.READ_ONLY,
            (
                "does not have bigquery.tables.create permission for dataset",
                "Permission bigquery.tables.create denied",
                "Permission bigquery.datasets.update denied",
            ),
        ),
        (
            Errors.DATE_FILTER_NEEDED,
            ("without a filter over column(s)",),
        ),
        (
            Errors.DATE_FILTER_NEEDED_AND_SYNTAX,
            ("Syntax error: Expected end of input but got keyword WHERE",),
        ),
        (
            Errors.PERMISSION_DENIED,
            (
                "Permission bigquery.tables.get denied on table",
                "User does not have permission to query table",
            ),
        ),
    )
    for kind, markers in markers_by_kind:
        if any(marker in message for marker in markers):
            return kind
    return None
Get specific errors for edge case handling.