generator.dryrun

Dry Run method to get BigQuery metadata.

  1"""Dry Run method to get BigQuery metadata."""
  2
  3import json
  4import os
  5import time
  6from enum import Enum
  7from functools import cached_property
  8from typing import Optional
  9from urllib.error import URLError
 10from urllib.request import Request, urlopen
 11
 12import google.auth
 13from google.auth.transport.requests import Request as GoogleAuthRequest
 14from google.cloud import bigquery
 15from google.oauth2.id_token import fetch_id_token
 16
 17DRY_RUN_URL = (
 18    "https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun"
 19)
 20
 21
 22def urlopen_with_retry(request, max_retries=5, initial_delay=2.0, timeout=30):
 23    """
 24    Perform urlopen with exponential backoff retry logic.
 25
 26    Args:
 27        request: The urllib Request object
 28        max_retries: Maximum number of retry attempts (default: 5)
 29        initial_delay: Initial delay in seconds before first retry (default: 2.0)
 30        timeout: Timeout in seconds for each request (default: 30)
 31
 32    Returns:
 33        The response from urlopen
 34
 35    Raises:
 36        URLError: If all retry attempts fail
 37    """
 38    last_error = None
 39    delay = initial_delay
 40
 41    for attempt in range(max_retries + 1):
 42        try:
 43            return urlopen(request, timeout=timeout)
 44        except URLError as e:
 45            last_error = e
 46            if attempt < max_retries:
 47                print(
 48                    f"Network request failed (attempt {attempt + 1}/{max_retries + 1}): {e}"
 49                )
 50                print(f"Retrying in {delay} seconds...")
 51                time.sleep(delay)
 52                delay *= 2  # Exponential backoff
 53            else:
 54                print(f"All {max_retries + 1} attempts failed")
 55
 56    raise last_error
 57
 58
 59def credentials(auth_req: Optional[GoogleAuthRequest] = None):
 60    """Get GCP credentials."""
 61    auth_req = auth_req or GoogleAuthRequest()
 62    creds, _ = google.auth.default(
 63        scopes=["https://www.googleapis.com/auth/cloud-platform"]
 64    )
 65    creds.refresh(auth_req)
 66    return creds
 67
 68
 69def id_token():
 70    """Get token to authenticate against Cloud Function."""
 71    # look for token created by the GitHub Actions workflow
 72    id_token = os.environ.get("GOOGLE_GHA_ID_TOKEN")
 73
 74    if not id_token:
 75        auth_req = GoogleAuthRequest()
 76        creds = credentials(auth_req)
 77        if hasattr(creds, "id_token"):
 78            # Get token from default credentials for the current environment created via Cloud SDK run
 79            id_token = creds.id_token
 80        else:
 81            # If the environment variable GOOGLE_APPLICATION_CREDENTIALS is set to service account JSON file,
 82            # then ID token is acquired using this service account credentials.
 83            id_token = fetch_id_token(auth_req, DRY_RUN_URL)
 84    return id_token
 85
 86
 87class DryRunError(Exception):
 88    """Exception raised on dry run errors."""
 89
 90    def __init__(self, message, error, use_cloud_function, table_id):
 91        """Initialize DryRunError."""
 92        super().__init__(message)
 93        self.error = error
 94        self.use_cloud_function = use_cloud_function
 95        self.table_id = table_id
 96
 97    def __reduce__(self):
 98        """
 99        Override to ensure that all parameters are being passed when pickling.
100
101        Pickling happens when passing exception between processes (e.g. via multiprocessing)
102        """
103        return (
104            self.__class__,
105            self.args + (self.error, self.use_cloud_function, self.table_id),
106        )
107
108
class Errors(Enum):
    """DryRun errors that require special handling."""

    # Error message reports a missing bigquery.tables.create or
    # bigquery.datasets.update permission.
    READ_ONLY = 1
    # Error message contains "without a filter over column(s)".
    DATE_FILTER_NEEDED = 2
    # Error message reports a syntax error on an unexpected WHERE keyword.
    DATE_FILTER_NEEDED_AND_SYNTAX = 3
    # Error message reports that getting or querying a table was denied.
    PERMISSION_DENIED = 4
