generator.dryrun

Dry Run method to get BigQuery metadata.

  1"""Dry Run method to get BigQuery metadata."""
  2
  3import json
  4import os
  5import time
  6from enum import Enum
  7from functools import cached_property
  8from typing import Optional
  9from urllib.error import URLError
 10from urllib.request import Request, urlopen
 11
 12import google.auth
 13from google.auth.transport.requests import Request as GoogleAuthRequest
 14from google.cloud import bigquery
 15from google.oauth2.id_token import fetch_id_token
 16
# Endpoint of the dry run Cloud Function; also the audience for fetched ID tokens.
DRY_RUN_URL = "https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/dryrun"
 18
 19
 20def urlopen_with_retry(request, max_retries=5, initial_delay=2.0, timeout=30):
 21    """
 22    Perform urlopen with exponential backoff retry logic.
 23
 24    Args:
 25        request: The urllib Request object
 26        max_retries: Maximum number of retry attempts (default: 5)
 27        initial_delay: Initial delay in seconds before first retry (default: 2.0)
 28        timeout: Timeout in seconds for each request (default: 30)
 29
 30    Returns:
 31        The response from urlopen
 32
 33    Raises:
 34        URLError: If all retry attempts fail
 35    """
 36    last_error = None
 37    delay = initial_delay
 38
 39    for attempt in range(max_retries + 1):
 40        try:
 41            return urlopen(request, timeout=timeout)
 42        except URLError as e:
 43            last_error = e
 44            if attempt < max_retries:
 45                print(
 46                    f"Network request failed (attempt {attempt + 1}/{max_retries + 1}): {e}"
 47                )
 48                print(f"Retrying in {delay} seconds...")
 49                time.sleep(delay)
 50                delay *= 2  # Exponential backoff
 51            else:
 52                print(f"All {max_retries + 1} attempts failed")
 53
 54    raise last_error
 55
 56
 57def credentials(auth_req: Optional[GoogleAuthRequest] = None):
 58    """Get GCP credentials."""
 59    auth_req = auth_req or GoogleAuthRequest()
 60    creds, _ = google.auth.default(
 61        scopes=["https://www.googleapis.com/auth/cloud-platform"]
 62    )
 63    creds.refresh(auth_req)
 64    return creds
 65
 66
 67def id_token():
 68    """Get token to authenticate against Cloud Function."""
 69    # look for token created by the GitHub Actions workflow
 70    id_token = os.environ.get("GOOGLE_GHA_ID_TOKEN")
 71
 72    if not id_token:
 73        auth_req = GoogleAuthRequest()
 74        creds = credentials(auth_req)
 75        if hasattr(creds, "id_token"):
 76            # Get token from default credentials for the current environment created via Cloud SDK run
 77            id_token = creds.id_token
 78        else:
 79            # If the environment variable GOOGLE_APPLICATION_CREDENTIALS is set to service account JSON file,
 80            # then ID token is acquired using this service account credentials.
 81            id_token = fetch_id_token(auth_req, DRY_RUN_URL)
 82    return id_token
 83
 84
 85class DryRunError(Exception):
 86    """Exception raised on dry run errors."""
 87
 88    def __init__(self, message, error, use_cloud_function, table_id):
 89        """Initialize DryRunError."""
 90        super().__init__(message)
 91        self.error = error
 92        self.use_cloud_function = use_cloud_function
 93        self.table_id = table_id
 94
 95    def __reduce__(self):
 96        """
 97        Override to ensure that all parameters are being passed when pickling.
 98
 99        Pickling happens when passing exception between processes (e.g. via multiprocessing)
100        """
101        return (
102            self.__class__,
103            self.args + (self.error, self.use_cloud_function, self.table_id),
104        )
105
106
class Errors(Enum):
    """DryRun errors that require special handling."""

    # Dry run tried a write (CREATE VIEW / CREATE TABLE) that the read-only
    # dryrun service rejects; treated as success by validate().
    READ_ONLY = 1
    # Query needs a filter over a partition column to run.
    DATE_FILTER_NEEDED = 2
    # Same as above, but surfaced as a syntax error on a trailing WHERE.
    DATE_FILTER_NEEDED_AND_SYNTAX = 3
    # Caller lacks permission on the referenced table.
    PERMISSION_DENIED = 4
