generator.dryrun
Dry Run method to get BigQuery metadata.
1"""Dry Run method to get BigQuery metadata.""" 2 3import json 4from enum import Enum 5from functools import cached_property 6from typing import Optional 7from urllib.request import Request, urlopen 8 9import google.auth 10from google.auth.transport.requests import Request as GoogleAuthRequest 11from google.cloud import bigquery 12from google.oauth2.id_token import fetch_id_token 13 14DRY_RUN_URL = ( 15 "https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun" 16) 17 18 19def credentials(auth_req: Optional[GoogleAuthRequest] = None): 20 """Get GCP credentials.""" 21 auth_req = auth_req or GoogleAuthRequest() 22 creds, _ = google.auth.default( 23 scopes=["https://www.googleapis.com/auth/cloud-platform"] 24 ) 25 creds.refresh(auth_req) 26 return creds 27 28 29def id_token(): 30 """Get token to authenticate against Cloud Function.""" 31 auth_req = GoogleAuthRequest() 32 creds = credentials(auth_req) 33 34 if hasattr(creds, "id_token"): 35 # Get token from default credentials for the current environment created via Cloud SDK run 36 id_token = creds.id_token 37 else: 38 # If the environment variable GOOGLE_APPLICATION_CREDENTIALS is set to service account JSON file, 39 # then ID token is acquired using this service account credentials. 40 id_token = fetch_id_token(auth_req, DRY_RUN_URL) 41 return id_token 42 43 44class DryRunError(Exception): 45 """Exception raised on dry run errors.""" 46 47 def __init__(self, message, error, use_cloud_function, table_id): 48 """Initialize DryRunError.""" 49 super().__init__(message) 50 self.error = error 51 self.use_cloud_function = use_cloud_function 52 self.table_id = table_id 53 54 def __reduce__(self): 55 """ 56 Override to ensure that all parameters are being passed when pickling. 57 58 Pickling happens when passing exception between processes (e.g. 
via multiprocessing) 59 """ 60 return ( 61 self.__class__, 62 self.args + (self.error, self.use_cloud_function, self.table_id), 63 ) 64 65 66class Errors(Enum): 67 """DryRun errors that require special handling.""" 68 69 READ_ONLY = 1 70 DATE_FILTER_NEEDED = 2 71 DATE_FILTER_NEEDED_AND_SYNTAX = 3 72 PERMISSION_DENIED = 4 73 74 75class DryRunContext: 76 """DryRun builder class.""" 77 78 def __init__( 79 self, 80 use_cloud_function=False, 81 id_token=None, 82 credentials=None, 83 dry_run_url=DRY_RUN_URL, 84 ): 85 """Initialize dry run instance.""" 86 self.use_cloud_function = use_cloud_function 87 self.dry_run_url = dry_run_url 88 self.id_token = id_token 89 self.credentials = credentials 90 91 def create( 92 self, 93 sql=None, 94 project="moz-fx-data-shared-prod", 95 dataset=None, 96 table=None, 97 ): 98 """Initialize a DryRun instance.""" 99 return DryRun( 100 use_cloud_function=self.use_cloud_function, 101 id_token=self.id_token, 102 credentials=self.credentials, 103 sql=sql, 104 project=project, 105 dataset=dataset, 106 table=table, 107 dry_run_url=self.dry_run_url, 108 ) 109 110 111class DryRun: 112 """Dry run SQL.""" 113 114 def __init__( 115 self, 116 use_cloud_function=False, 117 id_token=None, 118 credentials=None, 119 sql=None, 120 project="moz-fx-data-shared-prod", 121 dataset=None, 122 table=None, 123 dry_run_url=DRY_RUN_URL, 124 ): 125 """Initialize dry run instance.""" 126 self.sql = sql 127 self.use_cloud_function = use_cloud_function 128 self.project = project 129 self.dataset = dataset 130 self.table = table 131 self.dry_run_url = dry_run_url 132 self.id_token = id_token 133 self.credentials = credentials 134 135 @cached_property 136 def client(self): 137 """Get BigQuery client instance.""" 138 return bigquery.Client(credentials=self.credentials) 139 140 @cached_property 141 def dry_run_result(self): 142 """Return the dry run result.""" 143 try: 144 if self.use_cloud_function: 145 json_data = { 146 "query": self.sql or "SELECT 1", 147 "project": self.project, 148 "dataset": self.dataset or "telemetry", 149 } 150 151 if self.table: 152 json_data["table"] = self.table 153 154 r = urlopen( 155 Request( 156 self.dry_run_url, 157 headers={ 158 "Content-Type": "application/json", 159 "Authorization": f"Bearer {self.id_token}", 160 }, 161 data=json.dumps(json_data).encode("utf8"), 162 method="POST", 163 ) 164 ) 165 return json.load(r) 166 else: 167 query_schema = None 168 referenced_tables = [] 169 table_metadata = None 170 171 if self.sql: 172 job_config = bigquery.QueryJobConfig( 173 dry_run=True, 174 use_query_cache=False, 175 query_parameters=[ 176 bigquery.ScalarQueryParameter( 177 "submission_date", "DATE", "2019-01-01" 178 ) 179 ], 180 ) 181 182 if self.project: 183 job_config.connection_properties = [ 184 bigquery.ConnectionProperty( 185 "dataset_project_id", self.project 186 ) 187 ] 188 189 job = self.client.query(self.sql, job_config=job_config) 190 query_schema = ( 191 job._properties.get("statistics", {}) 192 .get("query", {}) 193 .get("schema", {}) 194 ) 195 referenced_tables = [ 196 ref.to_api_repr() for ref in job.referenced_tables 197 ] 198 199 if ( 200 self.project is not None 201 and self.table is not None 202 and self.dataset is not None 203 ): 204 table = self.client.get_table( 205 f"{self.project}.{self.dataset}.{self.table}" 206 ) 207 table_metadata = { 208 "tableType": table.table_type, 209 "friendlyName": table.friendly_name, 210 "schema": { 211 "fields": [field.to_api_repr() for field in table.schema] 212 }, 213 } 214 215 return { 216 "valid": True, 217 
"referencedTables": referenced_tables, 218 "schema": query_schema, 219 "tableMetadata": table_metadata, 220 } 221 except Exception as e: 222 print(f"ERROR {e}") 223 return None 224 225 def get_schema(self): 226 """Return the query schema by dry running the SQL file.""" 227 self.validate() 228 229 if ( 230 self.dry_run_result 231 and self.dry_run_result["valid"] 232 and "schema" in self.dry_run_result 233 ): 234 return self.dry_run_result["schema"]["fields"] 235 236 return [] 237 238 def get_table_schema(self): 239 """Return the schema of the provided table.""" 240 self.validate() 241 242 if ( 243 self.dry_run_result 244 and self.dry_run_result["valid"] 245 and "tableMetadata" in self.dry_run_result 246 ): 247 return self.dry_run_result["tableMetadata"]["schema"]["fields"] 248 249 return [] 250 251 def get_table_metadata(self): 252 """Return table metadata.""" 253 self.validate() 254 255 if ( 256 self.dry_run_result 257 and self.dry_run_result["valid"] 258 and "tableMetadata" in self.dry_run_result 259 ): 260 return self.dry_run_result["tableMetadata"] 261 262 return {} 263 264 def validate(self): 265 """Dry run the provided SQL file and check if valid.""" 266 dry_run_error = DryRunError( 267 "Error when dry running SQL", 268 self.get_error(), 269 self.use_cloud_function, 270 self.table, 271 ) 272 273 if self.dry_run_result is None: 274 raise dry_run_error 275 276 if self.dry_run_result["valid"]: 277 return True 278 elif self.get_error() == Errors.READ_ONLY: 279 # We want the dryrun service to only have read permissions, so 280 # we expect CREATE VIEW and CREATE TABLE to throw specific 281 # exceptions. 282 return True 283 elif self.get_error() == Errors.DATE_FILTER_NEEDED: 284 # With strip_dml flag, some queries require a partition filter 285 # (submission_date, submission_timestamp, etc.) to run 286 return True 287 else: 288 print("ERROR\n", self.dry_run_result["errors"]) 289 raise dry_run_error 290 291 def errors(self): 292 """Dry run the provided SQL file and return errors.""" 293 if self.dry_run_result is None: 294 return [] 295 return self.dry_run_result.get("errors", []) 296 297 def get_error(self) -> Optional[Errors]: 298 """Get specific errors for edge case handling.""" 299 errors = self.errors() 300 if len(errors) != 1: 301 return None 302 303 error = errors[0] 304 if error and error.get("code") in [400, 403]: 305 error_message = error.get("message", "") 306 if ( 307 "does not have bigquery.tables.create permission for dataset" 308 in error_message 309 or "Permission bigquery.tables.create denied" in error_message 310 or "Permission bigquery.datasets.update denied" in error_message 311 ): 312 return Errors.READ_ONLY 313 if "without a filter over column(s)" in error_message: 314 return Errors.DATE_FILTER_NEEDED 315 if ( 316 "Syntax error: Expected end of input but got keyword WHERE" 317 in error_message 318 ): 319 return Errors.DATE_FILTER_NEEDED_AND_SYNTAX 320 if ( 321 "Permission bigquery.tables.get denied on table" in error_message 322 or "User does not have permission to query table" in error_message 323 ): 324 return Errors.PERMISSION_DENIED 325 return None
DRY_RUN_URL = 'https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun'
def credentials(auth_req: Optional[google.auth.transport.requests.Request] = None):
```python
def credentials(auth_req: Optional[GoogleAuthRequest] = None):
    """Get GCP credentials."""
    auth_req = auth_req or GoogleAuthRequest()
    creds, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    creds.refresh(auth_req)
    return creds
```
Get GCP credentials.
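As a rough usage sketch (assuming the module is importable as `generator.dryrun`), the returned credentials can be handed to a `DryRunContext` for dry runs that call the BigQuery API directly rather than the Cloud Function:

```python
from generator.dryrun import DryRunContext, credentials

# Illustrative only: explicit Application Default Credentials passed to a
# context that dry runs directly against the BigQuery API.
ctx = DryRunContext(use_cloud_function=False, credentials=credentials())
print(ctx.create(sql="SELECT 1").get_schema())
```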
def id_token():
```python
def id_token():
    """Get token to authenticate against Cloud Function."""
    auth_req = GoogleAuthRequest()
    creds = credentials(auth_req)

    if hasattr(creds, "id_token"):
        # Get the token from the default credentials for the current
        # environment, created via the Cloud SDK.
        id_token = creds.id_token
    else:
        # If the GOOGLE_APPLICATION_CREDENTIALS environment variable points to
        # a service account JSON file, the ID token is acquired using those
        # service account credentials.
        id_token = fetch_id_token(auth_req, DRY_RUN_URL)
    return id_token
```
Get token to authenticate against Cloud Function.
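A sketch of the Cloud Function path; which branch supplies the token depends on whether the environment uses Cloud SDK credentials or a service account key file:

```python
from generator.dryrun import DryRunContext, id_token

# Illustrative only: the ID token authorizes the POST to the
# bigquery-etl-dryrun Cloud Function (DRY_RUN_URL).
ctx = DryRunContext(use_cloud_function=True, id_token=id_token())
print(ctx.create(sql="SELECT 1", dataset="telemetry").get_schema())
```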
class DryRunError(builtins.Exception):
```python
class DryRunError(Exception):
    """Exception raised on dry run errors."""

    def __init__(self, message, error, use_cloud_function, table_id):
        """Initialize DryRunError."""
        super().__init__(message)
        self.error = error
        self.use_cloud_function = use_cloud_function
        self.table_id = table_id

    def __reduce__(self):
        """
        Override to ensure that all parameters are being passed when pickling.

        Pickling happens when passing exceptions between processes
        (e.g. via multiprocessing).
        """
        return (
            self.__class__,
            self.args + (self.error, self.use_cloud_function, self.table_id),
        )
```
Exception raised on dry run errors.
DryRunError(message, error, use_cloud_function, table_id)
Initialize DryRunError.
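A sketch of catching the exception, e.g. around `DryRun.validate()`, and reading the extra attributes it carries (the SQL here is a deliberately broken placeholder):

```python
from generator.dryrun import DryRun, DryRunError

try:
    DryRun(sql="SELECT nonexistent_column").validate()
except DryRunError as err:
    # err.error is an Errors member (or None), plus context about which
    # backend was used and which table the dry run targeted.
    print(err, err.error, err.use_cloud_function, err.table_id)
```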
class Errors(enum.Enum):
```python
class Errors(Enum):
    """DryRun errors that require special handling."""

    READ_ONLY = 1
    DATE_FILTER_NEEDED = 2
    DATE_FILTER_NEEDED_AND_SYNTAX = 3
    PERMISSION_DENIED = 4
```
DryRun errors that require special handling.
READ_ONLY = <Errors.READ_ONLY: 1>
DATE_FILTER_NEEDED = <Errors.DATE_FILTER_NEEDED: 2>
DATE_FILTER_NEEDED_AND_SYNTAX = <Errors.DATE_FILTER_NEEDED_AND_SYNTAX: 3>
PERMISSION_DENIED = <Errors.PERMISSION_DENIED: 4>
class DryRunContext:
```python
class DryRunContext:
    """DryRun builder class."""

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Initialize dry run instance."""
        self.use_cloud_function = use_cloud_function
        self.dry_run_url = dry_run_url
        self.id_token = id_token
        self.credentials = credentials

    def create(
        self,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
    ):
        """Initialize a DryRun instance."""
        return DryRun(
            use_cloud_function=self.use_cloud_function,
            id_token=self.id_token,
            credentials=self.credentials,
            sql=sql,
            project=project,
            dataset=dataset,
            table=table,
            dry_run_url=self.dry_run_url,
        )
```
DryRun builder class.
DryRunContext(use_cloud_function=False, id_token=None, credentials=None, dry_run_url='https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun')
Initialize dry run instance.
def create(self, sql=None, project='moz-fx-data-shared-prod', dataset=None, table=None):
Initialize a DryRun instance.
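For example, one context can stamp out several DryRun instances that all share the same authentication settings (the dataset and table names below are placeholders):

```python
from generator.dryrun import DryRunContext, id_token

ctx = DryRunContext(use_cloud_function=True, id_token=id_token())

# Each create() call returns an independent DryRun reusing the context's auth.
query_fields = ctx.create(sql="SELECT 1 AS n").get_schema()
table_fields = ctx.create(dataset="telemetry_derived", table="example_v1").get_table_schema()
```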
class DryRun:
Dry run SQL.
DryRun(use_cloud_function=False, id_token=None, credentials=None, sql=None, project='moz-fx-data-shared-prod', dataset=None, table=None, dry_run_url='https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun')
```python
    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Initialize dry run instance."""
        self.sql = sql
        self.use_cloud_function = use_cloud_function
        self.project = project
        self.dataset = dataset
        self.table = table
        self.dry_run_url = dry_run_url
        self.id_token = id_token
        self.credentials = credentials
```
Initialize dry run instance.
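DryRun can also be constructed directly when no builder context is needed; a minimal sketch with a trivial query:

```python
from generator.dryrun import DryRun

dry_run = DryRun(sql="SELECT 1 AS n", project="moz-fx-data-shared-prod")
dry_run.validate()           # True for a valid query
print(dry_run.get_schema())  # e.g. [{'name': 'n', 'type': 'INTEGER', ...}]
```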
client
```python
    @cached_property
    def client(self):
        """Get BigQuery client instance."""
        return bigquery.Client(credentials=self.credentials)
```
Get BigQuery client instance.
dry_run_result
```python
    @cached_property
    def dry_run_result(self):
        """Return the dry run result."""
        try:
            if self.use_cloud_function:
                json_data = {
                    "query": self.sql or "SELECT 1",
                    "project": self.project,
                    "dataset": self.dataset or "telemetry",
                }

                if self.table:
                    json_data["table"] = self.table

                r = urlopen(
                    Request(
                        self.dry_run_url,
                        headers={
                            "Content-Type": "application/json",
                            "Authorization": f"Bearer {self.id_token}",
                        },
                        data=json.dumps(json_data).encode("utf8"),
                        method="POST",
                    )
                )
                return json.load(r)
            else:
                query_schema = None
                referenced_tables = []
                table_metadata = None

                if self.sql:
                    job_config = bigquery.QueryJobConfig(
                        dry_run=True,
                        use_query_cache=False,
                        query_parameters=[
                            bigquery.ScalarQueryParameter(
                                "submission_date", "DATE", "2019-01-01"
                            )
                        ],
                    )

                    if self.project:
                        job_config.connection_properties = [
                            bigquery.ConnectionProperty(
                                "dataset_project_id", self.project
                            )
                        ]

                    job = self.client.query(self.sql, job_config=job_config)
                    query_schema = (
                        job._properties.get("statistics", {})
                        .get("query", {})
                        .get("schema", {})
                    )
                    referenced_tables = [
                        ref.to_api_repr() for ref in job.referenced_tables
                    ]

                if (
                    self.project is not None
                    and self.table is not None
                    and self.dataset is not None
                ):
                    table = self.client.get_table(
                        f"{self.project}.{self.dataset}.{self.table}"
                    )
                    table_metadata = {
                        "tableType": table.table_type,
                        "friendlyName": table.friendly_name,
                        "schema": {
                            "fields": [field.to_api_repr() for field in table.schema]
                        },
                    }

                return {
                    "valid": True,
                    "referencedTables": referenced_tables,
                    "schema": query_schema,
                    "tableMetadata": table_metadata,
                }
        except Exception as e:
            print(f"ERROR {e}")
            return None
```
Return the dry run result.
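Both the Cloud Function response and the locally built result are consumed through the same keys; an illustrative sketch of the success shape produced by the local branch:

```python
# Illustrative shape only; "schema" stays None when no sql was given, and
# "tableMetadata" stays None unless project, dataset and table were all set.
{
    "valid": True,
    "referencedTables": [],    # to_api_repr() of tables the query references
    "schema": {"fields": []},  # dry-run schema of the query
    "tableMetadata": {
        "tableType": "TABLE",
        "friendlyName": None,
        "schema": {"fields": []},
    },
}
```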
def get_schema(self):
```python
    def get_schema(self):
        """Return the query schema by dry running the SQL file."""
        self.validate()

        if (
            self.dry_run_result
            and self.dry_run_result["valid"]
            and "schema" in self.dry_run_result
        ):
            return self.dry_run_result["schema"]["fields"]

        return []
```
Return the query schema by dry running the SQL file.
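Fields come back in BigQuery's API representation, so each entry is a dict with at least a name and a type; a small sketch:

```python
from generator.dryrun import DryRun

for field in DryRun(sql="SELECT 1 AS n, CURRENT_DATE() AS day").get_schema():
    print(field["name"], field["type"])
```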
def get_table_schema(self):
```python
    def get_table_schema(self):
        """Return the schema of the provided table."""
        self.validate()

        if (
            self.dry_run_result
            and self.dry_run_result["valid"]
            and "tableMetadata" in self.dry_run_result
        ):
            return self.dry_run_result["tableMetadata"]["schema"]["fields"]

        return []
```
Return the schema of the provided table.
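For an existing table this reads the stored schema rather than analyzing a query (the table name is a placeholder):

```python
from generator.dryrun import DryRun

fields = DryRun(
    project="moz-fx-data-shared-prod",
    dataset="telemetry_derived",
    table="example_v1",
).get_table_schema()
```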
def get_table_metadata(self):
```python
    def get_table_metadata(self):
        """Return table metadata."""
        self.validate()

        if (
            self.dry_run_result
            and self.dry_run_result["valid"]
            and "tableMetadata" in self.dry_run_result
        ):
            return self.dry_run_result["tableMetadata"]

        return {}
```
Return table metadata.
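The returned dict mirrors the tableMetadata entry of dry_run_result; a sketch with a placeholder table:

```python
from generator.dryrun import DryRun

meta = DryRun(
    project="moz-fx-data-shared-prod", dataset="telemetry", table="example_v1"
).get_table_metadata()
print(meta.get("tableType"), meta.get("friendlyName"))
```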
def validate(self):
```python
    def validate(self):
        """Dry run the provided SQL file and check if valid."""
        dry_run_error = DryRunError(
            "Error when dry running SQL",
            self.get_error(),
            self.use_cloud_function,
            self.table,
        )

        if self.dry_run_result is None:
            raise dry_run_error

        if self.dry_run_result["valid"]:
            return True
        elif self.get_error() == Errors.READ_ONLY:
            # We want the dryrun service to only have read permissions, so
            # we expect CREATE VIEW and CREATE TABLE to throw specific
            # exceptions.
            return True
        elif self.get_error() == Errors.DATE_FILTER_NEEDED:
            # With the strip_dml flag, some queries require a partition filter
            # (submission_date, submission_timestamp, etc.) to run.
            return True
        else:
            print("ERROR\n", self.dry_run_result["errors"])
            raise dry_run_error
```
Dry run the provided SQL file and check if valid.
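Note that READ_ONLY and DATE_FILTER_NEEDED are tolerated, so validate() can return True even when the dry run reported an error; a defensive sketch (the SQL is a placeholder):

```python
from generator.dryrun import DryRun, DryRunError

view_sql = "SELECT 1"  # placeholder for a real view definition

try:
    DryRun(sql=view_sql).validate()
except DryRunError as err:
    # Anything other than the tolerated read-only / date-filter cases.
    raise RuntimeError(f"Dry run failed (table={err.table_id})") from err
```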
def errors(self):
```python
    def errors(self):
        """Dry run the provided SQL file and return errors."""
        if self.dry_run_result is None:
            return []
        return self.dry_run_result.get("errors", [])
```
Dry run the provided SQL file and return errors.
def get_error(self) -> Optional[Errors]:

```python
    def get_error(self) -> Optional[Errors]:
        """Get specific errors for edge case handling."""
        errors = self.errors()
        if len(errors) != 1:
            return None

        error = errors[0]
        if error and error.get("code") in [400, 403]:
            error_message = error.get("message", "")
            if (
                "does not have bigquery.tables.create permission for dataset"
                in error_message
                or "Permission bigquery.tables.create denied" in error_message
                or "Permission bigquery.datasets.update denied" in error_message
            ):
                return Errors.READ_ONLY
            if "without a filter over column(s)" in error_message:
                return Errors.DATE_FILTER_NEEDED
            if (
                "Syntax error: Expected end of input but got keyword WHERE"
                in error_message
            ):
                return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
            if (
                "Permission bigquery.tables.get denied on table" in error_message
                or "User does not have permission to query table" in error_message
            ):
                return Errors.PERMISSION_DENIED
        return None
```
Get specific errors for edge case handling.
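One way a caller might branch on the classification; the query is a placeholder, and only responses with exactly one error are classified (otherwise None is returned):

```python
from generator.dryrun import DryRun, Errors

dry_run = DryRun(sql="SELECT * FROM telemetry.main")  # placeholder query
error = dry_run.get_error()

if error == Errors.DATE_FILTER_NEEDED:
    print("query needs a partition filter (e.g. submission_date)")
elif error == Errors.PERMISSION_DENIED:
    print("missing permission on a referenced table")
elif error is None and dry_run.errors():
    print("unclassified dry run errors:", dry_run.errors())
```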