116
117
class DryRunContext:
    """Factory holding the settings shared by the DryRun instances it creates."""

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Remember the settings that every created DryRun should use."""
        self.credentials = credentials
        self.id_token = id_token
        self.dry_run_url = dry_run_url
        self.use_cloud_function = use_cloud_function

    def create(
        self,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
    ):
        """Build a DryRun for the given query/table using the shared settings."""
        shared_settings = {
            "use_cloud_function": self.use_cloud_function,
            "id_token": self.id_token,
            "credentials": self.credentials,
            "dry_run_url": self.dry_run_url,
        }
        return DryRun(
            sql=sql,
            project=project,
            dataset=dataset,
            table=table,
            **shared_settings,
        )
152
153
class DryRun:
    """Dry run SQL and fetch table metadata.

    Depending on `use_cloud_function`, the work is done by POSTing to the dry
    run Cloud Function or by talking to BigQuery through the client library.
    """

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Initialize dry run instance."""
        self.sql = sql
        self.use_cloud_function = use_cloud_function
        self.project = project
        self.dataset = dataset
        self.table = table
        self.dry_run_url = dry_run_url
        self.id_token = id_token
        self.credentials = credentials

    @cached_property
    def client(self):
        """Get BigQuery client instance."""
        return bigquery.Client(credentials=self.credentials)

    @cached_property
    def dry_run_result(self):
        """Return the dry run result.

        The result is a dict; the keys read by the other methods are "valid",
        "errors", "schema" and "tableMetadata". None is returned (after
        printing the error) when the dry run itself could not be performed.
        The value is computed once per instance and then cached.
        """
        try:
            if self.use_cloud_function:
                json_data = {
                    "query": self.sql or "SELECT 1",
                    "project": self.project,
                    "dataset": self.dataset or "telemetry",
                }

                if self.table:
                    json_data["table"] = self.table

                request = Request(
                    self.dry_run_url,
                    headers={
                        "Content-Type": "application/json",
                        "Authorization": f"Bearer {self.id_token}",
                    },
                    data=json.dumps(json_data).encode("utf8"),
                    method="POST",
                )
                # Close the HTTP response once the payload has been parsed so
                # the underlying connection is not leaked.
                with urlopen_with_retry(request) as response:
                    return json.load(response)
            else:
                query_schema = None
                referenced_tables = []
                table_metadata = None

                if self.sql:
                    job_config = bigquery.QueryJobConfig(
                        dry_run=True,
                        use_query_cache=False,
                        query_parameters=[
                            bigquery.ScalarQueryParameter(
                                "submission_date", "DATE", "2019-01-01"
                            )
                        ],
                    )

                    if self.project:
                        job_config.connection_properties = [
                            bigquery.ConnectionProperty(
                                "dataset_project_id", self.project
                            )
                        ]

                    job = self.client.query(self.sql, job_config=job_config)
                    query_schema = (
                        job._properties.get("statistics", {})
                        .get("query", {})
                        .get("schema", {})
                    )
                    referenced_tables = [
                        ref.to_api_repr() for ref in job.referenced_tables
                    ]

                if (
                    self.project is not None
                    and self.table is not None
                    and self.dataset is not None
                ):
                    table = self.client.get_table(
                        f"{self.project}.{self.dataset}.{self.table}"
                    )
                    table_metadata = {
                        "tableType": table.table_type,
                        "friendlyName": table.friendly_name,
                        "schema": {
                            "fields": [field.to_api_repr() for field in table.schema]
                        },
                    }

                return {
                    "valid": True,
                    "referencedTables": referenced_tables,
                    "schema": query_schema,
                    "tableMetadata": table_metadata,
                }
        except Exception as e:
            # Deliberate best effort: callers detect the failure through the
            # None result (validate() turns it into a DryRunError).
            print(f"ERROR {e}")
            return None

    def get_schema(self):
        """Return the query schema fields by dry running the SQL file.

        Returns an empty list when the result is not valid or has no schema.
        Raises DryRunError (via validate) when the dry run failed.
        """
        self.validate()

        result = self.dry_run_result
        if result and result.get("valid"):
            # "schema" can be absent, None (no SQL was dry run) or an empty
            # dict (the job reported no schema); all of these mean no fields.
            return (result.get("schema") or {}).get("fields", [])

        return []

    def get_table_schema(self):
        """Return the schema fields of the provided table.

        Returns an empty list when the result is not valid or carries no table
        metadata. Raises DryRunError (via validate) when the dry run failed.
        """
        self.validate()

        result = self.dry_run_result
        if result and result.get("valid"):
            # "tableMetadata" is None when no table was looked up.
            table_metadata = result.get("tableMetadata") or {}
            return (table_metadata.get("schema") or {}).get("fields", [])

        return []

    def get_table_metadata(self):
        """Return table metadata, or an empty dict when there is none.

        Raises DryRunError (via validate) when the dry run failed.
        """
        self.validate()

        result = self.dry_run_result
        if result and result.get("valid"):
            # Normalize a None entry to the same empty dict used as fallback.
            return result.get("tableMetadata") or {}

        return {}

    def validate(self):
        """Dry run the provided SQL file and check if valid.

        Returns True when the result is valid or only failed with an expected
        error (READ_ONLY or DATE_FILTER_NEEDED); raises DryRunError otherwise.
        """
        error = self.get_error()
        dry_run_error = DryRunError(
            "Error when dry running SQL",
            error,
            self.use_cloud_function,
            self.table,
        )

        result = self.dry_run_result
        if result is None:
            raise dry_run_error

        if result.get("valid"):
            return True
        if error == Errors.READ_ONLY:
            # We want the dryrun service to only have read permissions, so
            # we expect CREATE VIEW and CREATE TABLE to throw specific
            # exceptions.
            return True
        if error == Errors.DATE_FILTER_NEEDED:
            # With strip_dml flag, some queries require a partition filter
            # (submission_date, submission_timestamp, etc.) to run
            return True

        print("ERROR\n", self.errors())
        raise dry_run_error

    def errors(self):
        """Dry run the provided SQL file and return errors."""
        if self.dry_run_result is None:
            return []
        # `or []` also covers an explicit null "errors" entry in the payload.
        return self.dry_run_result.get("errors") or []

    def get_error(self) -> Optional[Errors]:
        """Get specific errors for edge case handling.

        Only a result with exactly one error whose code is 400 or 403 is
        classified; every other situation yields None.
        """
        errors = self.errors()
        if len(errors) != 1:
            return None

        error = errors[0]
        if not error or error.get("code") not in (400, 403):
            return None

        error_message = error.get("message") or ""
        if (
            "does not have bigquery.tables.create permission for dataset"
            in error_message
            or "Permission bigquery.tables.create denied" in error_message
            or "Permission bigquery.datasets.update denied" in error_message
        ):
            return Errors.READ_ONLY
        if "without a filter over column(s)" in error_message:
            return Errors.DATE_FILTER_NEEDED
        if (
            "Syntax error: Expected end of input but got keyword WHERE"
            in error_message
        ):
            return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
        if (
            "Permission bigquery.tables.get denied on table" in error_message
            or "User does not have permission to query table" in error_message
        ):
            return Errors.PERMISSION_DENIED
        return None