114
115
class DryRunContext:
    """DryRun builder class: stamps out DryRun instances sharing one configuration."""

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Initialize dry run instance."""
        self.use_cloud_function = use_cloud_function
        self.id_token = id_token
        self.credentials = credentials
        self.dry_run_url = dry_run_url

    def create(
        self,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
    ):
        """Initialize a DryRun instance."""
        return DryRun(
            sql=sql,
            project=project,
            dataset=dataset,
            table=table,
            use_cloud_function=self.use_cloud_function,
            id_token=self.id_token,
            credentials=self.credentials,
            dry_run_url=self.dry_run_url,
        )
150
151
class DryRun:
    """Dry run SQL."""

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Initialize dry run instance."""
        self.use_cloud_function = use_cloud_function
        self.id_token = id_token
        self.credentials = credentials
        self.sql = sql
        self.project = project
        self.dataset = dataset
        self.table = table
        self.dry_run_url = dry_run_url

    @cached_property
    def client(self):
        """Get BigQuery client instance."""
        return bigquery.Client(credentials=self.credentials)

    def _cloud_function_dry_run(self):
        """POST the query to the dry run Cloud Function and return its JSON reply."""
        payload = {
            "query": self.sql or "SELECT 1",
            "project": self.project,
            "dataset": self.dataset or "telemetry",
        }
        if self.table:
            payload["table"] = self.table

        request = Request(
            self.dry_run_url,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.id_token}",
            },
            data=json.dumps(payload).encode("utf8"),
            method="POST",
        )
        return json.load(urlopen_with_retry(request))

    def _client_dry_run(self):
        """Dry run directly against BigQuery using the local client."""
        query_schema = None
        referenced_tables = []
        table_metadata = None

        if self.sql:
            job_config = bigquery.QueryJobConfig(
                dry_run=True,
                use_query_cache=False,
                query_parameters=[
                    bigquery.ScalarQueryParameter(
                        "submission_date", "DATE", "2019-01-01"
                    )
                ],
            )
            if self.project:
                job_config.connection_properties = [
                    bigquery.ConnectionProperty("dataset_project_id", self.project)
                ]

            job = self.client.query(self.sql, job_config=job_config)
            # Schema is only exposed via the job's raw properties on a dry run.
            query_schema = (
                job._properties.get("statistics", {})
                .get("query", {})
                .get("schema", {})
            )
            referenced_tables = [ref.to_api_repr() for ref in job.referenced_tables]

        if (
            self.project is not None
            and self.table is not None
            and self.dataset is not None
        ):
            table = self.client.get_table(
                f"{self.project}.{self.dataset}.{self.table}"
            )
            table_metadata = {
                "tableType": table.table_type,
                "friendlyName": table.friendly_name,
                "schema": {
                    "fields": [field.to_api_repr() for field in table.schema]
                },
            }

        return {
            "valid": True,
            "referencedTables": referenced_tables,
            "schema": query_schema,
            "tableMetadata": table_metadata,
        }

    @cached_property
    def dry_run_result(self):
        """Return the dry run result (None when the dry run itself failed)."""
        try:
            if self.use_cloud_function:
                return self._cloud_function_dry_run()
            return self._client_dry_run()
        except Exception as e:
            print(f"ERROR {e}")
            return None

    def get_schema(self):
        """Return the query schema by dry running the SQL file."""
        self.validate()

        result = self.dry_run_result
        if result and result["valid"] and "schema" in result:
            return result["schema"]["fields"]
        return []

    def get_table_schema(self):
        """Return the schema of the provided table."""
        self.validate()

        result = self.dry_run_result
        if result and result["valid"] and "tableMetadata" in result:
            return result["tableMetadata"]["schema"]["fields"]
        return []

    def get_table_metadata(self):
        """Return table metadata."""
        self.validate()

        result = self.dry_run_result
        if result and result["valid"] and "tableMetadata" in result:
            return result["tableMetadata"]
        return {}

    def validate(self):
        """Dry run the provided SQL file and check if valid."""
        dry_run_error = DryRunError(
            "Error when dry running SQL",
            self.get_error(),
            self.use_cloud_function,
            self.table,
        )

        if self.dry_run_result is None:
            raise dry_run_error
        if self.dry_run_result["valid"]:
            return True

        error_kind = self.get_error()
        if error_kind == Errors.READ_ONLY:
            # We want the dryrun service to only have read permissions, so
            # we expect CREATE VIEW and CREATE TABLE to throw specific
            # exceptions.
            return True
        if error_kind == Errors.DATE_FILTER_NEEDED:
            # With strip_dml flag, some queries require a partition filter
            # (submission_date, submission_timestamp, etc.) to run
            return True

        print("ERROR\n", self.dry_run_result["errors"])
        raise dry_run_error

    def errors(self):
        """Dry run the provided SQL file and return errors."""
        result = self.dry_run_result
        return result.get("errors", []) if result is not None else []

    def get_error(self) -> Optional[Errors]:
        """Get specific errors for edge case handling."""
        reported = self.errors()
        # Only classify when exactly one error came back.
        if len(reported) != 1:
            return None

        error = reported[0]
        if not error or error.get("code") not in (400, 403):
            return None

        message = error.get("message", "")
        read_only_markers = (
            "does not have bigquery.tables.create permission for dataset",
            "Permission bigquery.tables.create denied",
            "Permission bigquery.datasets.update denied",
        )
        if any(marker in message for marker in read_only_markers):
            return Errors.READ_ONLY
        if "without a filter over column(s)" in message:
            return Errors.DATE_FILTER_NEEDED
        if "Syntax error: Expected end of input but got keyword WHERE" in message:
            return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
        permission_markers = (
            "Permission bigquery.tables.get denied on table",
            "User does not have permission to query table",
        )
        if any(marker in message for marker in permission_markers):
            return Errors.PERMISSION_DENIED
        return None
