generator.dryrun

Dry-run utilities for retrieving BigQuery metadata.

  1"""Dry Run method to get BigQuery metadata."""
  2
  3import json
  4from enum import Enum
  5from functools import cached_property
  6from typing import Optional
  7from urllib.request import Request, urlopen
  8
  9import google.auth
 10from google.auth.transport.requests import Request as GoogleAuthRequest
 11from google.cloud import bigquery
 12from google.oauth2.id_token import fetch_id_token
 13
# Endpoint of the Cloud Function that performs dry runs on behalf of callers;
# also used as the audience when minting an ID token (see id_token()).
DRY_RUN_URL = (
    "https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun"
)
 17
 18
 19def credentials(auth_req: Optional[GoogleAuthRequest] = None):
 20    """Get GCP credentials."""
 21    auth_req = auth_req or GoogleAuthRequest()
 22    creds, _ = google.auth.default(
 23        scopes=["https://www.googleapis.com/auth/cloud-platform"]
 24    )
 25    creds.refresh(auth_req)
 26    return creds
 27
 28
 29def id_token():
 30    """Get token to authenticate against Cloud Function."""
 31    auth_req = GoogleAuthRequest()
 32    creds = credentials(auth_req)
 33
 34    if hasattr(creds, "id_token"):
 35        # Get token from default credentials for the current environment created via Cloud SDK run
 36        id_token = creds.id_token
 37    else:
 38        # If the environment variable GOOGLE_APPLICATION_CREDENTIALS is set to service account JSON file,
 39        # then ID token is acquired using this service account credentials.
 40        id_token = fetch_id_token(auth_req, DRY_RUN_URL)
 41    return id_token
 42
 43
 44class DryRunError(Exception):
 45    """Exception raised on dry run errors."""
 46
 47    def __init__(self, message, error, use_cloud_function, table_id):
 48        """Initialize DryRunError."""
 49        super().__init__(message)
 50        self.error = error
 51        self.use_cloud_function = use_cloud_function
 52        self.table_id = table_id
 53
 54    def __reduce__(self):
 55        """
 56        Override to ensure that all parameters are being passed when pickling.
 57
 58        Pickling happens when passing exception between processes (e.g. via multiprocessing)
 59        """
 60        return (
 61            self.__class__,
 62            self.args + (self.error, self.use_cloud_function, self.table_id),
 63        )
 64
 65