DRY_RUN_URL = 'https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun'
def urlopen_with_retry(request, max_retries=5, initial_delay=2.0, timeout=30):
def urlopen_with_retry(request, max_retries=5, initial_delay=2.0, timeout=30):
    """
    Perform urlopen with exponential backoff retry logic.

    Args:
        request: The urllib Request object
        max_retries: Maximum number of retries after the first attempt
            (default: 5); must not be negative
        initial_delay: Delay in seconds before the first retry; it is doubled
            after every failed attempt (default: 2.0)
        timeout: Timeout in seconds for each request (default: 30)

    Returns:
        The response from urlopen

    Raises:
        ValueError: If max_retries is negative
        URLError: If all retry attempts fail
    """
    if max_retries < 0:
        # Without this guard the loop below would never run and the function
        # would fall through to raising `None`, which surfaces as a TypeError.
        raise ValueError(f"max_retries must be >= 0, got {max_retries}")

    total_attempts = max_retries + 1
    delay = initial_delay

    for attempt in range(1, total_attempts + 1):
        try:
            return urlopen(request, timeout=timeout)
        except URLError as e:
            if attempt == total_attempts:
                print(f"All {total_attempts} attempts failed")
                # Re-raise the error of the final attempt unchanged.
                raise
            print(
                f"Network request failed (attempt {attempt}/{total_attempts}): {e}"
            )
            print(f"Retrying in {delay} seconds...")
            time.sleep(delay)
            delay *= 2  # Exponential backoff