DRY_RUN_URL = 'https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/dryrun'
def urlopen_with_retry(request, max_retries=5, initial_delay=2.0, timeout=30):
21def urlopen_with_retry(request, max_retries=5, initial_delay=2.0, timeout=30):
22    """
23    Perform urlopen with exponential backoff retry logic.
24
25    Args:
26        request: The urllib Request object
27        max_retries: Maximum number of retry attempts (default: 5)
28        initial_delay: Initial delay in seconds before first retry (default: 2.0)
29        timeout: Timeout in seconds for each request (default: 30)
30
31    Returns:
32        The response from urlopen
33
34    Raises:
35        URLError: If all retry attempts fail
36    """
37    last_error = None
38    delay = initial_delay
39
40    for attempt in range(max_retries + 1):
41        try:
42            return urlopen(request, timeout=timeout)
43        except URLError as e:
44            last_error = e
45            if attempt < max_retries:
46                print(
47                    f"Network request failed (attempt {attempt + 1}/{max_retries + 1}): {e}"
48                )
49                print(f"Retrying in {delay} seconds...")
50                time.sleep(delay)
51                delay *= 2  # Exponential backoff
52            else:
53                print(f"All {max_retries + 1} attempts failed")
54
55    raise last_error

Perform urlopen with exponential backoff retry logic.

Args: request: The urllib Request object max_retries: Maximum number of retry attempts (default: 5) initial_delay: Initial delay in seconds before first retry (default: 2.0) timeout: Timeout in seconds for each request (default: 30)

Returns: The response from urlopen

Raises: URLError: If all retry attempts fail

def credentials(auth_req: Optional[google.auth.transport.requests.Request] = None):
58def credentials(auth_req: Optional[GoogleAuthRequest] = None):
59    """Get GCP credentials."""
60    auth_req = auth_req or GoogleAuthRequest()
61    creds, _ = google.auth.default(
62        scopes=["https://www.googleapis.com/auth/cloud-platform"]
63    )
64    creds.refresh(auth_req)
65    return creds

Get GCP credentials.

def id_token():
68def id_token():
69    """Get token to authenticate against Cloud Function."""
70    # look for token created by the GitHub Actions workflow
71    id_token = os.environ.get("GOOGLE_GHA_ID_TOKEN")
72
73    if not id_token:
74        auth_req = GoogleAuthRequest()
75        creds = credentials(auth_req)
76        if hasattr(creds, "id_token"):
77            # Get token from default credentials for the current environment created via Cloud SDK run
78            id_token = creds.id_token
79        else:
80            # If the environment variable GOOGLE_APPLICATION_CREDENTIALS is set to service account JSON file,
81            # then ID token is acquired using this service account credentials.
82            id_token = fetch_id_token(auth_req, DRY_RUN_URL)
83    return id_token