class Errors(Enum):
    """DryRun errors that require special handling."""

    # Dry run hit a write restriction (e.g. CREATE VIEW / CREATE TABLE with
    # only read permissions); validate() treats this as a successful dry run.
    READ_ONLY = 1
    # Query requires a partition filter ("without a filter over column(s)");
    # validate() also treats this as acceptable.
    DATE_FILTER_NEEDED = 2
    # Partition filter needed AND the stripped query left a dangling WHERE,
    # producing a syntax error ("Expected end of input but got keyword WHERE").
    DATE_FILTER_NEEDED_AND_SYNTAX = 3
    # Caller lacks permission to read or query the referenced table.
    PERMISSION_DENIED = 4
 73
 74
 75class DryRunContext:
 76    """DryRun builder class."""
 77
 78    def __init__(
 79        self,
 80        use_cloud_function=False,
 81        id_token=None,
 82        credentials=None,
 83        dry_run_url=DRY_RUN_URL,
 84    ):
 85        """Initialize dry run instance."""
 86        self.use_cloud_function = use_cloud_function
 87        self.dry_run_url = dry_run_url
 88        self.id_token = id_token
 89        self.credentials = credentials
 90
 91    def create(
 92        self,
 93        sql=None,
 94        project="moz-fx-data-shared-prod",
 95        dataset=None,
 96        table=None,
 97    ):
 98        """Initialize a DryRun instance."""
 99        return DryRun(
100            use_cloud_function=self.use_cloud_function,
101            id_token=self.id_token,
102            credentials=self.credentials,
103            sql=sql,
104            project=project,
105            dataset=dataset,
106            table=table,
107            dry_run_url=self.dry_run_url,
108        )
109
110
class DryRun:
    """Dry run SQL.

    Performs a BigQuery dry run either by POSTing to the bigquery-etl-dryrun
    Cloud Function (use_cloud_function=True) or by calling the BigQuery API
    directly with the supplied credentials. Results are cached per instance
    via cached_property, so each DryRun dry-runs at most once.
    """

    def __init__(
        self,
        use_cloud_function=False,
        id_token=None,
        credentials=None,
        sql=None,
        project="moz-fx-data-shared-prod",
        dataset=None,
        table=None,
        dry_run_url=DRY_RUN_URL,
    ):
        """Initialize dry run instance."""
        # SQL to dry run; may be None when only table metadata is wanted.
        self.sql = sql
        self.use_cloud_function = use_cloud_function
        self.project = project
        self.dataset = dataset
        self.table = table
        # Cloud Function endpoint; only used when use_cloud_function is True.
        self.dry_run_url = dry_run_url
        # Bearer token for the Cloud Function; only used when use_cloud_function is True.
        self.id_token = id_token
        # GCP credentials for the direct-API path; None means client defaults.
        self.credentials = credentials

    @cached_property
    def client(self):
        """Get BigQuery client instance."""
        # Cached so repeated accesses reuse one client per DryRun instance.
        return bigquery.Client(credentials=self.credentials)

    @cached_property
    def dry_run_result(self):
        """Return the dry run result.

        Returns a dict with keys "valid", "referencedTables", "schema" and
        "tableMetadata" (the Cloud Function is assumed to return the same
        shape, possibly with an "errors" list — TODO confirm against the
        dryrun service), or None if any exception occurred.
        """
        try:
            if self.use_cloud_function:
                # Delegate the dry run to the Cloud Function.
                json_data = {
                    "query": self.sql or "SELECT 1",
                    "project": self.project,
                    "dataset": self.dataset or "telemetry",
                }

                if self.table:
                    json_data["table"] = self.table

                r = urlopen(
                    Request(
                        self.dry_run_url,
                        headers={
                            "Content-Type": "application/json",
                            "Authorization": f"Bearer {self.id_token}",
                        },
                        data=json.dumps(json_data).encode("utf8"),
                        method="POST",
                    )
                )
                return json.load(r)
            else:
                # Direct-API path: dry run the SQL (if any) and fetch table
                # metadata (if project/dataset/table are all provided).
                query_schema = None
                referenced_tables = []
                table_metadata = None

                if self.sql:
                    job_config = bigquery.QueryJobConfig(
                        dry_run=True,
                        use_query_cache=False,
                        # Satisfies queries parameterized on @submission_date.
                        query_parameters=[
                            bigquery.ScalarQueryParameter(
                                "submission_date", "DATE", "2019-01-01"
                            )
                        ],
                    )

                    if self.project:
                        # Resolve unqualified dataset references against
                        # the configured project.
                        job_config.connection_properties = [
                            bigquery.ConnectionProperty(
                                "dataset_project_id", self.project
                            )
                        ]

                    job = self.client.query(self.sql, job_config=job_config)
                    # NOTE(review): reads the private _properties attribute to
                    # get the query schema; there is no public accessor for
                    # dry-run schemas on QueryJob.
                    query_schema = (
                        job._properties.get("statistics", {})
                        .get("query", {})
                        .get("schema", {})
                    )
                    referenced_tables = [
                        ref.to_api_repr() for ref in job.referenced_tables
                    ]

                if (
                    self.project is not None
                    and self.table is not None
                    and self.dataset is not None
                ):
                    table = self.client.get_table(
                        f"{self.project}.{self.dataset}.{self.table}"
                    )
                    table_metadata = {
                        "tableType": table.table_type,
                        "friendlyName": table.friendly_name,
                        "schema": {
                            "fields": [field.to_api_repr() for field in table.schema]
                        },
                    }

                return {
                    "valid": True,
                    "referencedTables": referenced_tables,
                    "schema": query_schema,
                    "tableMetadata": table_metadata,
                }
        except Exception as e:
            # Best-effort: any failure (HTTP, auth, BigQuery error) yields
            # None; callers distinguish None from an invalid result.
            print(f"ERROR {e}")
            return None

    def get_schema(self):
        """Return the query schema by dry running the SQL file."""
        # Raises DryRunError (via validate) if the dry run failed outright.
        self.validate()

        if (
            self.dry_run_result
            and self.dry_run_result["valid"]
            and "schema" in self.dry_run_result
        ):
            return self.dry_run_result["schema"]["fields"]

        return []

    def get_table_schema(self):
        """Return the schema of the provided table."""
        self.validate()

        if (
            self.dry_run_result
            and self.dry_run_result["valid"]
            and "tableMetadata" in self.dry_run_result
        ):
            return self.dry_run_result["tableMetadata"]["schema"]["fields"]

        return []

    def get_table_metadata(self):
        """Return table metadata."""
        self.validate()

        if (
            self.dry_run_result
            and self.dry_run_result["valid"]
            and "tableMetadata" in self.dry_run_result
        ):
            return self.dry_run_result["tableMetadata"]

        return {}

    def validate(self):
        """Dry run the provided SQL file and check if valid.

        Returns True when the dry run is valid or failed only for tolerated
        reasons (READ_ONLY, DATE_FILTER_NEEDED); raises DryRunError otherwise.
        """
        # Built eagerly; get_error() triggers (and caches) the dry run.
        dry_run_error = DryRunError(
            "Error when dry running SQL",
            self.get_error(),
            self.use_cloud_function,
            self.table,
        )

        if self.dry_run_result is None:
            raise dry_run_error

        if self.dry_run_result["valid"]:
            return True
        elif self.get_error() == Errors.READ_ONLY:
            # We want the dryrun service to only have read permissions, so
            # we expect CREATE VIEW and CREATE TABLE to throw specific
            # exceptions.
            return True
        elif self.get_error() == Errors.DATE_FILTER_NEEDED:
            # With strip_dml flag, some queries require a partition filter
            # (submission_date, submission_timestamp, etc.) to run
            return True
        else:
            # NOTE(review): assumes an invalid Cloud Function response always
            # carries an "errors" key — confirm against the dryrun service.
            print("ERROR\n", self.dry_run_result["errors"])
            raise dry_run_error

    def errors(self):
        """Dry run the provided SQL file and return errors."""
        if self.dry_run_result is None:
            return []
        # "errors" is only present in Cloud Function responses; the
        # direct-API path never sets it.
        return self.dry_run_result.get("errors", [])

    def get_error(self) -> Optional[Errors]:
        """Get specific errors for edge case handling.

        Maps a single known BigQuery error (HTTP 400/403) onto the Errors
        enum by matching substrings of the error message; returns None when
        there is not exactly one error or it is unrecognized.
        """
        errors = self.errors()
        if len(errors) != 1:
            return None

        error = errors[0]
        if error and error.get("code") in [400, 403]:
            error_message = error.get("message", "")
            if (
                "does not have bigquery.tables.create permission for dataset"
                in error_message
                or "Permission bigquery.tables.create denied" in error_message
                or "Permission bigquery.datasets.update denied" in error_message
            ):
                return Errors.READ_ONLY
            if "without a filter over column(s)" in error_message:
                return Errors.DATE_FILTER_NEEDED
            if (
                "Syntax error: Expected end of input but got keyword WHERE"
                in error_message
            ):
                return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
            if (
                "Permission bigquery.tables.get denied on table" in error_message
                or "User does not have permission to query table" in error_message
            ):
                return Errors.PERMISSION_DENIED
        return None