Perform urlopen with exponential backoff retry logic.

Args:
- request: The urllib Request object
- max_retries: Maximum number of retry attempts (default: 5)
- initial_delay: Initial delay in seconds before first retry (default: 2.0)
- timeout: Timeout in seconds for each request (default: 30)

Returns: The response from urlopen

Raises: URLError: If all retry attempts fail

def credentials(auth_req: Optional[google.auth.transport.requests.Request] = None):
def credentials(auth_req: Optional[GoogleAuthRequest] = None):
    """Return refreshed default GCP credentials with the cloud-platform scope.

    A new GoogleAuthRequest transport is created when `auth_req` is not given.
    """
    transport = auth_req if auth_req else GoogleAuthRequest()
    cloud_platform_scope = "https://www.googleapis.com/auth/cloud-platform"
    default_creds, _project = google.auth.default(scopes=[cloud_platform_scope])
    default_creds.refresh(transport)
    return default_creds

Get GCP credentials.

def id_token():
def id_token():
    """Return the ID token used to authenticate against the Cloud Function.

    The GOOGLE_GHA_ID_TOKEN environment variable wins when it is set and
    non-empty; otherwise the token is derived from the default GCP credentials.
    """
    # Prefer the token created by the GitHub Actions workflow.
    token = os.environ.get("GOOGLE_GHA_ID_TOKEN")
    if token:
        return token

    auth_req = GoogleAuthRequest()
    creds = credentials(auth_req)
    if not hasattr(creds, "id_token"):
        # If the environment variable GOOGLE_APPLICATION_CREDENTIALS is set to
        # a service account JSON file, the ID token is acquired using these
        # service account credentials.
        return fetch_id_token(auth_req, DRY_RUN_URL)

    # Token from the default credentials of the current environment, as
    # created via the Cloud SDK.
    return creds.id_token

Get token to authenticate against Cloud Function.

class DryRunError(builtins.Exception):
 88class DryRunError(Exception):
 89    """Exception raised on dry run errors."""
 90
 91    def __init__(self, message, error, use_cloud_function, table_id):
 92        """Initialize DryRunError."""
 93        super().__init__(message)
 94        self.error = error
 95        self.use_cloud_function = use_cloud_function
 96        self.table_id = table_id
 97
 98    def __reduce__(self):
 99        """
100        Override to ensure that all parameters are being passed when pickling.
101
102        Pickling happens when passing exception between processes (e.g. via multiprocessing)
103        """
104        return (
105            self.__class__,
106            self.args + (self.error, self.use_cloud_function, self.table_id),
107        )

Exception raised on dry run errors.

DryRunError(message, error, use_cloud_function, table_id)
91    def __init__(self, message, error, use_cloud_function, table_id):
92        """Initialize DryRunError."""
93        super().__init__(message)
94        self.error = error
95        self.use_cloud_function = use_cloud_function
96        self.table_id = table_id

Initialize DryRunError.