Get token to authenticate against Cloud Function.

class DryRunError(builtins.Exception):
 86class DryRunError(Exception):
 87    """Exception raised on dry run errors."""
 88
 89    def __init__(self, message, error, use_cloud_function, table_id):
 90        """Initialize DryRunError."""
 91        super().__init__(message)
 92        self.error = error
 93        self.use_cloud_function = use_cloud_function
 94        self.table_id = table_id
 95
 96    def __reduce__(self):
 97        """
 98        Override to ensure that all parameters are being passed when pickling.
 99
100        Pickling happens when passing exception between processes (e.g. via multiprocessing)
101        """
102        return (
103            self.__class__,
104            self.args + (self.error, self.use_cloud_function, self.table_id),
105        )

Exception raised on dry run errors.

DryRunError(message, error, use_cloud_function, table_id)
89    def __init__(self, message, error, use_cloud_function, table_id):
90        """Initialize DryRunError."""
91        super().__init__(message)
92        self.error = error
93        self.use_cloud_function = use_cloud_function
94        self.table_id = table_id

Initialize DryRunError.

error
use_cloud_function
table_id
class Errors(enum.Enum):
108class Errors(Enum):
109    """DryRun errors that require special handling."""
110
111    READ_ONLY = 1
112    DATE_FILTER_NEEDED = 2
113    DATE_FILTER_NEEDED_AND_SYNTAX = 3
114    PERMISSION_DENIED = 4

DryRun errors that require special handling.

READ_ONLY = <Errors.READ_ONLY: 1>
DATE_FILTER_NEEDED = <Errors.DATE_FILTER_NEEDED: 2>
DATE_FILTER_NEEDED_AND_SYNTAX = <Errors.DATE_FILTER_NEEDED_AND_SYNTAX: 3>
PERMISSION_DENIED = <Errors.PERMISSION_DENIED: 4>
class DryRunContext:
117class DryRunContext:
118    """DryRun builder class."""
119
120    def __init__(
121        self,
122        use_cloud_function=False,
123        id_token=None,
124        credentials=None,
125        dry_run_url=DRY_RUN_URL,
126    ):
127        """Initialize dry run instance."""
128        self.use_cloud_function = use_cloud_function
129        self.dry_run_url = dry_run_url
130        self.id_token = id_token
131        self.credentials = credentials
132
133    def create(
134        self,
135        sql=None,
136        project="moz-fx-data-shared-prod",
137        dataset=None,
138        table=None,
139    ):
140        """Initialize a DryRun instance."""
141        return DryRun(
142            use_cloud_function=self.use_cloud_function,
143            id_token=self.id_token,
144            credentials=self.credentials,
145            sql=sql,
146            project=project,
147            dataset=dataset,
148            table=table,
149            dry_run_url=self.dry_run_url,
150        )

DryRun builder class.

DryRunContext( use_cloud_function=False, id_token=None, credentials=None, dry_run_url='https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/dryrun')
120    def __init__(
121        self,
122        use_cloud_function=False,
123        id_token=None,
124        credentials=None,
125        dry_run_url=DRY_RUN_URL,
126    ):
127        """Initialize dry run instance."""
128        self.use_cloud_function = use_cloud_function
129        self.dry_run_url = dry_run_url
130        self.id_token = id_token
131        self.credentials = credentials

Initialize dry run instance.

use_cloud_function
dry_run_url
id_token
credentials
def create( self, sql=None, project='moz-fx-data-shared-prod', dataset=None, table=None):
133    def create(
134        self,
135        sql=None,
136        project="moz-fx-data-shared-prod",
137        dataset=None,
138        table=None,
139    ):
140        """Initialize a DryRun instance."""
141        return DryRun(
142            use_cloud_function=self.use_cloud_function,
143            id_token=self.id_token,
144            credentials=self.credentials,
145            sql=sql,
146            project=project,
147            dataset=dataset,
148            table=table,
149            dry_run_url=self.dry_run_url,
150        )

Initialize a DryRun instance.