DRY_RUN_URL = 'https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun'
def credentials(auth_req: Optional[google.auth.transport.requests.Request] = None):
20def credentials(auth_req: Optional[GoogleAuthRequest] = None):
21    """Get GCP credentials."""
22    auth_req = auth_req or GoogleAuthRequest()
23    creds, _ = google.auth.default(
24        scopes=["https://www.googleapis.com/auth/cloud-platform"]
25    )
26    creds.refresh(auth_req)
27    return creds

Get GCP credentials.

def id_token():
30def id_token():
31    """Get token to authenticate against Cloud Function."""
32    auth_req = GoogleAuthRequest()
33    creds = credentials(auth_req)
34
35    if hasattr(creds, "id_token"):
36        # Get token from default credentials for the current environment created via Cloud SDK run
37        id_token = creds.id_token
38    else:
39        # If the environment variable GOOGLE_APPLICATION_CREDENTIALS is set to service account JSON file,
40        # then ID token is acquired using this service account credentials.
41        id_token = fetch_id_token(auth_req, DRY_RUN_URL)
42    return id_token

Get token to authenticate against Cloud Function.

class DryRunError(builtins.Exception):
45class DryRunError(Exception):
46    """Exception raised on dry run errors."""
47
48    def __init__(self, message, error, use_cloud_function, table_id):
49        """Initialize DryRunError."""
50        super().__init__(message)
51        self.error = error
52        self.use_cloud_function = use_cloud_function
53        self.table_id = table_id
54
55    def __reduce__(self):
56        """
57        Override to ensure that all parameters are being passed when pickling.
58
59        Pickling happens when passing exception between processes (e.g. via multiprocessing)
60        """
61        return (
62            self.__class__,
63            self.args + (self.error, self.use_cloud_function, self.table_id),
64        )