error
use_cloud_function
table_id
class Errors(enum.Enum):
class Errors(Enum):
    """DryRun errors that require special handling."""

    # Error message reports a missing bigquery.tables.create or
    # bigquery.datasets.update permission.
    READ_ONLY = 1
    # Error message contains "without a filter over column(s)".
    DATE_FILTER_NEEDED = 2
    # Error message reports a syntax error on an unexpected WHERE keyword.
    DATE_FILTER_NEEDED_AND_SYNTAX = 3
    # Error message reports that getting or querying a table was denied.
    PERMISSION_DENIED = 4

DryRun errors that require special handling.

READ_ONLY = <Errors.READ_ONLY: 1>
DATE_FILTER_NEEDED = <Errors.DATE_FILTER_NEEDED: 2>
DATE_FILTER_NEEDED_AND_SYNTAX = <Errors.DATE_FILTER_NEEDED_AND_SYNTAX: 3>
PERMISSION_DENIED = <Errors.PERMISSION_DENIED: 4>
class DryRunContext:
class DryRunContext:
    """Factory holding the settings shared by the DryRun instances it creates."""

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Remember the settings that every created DryRun should use."""
        self.credentials = credentials
        self.id_token = id_token
        self.dry_run_url = dry_run_url
        self.use_cloud_function = use_cloud_function

    def create(
        self,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
    ):
        """Build a DryRun for the given query/table using the shared settings."""
        shared_settings = {
            "use_cloud_function": self.use_cloud_function,
            "id_token": self.id_token,
            "credentials": self.credentials,
            "dry_run_url": self.dry_run_url,
        }
        return DryRun(
            sql=sql,
            project=project,
            dataset=dataset,
            table=table,
            **shared_settings,
        )

DryRun builder class.

DryRunContext( use_cloud_function=False, id_token=None, credentials=None, dry_run_url='https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun')
122    def __init__(
123        self,
124        use_cloud_function=False,
125        id_token=None,
126        credentials=None,
127        dry_run_url=DRY_RUN_URL,
128    ):
129        """Initialize dry run instance."""
130        self.use_cloud_function = use_cloud_function
131        self.dry_run_url = dry_run_url
132        self.id_token = id_token
133        self.credentials = credentials

Initialize dry run instance.

use_cloud_function
dry_run_url
id_token
credentials
def create( self, sql=None, project='moz-fx-data-shared-prod', dataset=None, table=None):
135    def create(
136        self,
137        sql=None,
138        project="moz-fx-data-shared-prod",
139        dataset=None,
140        table=None,
141    ):
142        """Initialize a DryRun instance."""
143        return DryRun(
144            use_cloud_function=self.use_cloud_function,
145            id_token=self.id_token,
146            credentials=self.credentials,
147            sql=sql,
148            project=project,
149            dataset=dataset,
150            table=table,
151            dry_run_url=self.dry_run_url,
152        )

Initialize a DryRun instance.

class DryRun:
class DryRun:
    """Dry run SQL and fetch table metadata.

    Depending on `use_cloud_function`, the work is done by POSTing to the dry
    run Cloud Function or by talking to BigQuery through the client library.
    """

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Initialize dry run instance."""
        self.sql = sql
        self.use_cloud_function = use_cloud_function
        self.project = project
        self.dataset = dataset
        self.table = table
        self.dry_run_url = dry_run_url
        self.id_token = id_token
        self.credentials = credentials

    @cached_property
    def client(self):
        """Get BigQuery client instance."""
        return bigquery.Client(credentials=self.credentials)

    @cached_property
    def dry_run_result(self):
        """Return the dry run result.

        The result is a dict; the keys read by the other methods are "valid",
        "errors", "schema" and "tableMetadata". None is returned (after
        printing the error) when the dry run itself could not be performed.
        The value is computed once per instance and then cached.
        """
        try:
            if self.use_cloud_function:
                json_data = {
                    "query": self.sql or "SELECT 1",
                    "project": self.project,
                    "dataset": self.dataset or "telemetry",
                }

                if self.table:
                    json_data["table"] = self.table

                request = Request(
                    self.dry_run_url,
                    headers={
                        "Content-Type": "application/json",
                        "Authorization": f"Bearer {self.id_token}",
                    },
                    data=json.dumps(json_data).encode("utf8"),
                    method="POST",
                )
                # Close the HTTP response once the payload has been parsed so
                # the underlying connection is not leaked.
                with urlopen_with_retry(request) as response:
                    return json.load(response)
            else:
                query_schema = None
                referenced_tables = []
                table_metadata = None

                if self.sql:
                    job_config = bigquery.QueryJobConfig(
                        dry_run=True,
                        use_query_cache=False,
                        query_parameters=[
                            bigquery.ScalarQueryParameter(
                                "submission_date", "DATE", "2019-01-01"
                            )
                        ],
                    )

                    if self.project:
                        job_config.connection_properties = [
                            bigquery.ConnectionProperty(
                                "dataset_project_id", self.project
                            )
                        ]

                    job = self.client.query(self.sql, job_config=job_config)
                    query_schema = (
                        job._properties.get("statistics", {})
                        .get("query", {})
                        .get("schema", {})
                    )
                    referenced_tables = [
                        ref.to_api_repr() for ref in job.referenced_tables
                    ]

                if (
                    self.project is not None
                    and self.table is not None
                    and self.dataset is not None
                ):
                    table = self.client.get_table(
                        f"{self.project}.{self.dataset}.{self.table}"
                    )
                    table_metadata = {
                        "tableType": table.table_type,
                        "friendlyName": table.friendly_name,
                        "schema": {
                            "fields": [field.to_api_repr() for field in table.schema]
                        },
                    }

                return {
                    "valid": True,
                    "referencedTables": referenced_tables,
                    "schema": query_schema,
                    "tableMetadata": table_metadata,
                }
        except Exception as e:
            # Deliberate best effort: callers detect the failure through the
            # None result (validate() turns it into a DryRunError).
            print(f"ERROR {e}")
            return None

    def get_schema(self):
        """Return the query schema fields by dry running the SQL file.

        Returns an empty list when the result is not valid or has no schema.
        Raises DryRunError (via validate) when the dry run failed.
        """
        self.validate()

        result = self.dry_run_result
        if result and result.get("valid"):
            # "schema" can be absent, None (no SQL was dry run) or an empty
            # dict (the job reported no schema); all of these mean no fields.
            return (result.get("schema") or {}).get("fields", [])

        return []

    def get_table_schema(self):
        """Return the schema fields of the provided table.

        Returns an empty list when the result is not valid or carries no table
        metadata. Raises DryRunError (via validate) when the dry run failed.
        """
        self.validate()

        result = self.dry_run_result
        if result and result.get("valid"):
            # "tableMetadata" is None when no table was looked up.
            table_metadata = result.get("tableMetadata") or {}
            return (table_metadata.get("schema") or {}).get("fields", [])

        return []

    def get_table_metadata(self):
        """Return table metadata, or an empty dict when there is none.

        Raises DryRunError (via validate) when the dry run failed.
        """
        self.validate()

        result = self.dry_run_result
        if result and result.get("valid"):
            # Normalize a None entry to the same empty dict used as fallback.
            return result.get("tableMetadata") or {}

        return {}

    def validate(self):
        """Dry run the provided SQL file and check if valid.

        Returns True when the result is valid or only failed with an expected
        error (READ_ONLY or DATE_FILTER_NEEDED); raises DryRunError otherwise.
        """
        error = self.get_error()
        dry_run_error = DryRunError(
            "Error when dry running SQL",
            error,
            self.use_cloud_function,
            self.table,
        )

        result = self.dry_run_result
        if result is None:
            raise dry_run_error

        if result.get("valid"):
            return True
        if error == Errors.READ_ONLY:
            # We want the dryrun service to only have read permissions, so
            # we expect CREATE VIEW and CREATE TABLE to throw specific
            # exceptions.
            return True
        if error == Errors.DATE_FILTER_NEEDED:
            # With strip_dml flag, some queries require a partition filter
            # (submission_date, submission_timestamp, etc.) to run
            return True

        print("ERROR\n", self.errors())
        raise dry_run_error

    def errors(self):
        """Dry run the provided SQL file and return errors."""
        if self.dry_run_result is None:
            return []
        # `or []` also covers an explicit null "errors" entry in the payload.
        return self.dry_run_result.get("errors") or []

    def get_error(self) -> Optional[Errors]:
        """Get specific errors for edge case handling.

        Only a result with exactly one error whose code is 400 or 403 is
        classified; every other situation yields None.
        """
        errors = self.errors()
        if len(errors) != 1:
            return None

        error = errors[0]
        if not error or error.get("code") not in (400, 403):
            return None

        error_message = error.get("message") or ""
        if (
            "does not have bigquery.tables.create permission for dataset"
            in error_message
            or "Permission bigquery.tables.create denied" in error_message
            or "Permission bigquery.datasets.update denied" in error_message
        ):
            return Errors.READ_ONLY
        if "without a filter over column(s)" in error_message:
            return Errors.DATE_FILTER_NEEDED
        if (
            "Syntax error: Expected end of input but got keyword WHERE"
            in error_message
        ):
            return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
        if (
            "Permission bigquery.tables.get denied on table" in error_message
            or "User does not have permission to query table" in error_message
        ):
            return Errors.PERMISSION_DENIED
        return None