class DryRun:
153class DryRun:
154    """Dry run SQL."""
155
156    def __init__(
157        self,
158        use_cloud_function=False,
159        id_token=None,
160        credentials=None,
161        sql=None,
162        project="moz-fx-data-shared-prod",
163        dataset=None,
164        table=None,
165        dry_run_url=DRY_RUN_URL,
166    ):
167        """Initialize dry run instance."""
168        self.sql = sql
169        self.use_cloud_function = use_cloud_function
170        self.project = project
171        self.dataset = dataset
172        self.table = table
173        self.dry_run_url = dry_run_url
174        self.id_token = id_token
175        self.credentials = credentials
176
177    @cached_property
178    def client(self):
179        """Get BigQuery client instance."""
180        return bigquery.Client(credentials=self.credentials)
181
182    @cached_property
183    def dry_run_result(self):
184        """Return the dry run result."""
185        try:
186            if self.use_cloud_function:
187                json_data = {
188                    "query": self.sql or "SELECT 1",
189                    "project": self.project,
190                    "dataset": self.dataset or "telemetry",
191                }
192
193                if self.table:
194                    json_data["table"] = self.table
195
196                request = Request(
197                    self.dry_run_url,
198                    headers={
199                        "Content-Type": "application/json",
200                        "Authorization": f"Bearer {self.id_token}",
201                    },
202                    data=json.dumps(json_data).encode("utf8"),
203                    method="POST",
204                )
205                r = urlopen_with_retry(request)
206                return json.load(r)
207            else:
208                query_schema = None
209                referenced_tables = []
210                table_metadata = None
211
212                if self.sql:
213                    job_config = bigquery.QueryJobConfig(
214                        dry_run=True,
215                        use_query_cache=False,
216                        query_parameters=[
217                            bigquery.ScalarQueryParameter(
218                                "submission_date", "DATE", "2019-01-01"
219                            )
220                        ],
221                    )
222
223                    if self.project:
224                        job_config.connection_properties = [
225                            bigquery.ConnectionProperty(
226                                "dataset_project_id", self.project
227                            )
228                        ]
229
230                    job = self.client.query(self.sql, job_config=job_config)
231                    query_schema = (
232                        job._properties.get("statistics", {})
233                        .get("query", {})
234                        .get("schema", {})
235                    )
236                    referenced_tables = [
237                        ref.to_api_repr() for ref in job.referenced_tables
238                    ]
239
240                if (
241                    self.project is not None
242                    and self.table is not None
243                    and self.dataset is not None
244                ):
245                    table = self.client.get_table(
246                        f"{self.project}.{self.dataset}.{self.table}"
247                    )
248                    table_metadata = {
249                        "tableType": table.table_type,
250                        "friendlyName": table.friendly_name,
251                        "schema": {
252                            "fields": [field.to_api_repr() for field in table.schema]
253                        },
254                    }
255
256                return {
257                    "valid": True,
258                    "referencedTables": referenced_tables,
259                    "schema": query_schema,
260                    "tableMetadata": table_metadata,
261                }
262        except Exception as e:
263            print(f"ERROR {e}")
264            return None
265
266    def get_schema(self):
267        """Return the query schema by dry running the SQL file."""
268        self.validate()
269
270        if (
271            self.dry_run_result
272            and self.dry_run_result["valid"]
273            and "schema" in self.dry_run_result
274        ):
275            return self.dry_run_result["schema"]["fields"]
276
277        return []
278
279    def get_table_schema(self):
280        """Return the schema of the provided table."""
281        self.validate()
282
283        if (
284            self.dry_run_result
285            and self.dry_run_result["valid"]
286            and "tableMetadata" in self.dry_run_result
287        ):
288            return self.dry_run_result["tableMetadata"]["schema"]["fields"]
289
290        return []
291
292    def get_table_metadata(self):
293        """Return table metadata."""
294        self.validate()
295
296        if (
297            self.dry_run_result
298            and self.dry_run_result["valid"]
299            and "tableMetadata" in self.dry_run_result
300        ):
301            return self.dry_run_result["tableMetadata"]
302
303        return {}
304
305    def validate(self):
306        """Dry run the provided SQL file and check if valid."""
307        dry_run_error = DryRunError(
308            "Error when dry running SQL",
309            self.get_error(),
310            self.use_cloud_function,
311            self.table,
312        )
313
314        if self.dry_run_result is None:
315            raise dry_run_error
316
317        if self.dry_run_result["valid"]:
318            return True
319        elif self.get_error() == Errors.READ_ONLY:
320            # We want the dryrun service to only have read permissions, so
321            # we expect CREATE VIEW and CREATE TABLE to throw specific
322            # exceptions.
323            return True
324        elif self.get_error() == Errors.DATE_FILTER_NEEDED:
325            # With strip_dml flag, some queries require a partition filter
326            # (submission_date, submission_timestamp, etc.) to run
327            return True
328        else:
329            print("ERROR\n", self.dry_run_result["errors"])
330            raise dry_run_error
331
332    def errors(self):
333        """Dry run the provided SQL file and return errors."""
334        if self.dry_run_result is None:
335            return []
336        return self.dry_run_result.get("errors", [])
337
338    def get_error(self) -> Optional[Errors]:
339        """Get specific errors for edge case handling."""
340        errors = self.errors()
341        if len(errors) != 1:
342            return None
343
344        error = errors[0]
345        if error and error.get("code") in [400, 403]:
346            error_message = error.get("message", "")
347            if (
348                "does not have bigquery.tables.create permission for dataset"
349                in error_message
350                or "Permission bigquery.tables.create denied" in error_message
351                or "Permission bigquery.datasets.update denied" in error_message
352            ):
353                return Errors.READ_ONLY
354            if "without a filter over column(s)" in error_message:
355                return Errors.DATE_FILTER_NEEDED
356            if (
357                "Syntax error: Expected end of input but got keyword WHERE"
358                in error_message
359            ):
360                return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
361            if (
362                "Permission bigquery.tables.get denied on table" in error_message
363                or "User does not have permission to query table" in error_message
364            ):
365                return Errors.PERMISSION_DENIED
366        return None