Exception raised on dry run errors.

DryRunError(message, error, use_cloud_function, table_id)
48    def __init__(self, message, error, use_cloud_function, table_id):
49        """Initialize DryRunError."""
50        super().__init__(message)
51        self.error = error
52        self.use_cloud_function = use_cloud_function
53        self.table_id = table_id

Initialize DryRunError.

error
use_cloud_function
table_id
class Errors(enum.Enum):
67class Errors(Enum):
68    """DryRun errors that require special handling."""
69
70    READ_ONLY = 1
71    DATE_FILTER_NEEDED = 2
72    DATE_FILTER_NEEDED_AND_SYNTAX = 3
73    PERMISSION_DENIED = 4

DryRun errors that require special handling.

READ_ONLY = <Errors.READ_ONLY: 1>
DATE_FILTER_NEEDED = <Errors.DATE_FILTER_NEEDED: 2>
DATE_FILTER_NEEDED_AND_SYNTAX = <Errors.DATE_FILTER_NEEDED_AND_SYNTAX: 3>
PERMISSION_DENIED = <Errors.PERMISSION_DENIED: 4>
class DryRunContext:
 76class DryRunContext:
 77    """DryRun builder class."""
 78
 79    def __init__(
 80        self,
 81        use_cloud_function=False,
 82        id_token=None,
 83        credentials=None,
 84        dry_run_url=DRY_RUN_URL,
 85    ):
 86        """Initialize dry run instance."""
 87        self.use_cloud_function = use_cloud_function
 88        self.dry_run_url = dry_run_url
 89        self.id_token = id_token
 90        self.credentials = credentials
 91
 92    def create(
 93        self,
 94        sql=None,
 95        project="moz-fx-data-shared-prod",
 96        dataset=None,
 97        table=None,
 98    ):
 99        """Initialize a DryRun instance."""
100        return DryRun(
101            use_cloud_function=self.use_cloud_function,
102            id_token=self.id_token,
103            credentials=self.credentials,
104            sql=sql,
105            project=project,
106            dataset=dataset,
107            table=table,
108            dry_run_url=self.dry_run_url,
109        )

DryRun builder class.

DryRunContext( use_cloud_function=False, id_token=None, credentials=None, dry_run_url='https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun')
79    def __init__(
80        self,
81        use_cloud_function=False,
82        id_token=None,
83        credentials=None,
84        dry_run_url=DRY_RUN_URL,
85    ):
86        """Initialize dry run instance."""
87        self.use_cloud_function = use_cloud_function
88        self.dry_run_url = dry_run_url
89        self.id_token = id_token
90        self.credentials = credentials

Initialize dry run instance.

use_cloud_function
dry_run_url
id_token
credentials
def create( self, sql=None, project='moz-fx-data-shared-prod', dataset=None, table=None):
 92    def create(
 93        self,
 94        sql=None,
 95        project="moz-fx-data-shared-prod",
 96        dataset=None,
 97        table=None,
 98    ):
 99        """Initialize a DryRun instance."""
100        return DryRun(
101            use_cloud_function=self.use_cloud_function,
102            id_token=self.id_token,
103            credentials=self.credentials,
104            sql=sql,
105            project=project,
106            dataset=dataset,
107            table=table,
108            dry_run_url=self.dry_run_url,
109        )

Initialize a DryRun instance.