Dry run SQL.

DryRun( use_cloud_function=False, id_token=None, credentials=None, sql=None, project='moz-fx-data-shared-prod', dataset=None, table=None, dry_run_url='https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun')
158    def __init__(
159        self,
160        use_cloud_function=False,
161        id_token=None,
162        credentials=None,
163        sql=None,
164        project="moz-fx-data-shared-prod",
165        dataset=None,
166        table=None,
167        dry_run_url=DRY_RUN_URL,
168    ):
169        """Initialize dry run instance."""
170        self.sql = sql
171        self.use_cloud_function = use_cloud_function
172        self.project = project
173        self.dataset = dataset
174        self.table = table
175        self.dry_run_url = dry_run_url
176        self.id_token = id_token
177        self.credentials = credentials

Initialize dry run instance.

sql
use_cloud_function
project
dataset
table
dry_run_url
id_token
credentials
client
179    @cached_property
180    def client(self):
181        """Get BigQuery client instance."""
182        return bigquery.Client(credentials=self.credentials)

Get BigQuery client instance.

dry_run_result
184    @cached_property
185    def dry_run_result(self):
186        """Return the dry run result."""
187        try:
188            if self.use_cloud_function:
189                json_data = {
190                    "query": self.sql or "SELECT 1",
191                    "project": self.project,
192                    "dataset": self.dataset or "telemetry",
193                }
194
195                if self.table:
196                    json_data["table"] = self.table
197
198                request = Request(
199                    self.dry_run_url,
200                    headers={
201                        "Content-Type": "application/json",
202                        "Authorization": f"Bearer {self.id_token}",
203                    },
204                    data=json.dumps(json_data).encode("utf8"),
205                    method="POST",
206                )
207                r = urlopen_with_retry(request)
208                return json.load(r)
209            else:
210                query_schema = None
211                referenced_tables = []
212                table_metadata = None
213
214                if self.sql:
215                    job_config = bigquery.QueryJobConfig(
216                        dry_run=True,
217                        use_query_cache=False,
218                        query_parameters=[
219                            bigquery.ScalarQueryParameter(
220                                "submission_date", "DATE", "2019-01-01"
221                            )
222                        ],
223                    )
224
225                    if self.project:
226                        job_config.connection_properties = [
227                            bigquery.ConnectionProperty(
228                                "dataset_project_id", self.project
229                            )
230                        ]
231
232                    job = self.client.query(self.sql, job_config=job_config)
233                    query_schema = (
234                        job._properties.get("statistics", {})
235                        .get("query", {})
236                        .get("schema", {})
237                    )
238                    referenced_tables = [
239                        ref.to_api_repr() for ref in job.referenced_tables
240                    ]
241
242                if (
243                    self.project is not None
244                    and self.table is not None
245                    and self.dataset is not None
246                ):
247                    table = self.client.get_table(
248                        f"{self.project}.{self.dataset}.{self.table}"
249                    )
250                    table_metadata = {
251                        "tableType": table.table_type,
252                        "friendlyName": table.friendly_name,
253                        "schema": {
254                            "fields": [field.to_api_repr() for field in table.schema]
255                        },
256                    }
257
258                return {
259                    "valid": True,
260                    "referencedTables": referenced_tables,
261                    "schema": query_schema,
262                    "tableMetadata": table_metadata,
263                }
264        except Exception as e:
265            print(f"ERROR {e}")
266            return None