Dry run SQL.

DryRun( use_cloud_function=False, id_token=None, credentials=None, sql=None, project='moz-fx-data-shared-prod', dataset=None, table=None, dry_run_url='https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/dryrun')
156    def __init__(
157        self,
158        use_cloud_function=False,
159        id_token=None,
160        credentials=None,
161        sql=None,
162        project="moz-fx-data-shared-prod",
163        dataset=None,
164        table=None,
165        dry_run_url=DRY_RUN_URL,
166    ):
167        """Initialize dry run instance."""
168        self.sql = sql
169        self.use_cloud_function = use_cloud_function
170        self.project = project
171        self.dataset = dataset
172        self.table = table
173        self.dry_run_url = dry_run_url
174        self.id_token = id_token
175        self.credentials = credentials

Initialize dry run instance.

sql
use_cloud_function
project
dataset
table
dry_run_url
id_token
credentials
client
177    @cached_property
178    def client(self):
179        """Get BigQuery client instance."""
180        return bigquery.Client(credentials=self.credentials)

Get BigQuery client instance.

dry_run_result
182    @cached_property
183    def dry_run_result(self):
184        """Return the dry run result."""
185        try:
186            if self.use_cloud_function:
187                json_data = {
188                    "query": self.sql or "SELECT 1",
189                    "project": self.project,
190                    "dataset": self.dataset or "telemetry",
191                }
192
193                if self.table:
194                    json_data["table"] = self.table
195
196                request = Request(
197                    self.dry_run_url,
198                    headers={
199                        "Content-Type": "application/json",
200                        "Authorization": f"Bearer {self.id_token}",
201                    },
202                    data=json.dumps(json_data).encode("utf8"),
203                    method="POST",
204                )
205                r = urlopen_with_retry(request)
206                return json.load(r)
207            else:
208                query_schema = None
209                referenced_tables = []
210                table_metadata = None
211
212                if self.sql:
213                    job_config = bigquery.QueryJobConfig(
214                        dry_run=True,
215                        use_query_cache=False,
216                        query_parameters=[
217                            bigquery.ScalarQueryParameter(
218                                "submission_date", "DATE", "2019-01-01"
219                            )
220                        ],
221                    )
222
223                    if self.project:
224                        job_config.connection_properties = [
225                            bigquery.ConnectionProperty(
226                                "dataset_project_id", self.project
227                            )
228                        ]
229
230                    job = self.client.query(self.sql, job_config=job_config)
231                    query_schema = (
232                        job._properties.get("statistics", {})
233                        .get("query", {})
234                        .get("schema", {})
235                    )
236                    referenced_tables = [
237                        ref.to_api_repr() for ref in job.referenced_tables
238                    ]
239
240                if (
241                    self.project is not None
242                    and self.table is not None
243                    and self.dataset is not None
244                ):
245                    table = self.client.get_table(
246                        f"{self.project}.{self.dataset}.{self.table}"
247                    )
248                    table_metadata = {
249                        "tableType": table.table_type,
250                        "friendlyName": table.friendly_name,
251                        "schema": {
252                            "fields": [field.to_api_repr() for field in table.schema]
253                        },
254                    }
255
256                return {
257                    "valid": True,
258                    "referencedTables": referenced_tables,
259                    "schema": query_schema,
260                    "tableMetadata": table_metadata,
261                }
262        except Exception as e:
263            print(f"ERROR {e}")
264            return None