class DryRun:
112class DryRun:
113    """Dry run SQL."""
114
115    def __init__(
116        self,
117        use_cloud_function=False,
118        id_token=None,
119        credentials=None,
120        sql=None,
121        project="moz-fx-data-shared-prod",
122        dataset=None,
123        table=None,
124        dry_run_url=DRY_RUN_URL,
125    ):
126        """Initialize dry run instance."""
127        self.sql = sql
128        self.use_cloud_function = use_cloud_function
129        self.project = project
130        self.dataset = dataset
131        self.table = table
132        self.dry_run_url = dry_run_url
133        self.id_token = id_token
134        self.credentials = credentials
135
136    @cached_property
137    def client(self):
138        """Get BigQuery client instance."""
139        return bigquery.Client(credentials=self.credentials)
140
141    @cached_property
142    def dry_run_result(self):
143        """Return the dry run result."""
144        try:
145            if self.use_cloud_function:
146                json_data = {
147                    "query": self.sql or "SELECT 1",
148                    "project": self.project,
149                    "dataset": self.dataset or "telemetry",
150                }
151
152                if self.table:
153                    json_data["table"] = self.table
154
155                r = urlopen(
156                    Request(
157                        self.dry_run_url,
158                        headers={
159                            "Content-Type": "application/json",
160                            "Authorization": f"Bearer {self.id_token}",
161                        },
162                        data=json.dumps(json_data).encode("utf8"),
163                        method="POST",
164                    )
165                )
166                return json.load(r)
167            else:
168                query_schema = None
169                referenced_tables = []
170                table_metadata = None
171
172                if self.sql:
173                    job_config = bigquery.QueryJobConfig(
174                        dry_run=True,
175                        use_query_cache=False,
176                        query_parameters=[
177                            bigquery.ScalarQueryParameter(
178                                "submission_date", "DATE", "2019-01-01"
179                            )
180                        ],
181                    )
182
183                    if self.project:
184                        job_config.connection_properties = [
185                            bigquery.ConnectionProperty(
186                                "dataset_project_id", self.project
187                            )
188                        ]
189
190                    job = self.client.query(self.sql, job_config=job_config)
191                    query_schema = (
192                        job._properties.get("statistics", {})
193                        .get("query", {})
194                        .get("schema", {})
195                    )
196                    referenced_tables = [
197                        ref.to_api_repr() for ref in job.referenced_tables
198                    ]
199
200                if (
201                    self.project is not None
202                    and self.table is not None
203                    and self.dataset is not None
204                ):
205                    table = self.client.get_table(
206                        f"{self.project}.{self.dataset}.{self.table}"
207                    )
208                    table_metadata = {
209                        "tableType": table.table_type,
210                        "friendlyName": table.friendly_name,
211                        "schema": {
212                            "fields": [field.to_api_repr() for field in table.schema]
213                        },
214                    }
215
216                return {
217                    "valid": True,
218                    "referencedTables": referenced_tables,
219                    "schema": query_schema,
220                    "tableMetadata": table_metadata,
221                }
222        except Exception as e:
223            print(f"ERROR {e}")
224            return None
225
226    def get_schema(self):
227        """Return the query schema by dry running the SQL file."""
228        self.validate()
229
230        if (
231            self.dry_run_result
232            and self.dry_run_result["valid"]
233            and "schema" in self.dry_run_result
234        ):
235            return self.dry_run_result["schema"]["fields"]
236
237        return []
238
239    def get_table_schema(self):
240        """Return the schema of the provided table."""
241        self.validate()
242
243        if (
244            self.dry_run_result
245            and self.dry_run_result["valid"]
246            and "tableMetadata" in self.dry_run_result
247        ):
248            return self.dry_run_result["tableMetadata"]["schema"]["fields"]
249
250        return []
251
252    def get_table_metadata(self):
253        """Return table metadata."""
254        self.validate()
255
256        if (
257            self.dry_run_result
258            and self.dry_run_result["valid"]
259            and "tableMetadata" in self.dry_run_result
260        ):
261            return self.dry_run_result["tableMetadata"]
262
263        return {}
264
265    def validate(self):
266        """Dry run the provided SQL file and check if valid."""
267        dry_run_error = DryRunError(
268            "Error when dry running SQL",
269            self.get_error(),
270            self.use_cloud_function,
271            self.table,
272        )
273
274        if self.dry_run_result is None:
275            raise dry_run_error
276
277        if self.dry_run_result["valid"]:
278            return True
279        elif self.get_error() == Errors.READ_ONLY:
280            # We want the dryrun service to only have read permissions, so
281            # we expect CREATE VIEW and CREATE TABLE to throw specific
282            # exceptions.
283            return True
284        elif self.get_error() == Errors.DATE_FILTER_NEEDED:
285            # With strip_dml flag, some queries require a partition filter
286            # (submission_date, submission_timestamp, etc.) to run
287            return True
288        else:
289            print("ERROR\n", self.dry_run_result["errors"])
290            raise dry_run_error
291
292    def errors(self):
293        """Dry run the provided SQL file and return errors."""
294        if self.dry_run_result is None:
295            return []
296        return self.dry_run_result.get("errors", [])
297
298    def get_error(self) -> Optional[Errors]:
299        """Get specific errors for edge case handling."""
300        errors = self.errors()
301        if len(errors) != 1:
302            return None
303
304        error = errors[0]
305        if error and error.get("code") in [400, 403]:
306            error_message = error.get("message", "")
307            if (
308                "does not have bigquery.tables.create permission for dataset"
309                in error_message
310                or "Permission bigquery.tables.create denied" in error_message
311                or "Permission bigquery.datasets.update denied" in error_message
312            ):
313                return Errors.READ_ONLY
314            if "without a filter over column(s)" in error_message:
315                return Errors.DATE_FILTER_NEEDED
316            if (
317                "Syntax error: Expected end of input but got keyword WHERE"
318                in error_message
319            ):
320                return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
321            if (
322                "Permission bigquery.tables.get denied on table" in error_message
323                or "User does not have permission to query table" in error_message
324            ):
325                return Errors.PERMISSION_DENIED
326        return None