Return the dry run result.

def get_schema(self):
268    def get_schema(self):
269        """Return the query schema by dry running the SQL file."""
270        self.validate()
271
272        if (
273            self.dry_run_result
274            and self.dry_run_result["valid"]
275            and "schema" in self.dry_run_result
276        ):
277            return self.dry_run_result["schema"]["fields"]
278
279        return []

Return the query schema by dry running the SQL file.

def get_table_schema(self):
281    def get_table_schema(self):
282        """Return the schema of the provided table."""
283        self.validate()
284
285        if (
286            self.dry_run_result
287            and self.dry_run_result["valid"]
288            and "tableMetadata" in self.dry_run_result
289        ):
290            return self.dry_run_result["tableMetadata"]["schema"]["fields"]
291
292        return []

Return the schema of the provided table.

def get_table_metadata(self):
294    def get_table_metadata(self):
295        """Return table metadata."""
296        self.validate()
297
298        if (
299            self.dry_run_result
300            and self.dry_run_result["valid"]
301            and "tableMetadata" in self.dry_run_result
302        ):
303            return self.dry_run_result["tableMetadata"]
304
305        return {}

Return table metadata.

def validate(self):
307    def validate(self):
308        """Dry run the provided SQL file and check if valid."""
309        dry_run_error = DryRunError(
310            "Error when dry running SQL",
311            self.get_error(),
312            self.use_cloud_function,
313            self.table,
314        )
315
316        if self.dry_run_result is None:
317            raise dry_run_error
318
319        if self.dry_run_result["valid"]:
320            return True
321        elif self.get_error() == Errors.READ_ONLY:
322            # We want the dryrun service to only have read permissions, so
323            # we expect CREATE VIEW and CREATE TABLE to throw specific
324            # exceptions.
325            return True
326        elif self.get_error() == Errors.DATE_FILTER_NEEDED:
327            # With strip_dml flag, some queries require a partition filter
328            # (submission_date, submission_timestamp, etc.) to run
329            return True
330        else:
331            print("ERROR\n", self.dry_run_result["errors"])
332            raise dry_run_error