Return the dry run result.

def get_schema(self):
266    def get_schema(self):
267        """Return the query schema by dry running the SQL file."""
268        self.validate()
269
270        if (
271            self.dry_run_result
272            and self.dry_run_result["valid"]
273            and "schema" in self.dry_run_result
274        ):
275            return self.dry_run_result["schema"]["fields"]
276
277        return []

Return the query schema by dry running the SQL file.

def get_table_schema(self):
279    def get_table_schema(self):
280        """Return the schema of the provided table."""
281        self.validate()
282
283        if (
284            self.dry_run_result
285            and self.dry_run_result["valid"]
286            and "tableMetadata" in self.dry_run_result
287        ):
288            return self.dry_run_result["tableMetadata"]["schema"]["fields"]
289
290        return []

Return the schema of the provided table.

def get_table_metadata(self):
292    def get_table_metadata(self):
293        """Return table metadata."""
294        self.validate()
295
296        if (
297            self.dry_run_result
298            and self.dry_run_result["valid"]
299            and "tableMetadata" in self.dry_run_result
300        ):
301            return self.dry_run_result["tableMetadata"]
302
303        return {}

Return table metadata.

def validate(self):
305    def validate(self):
306        """Dry run the provided SQL file and check if valid."""
307        dry_run_error = DryRunError(
308            "Error when dry running SQL",
309            self.get_error(),
310            self.use_cloud_function,
311            self.table,
312        )
313
314        if self.dry_run_result is None:
315            raise dry_run_error
316
317        if self.dry_run_result["valid"]:
318            return True
319        elif self.get_error() == Errors.READ_ONLY:
320            # We want the dryrun service to only have read permissions, so
321            # we expect CREATE VIEW and CREATE TABLE to throw specific
322            # exceptions.
323            return True
324        elif self.get_error() == Errors.DATE_FILTER_NEEDED:
325            # With strip_dml flag, some queries require a partition filter
326            # (submission_date, submission_timestamp, etc.) to run
327            return True
328        else:
329            print("ERROR\n", self.dry_run_result["errors"])
330            raise dry_run_error

Dry run the provided SQL file and check if valid.

def errors(self):
332    def errors(self):
333        """Dry run the provided SQL file and return errors."""
334        if self.dry_run_result is None:
335            return []
336        return self.dry_run_result.get("errors", [])

Dry run the provided SQL file and return errors.

def get_error(self) -> Optional[Errors]:
338    def get_error(self) -> Optional[Errors]:
339        """Get specific errors for edge case handling."""
340        errors = self.errors()
341        if len(errors) != 1:
342            return None
343
344        error = errors[0]
345        if error and error.get("code") in [400, 403]:
346            error_message = error.get("message", "")
347            if (
348                "does not have bigquery.tables.create permission for dataset"
349                in error_message
350                or "Permission bigquery.tables.create denied" in error_message
351                or "Permission bigquery.datasets.update denied" in error_message
352            ):
353                return Errors.READ_ONLY
354            if "without a filter over column(s)" in error_message:
355                return Errors.DATE_FILTER_NEEDED
356            if (
357                "Syntax error: Expected end of input but got keyword WHERE"
358                in error_message
359            ):
360                return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
361            if (
362                "Permission bigquery.tables.get denied on table" in error_message
363                or "User does not have permission to query table" in error_message
364            ):
365                return Errors.PERMISSION_DENIED
366        return None

Get specific errors for edge case handling.