Dry run SQL.

DryRun( use_cloud_function=False, id_token=None, credentials=None, sql=None, project='moz-fx-data-shared-prod', dataset=None, table=None, dry_run_url='https://us-central1-moz-fx-data-shared-prod.cloudfunctions.net/bigquery-etl-dryrun')
115    def __init__(
116        self,
117        use_cloud_function=False,
118        id_token=None,
119        credentials=None,
120        sql=None,
121        project="moz-fx-data-shared-prod",
122        dataset=None,
123        table=None,
124        dry_run_url=DRY_RUN_URL,
125    ):
126        """Initialize dry run instance."""
127        self.sql = sql
128        self.use_cloud_function = use_cloud_function
129        self.project = project
130        self.dataset = dataset
131        self.table = table
132        self.dry_run_url = dry_run_url
133        self.id_token = id_token
134        self.credentials = credentials

Initialize dry run instance.

sql
use_cloud_function
project
dataset
table
dry_run_url
id_token
credentials
client
136    @cached_property
137    def client(self):
138        """Get BigQuery client instance."""
139        return bigquery.Client(credentials=self.credentials)

Get BigQuery client instance.

dry_run_result
141    @cached_property
142    def dry_run_result(self):
143        """Return the dry run result."""
144        try:
145            if self.use_cloud_function:
146                json_data = {
147                    "query": self.sql or "SELECT 1",
148                    "project": self.project,
149                    "dataset": self.dataset or "telemetry",
150                }
151
152                if self.table:
153                    json_data["table"] = self.table
154
155                r = urlopen(
156                    Request(
157                        self.dry_run_url,
158                        headers={
159                            "Content-Type": "application/json",
160                            "Authorization": f"Bearer {self.id_token}",
161                        },
162                        data=json.dumps(json_data).encode("utf8"),
163                        method="POST",
164                    )
165                )
166                return json.load(r)
167            else:
168                query_schema = None
169                referenced_tables = []
170                table_metadata = None
171
172                if self.sql:
173                    job_config = bigquery.QueryJobConfig(
174                        dry_run=True,
175                        use_query_cache=False,
176                        query_parameters=[
177                            bigquery.ScalarQueryParameter(
178                                "submission_date", "DATE", "2019-01-01"
179                            )
180                        ],
181                    )
182
183                    if self.project:
184                        job_config.connection_properties = [
185                            bigquery.ConnectionProperty(
186                                "dataset_project_id", self.project
187                            )
188                        ]
189
190                    job = self.client.query(self.sql, job_config=job_config)
191                    query_schema = (
192                        job._properties.get("statistics", {})
193                        .get("query", {})
194                        .get("schema", {})
195                    )
196                    referenced_tables = [
197                        ref.to_api_repr() for ref in job.referenced_tables
198                    ]
199
200                if (
201                    self.project is not None
202                    and self.table is not None
203                    and self.dataset is not None
204                ):
205                    table = self.client.get_table(
206                        f"{self.project}.{self.dataset}.{self.table}"
207                    )
208                    table_metadata = {
209                        "tableType": table.table_type,
210                        "friendlyName": table.friendly_name,
211                        "schema": {
212                            "fields": [field.to_api_repr() for field in table.schema]
213                        },
214                    }
215
216                return {
217                    "valid": True,
218                    "referencedTables": referenced_tables,
219                    "schema": query_schema,
220                    "tableMetadata": table_metadata,
221                }
222        except Exception as e:
223            print(f"ERROR {e}")
224            return None