Dry run the provided SQL file and check if valid.

def errors(self):
334    def errors(self):
335        """Dry run the provided SQL file and return errors."""
336        if self.dry_run_result is None:
337            return []
338        return self.dry_run_result.get("errors", [])

Dry run the provided SQL file and return errors.

def get_error(self) -> Optional[Errors]:
340    def get_error(self) -> Optional[Errors]:
341        """Get specific errors for edge case handling."""
342        errors = self.errors()
343        if len(errors) != 1:
344            return None
345
346        error = errors[0]
347        if error and error.get("code") in [400, 403]:
348            error_message = error.get("message", "")
349            if (
350                "does not have bigquery.tables.create permission for dataset"
351                in error_message
352                or "Permission bigquery.tables.create denied" in error_message
353                or "Permission bigquery.datasets.update denied" in error_message
354            ):
355                return Errors.READ_ONLY
356            if "without a filter over column(s)" in error_message:
357                return Errors.DATE_FILTER_NEEDED
358            if (
359                "Syntax error: Expected end of input but got keyword WHERE"
360                in error_message
361            ):
362                return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
363            if (
364                "Permission bigquery.tables.get denied on table" in error_message
365                or "User does not have permission to query table" in error_message
366            ):
367                return Errors.PERMISSION_DENIED
368        return None

Get specific errors for edge case handling.