Return the dry run result.

def get_schema(self):
226    def get_schema(self):
227        """Return the query schema by dry running the SQL file."""
228        self.validate()
229
230        if (
231            self.dry_run_result
232            and self.dry_run_result["valid"]
233            and "schema" in self.dry_run_result
234        ):
235            return self.dry_run_result["schema"]["fields"]
236
237        return []

Return the query schema by dry running the SQL file.

def get_table_schema(self):
239    def get_table_schema(self):
240        """Return the schema of the provided table."""
241        self.validate()
242
243        if (
244            self.dry_run_result
245            and self.dry_run_result["valid"]
246            and "tableMetadata" in self.dry_run_result
247        ):
248            return self.dry_run_result["tableMetadata"]["schema"]["fields"]
249
250        return []

Return the schema of the provided table.

def get_table_metadata(self):
252    def get_table_metadata(self):
253        """Return table metadata."""
254        self.validate()
255
256        if (
257            self.dry_run_result
258            and self.dry_run_result["valid"]
259            and "tableMetadata" in self.dry_run_result
260        ):
261            return self.dry_run_result["tableMetadata"]
262
263        return {}

Return table metadata.

def validate(self):
265    def validate(self):
266        """Dry run the provided SQL file and check if valid."""
267        dry_run_error = DryRunError(
268            "Error when dry running SQL",
269            self.get_error(),
270            self.use_cloud_function,
271            self.table,
272        )
273
274        if self.dry_run_result is None:
275            raise dry_run_error
276
277        if self.dry_run_result["valid"]:
278            return True
279        elif self.get_error() == Errors.READ_ONLY:
280            # We want the dryrun service to only have read permissions, so
281            # we expect CREATE VIEW and CREATE TABLE to throw specific
282            # exceptions.
283            return True
284        elif self.get_error() == Errors.DATE_FILTER_NEEDED:
285            # With strip_dml flag, some queries require a partition filter
286            # (submission_date, submission_timestamp, etc.) to run
287            return True
288        else:
289            print("ERROR\n", self.dry_run_result["errors"])
290            raise dry_run_error

Dry run the provided SQL file and check if valid.

def errors(self):
292    def errors(self):
293        """Dry run the provided SQL file and return errors."""
294        if self.dry_run_result is None:
295            return []
296        return self.dry_run_result.get("errors", [])

Dry run the provided SQL file and return errors.

def get_error(self) -> Optional[Errors]:
298    def get_error(self) -> Optional[Errors]:
299        """Get specific errors for edge case handling."""
300        errors = self.errors()
301        if len(errors) != 1:
302            return None
303
304        error = errors[0]
305        if error and error.get("code") in [400, 403]:
306            error_message = error.get("message", "")
307            if (
308                "does not have bigquery.tables.create permission for dataset"
309                in error_message
310                or "Permission bigquery.tables.create denied" in error_message
311                or "Permission bigquery.datasets.update denied" in error_message
312            ):
313                return Errors.READ_ONLY
314            if "without a filter over column(s)" in error_message:
315                return Errors.DATE_FILTER_NEEDED
316            if (
317                "Syntax error: Expected end of input but got keyword WHERE"
318                in error_message
319            ):
320                return Errors.DATE_FILTER_NEEDED_AND_SYNTAX
321            if (
322                "Permission bigquery.tables.get denied on table" in error_message
323                or "User does not have permission to query table" in error_message
324            ):
325                return Errors.PERMISSION_DENIED
326        return None

Get specific errors for edge case handling.