Module glean.net

Network functionality for Glean.

Expand source code
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


"""
Network functionality for Glean.
"""

from .base_uploader import BaseUploader
from .http_client import HttpClientUploader
from .ping_uploader import PingUploader
from .ping_upload_worker import PingUploadWorker


__all__ = [
    "BaseUploader",
    "HttpClientUploader",
    "PingUploader",
    "PingUploadWorker",
]

Sub-modules

glean.net.base_uploader

A base class for ping uploaders.

glean.net.http_client

This module contains a ping uploader based on the Python stdlib's http.client module.

glean.net.ping_upload_worker
glean.net.ping_uploader

Classes

class BaseUploader (*args, **kwargs)

The logic for uploading pings. This leaves the actual upload implementation to the user-provided delegate.

Expand source code
class BaseUploader(PingUploader):
    """
    The logic for uploading pings. This leaves the actual upload implementation
    to the user-provided delegate.
    """

    def do_upload(
        self,
        path: str,
        data: bytes,
        headers: Dict[str, str],
        config: "Configuration",
    ) -> Union[
        UploadResult,
        UploadResult.UNRECOVERABLE_FAILURE,
        UploadResult.RECOVERABLE_FAILURE,
        UploadResult.HTTP_STATUS,
    ]:
        """
        This function triggers the actual upload.

        It logs the ping and calls the implementation-specific upload function.

        Args:
            url (str): The URL path to upload the data to.
            data (bytes): The serialized data to send.
            headers (dict of (str, str)): Dictionary of header entries.
            config (glean.Configuration): The Glean Configuration object.

        Returns:
            result (UploadResult): the status code of the upload response.
        """

        return self.upload(
            url=config.server_endpoint + path,
            data=data,
            headers=headers,
        )

Ancestors

Subclasses

Methods

def do_upload(self, path: str, data: bytes, headers: Dict[str, str], config: Configuration) ‑> Union[glean._uniffi.glean.UploadResult, glean._uniffi.glean.UploadResult.UNRECOVERABLE_FAILURE, glean._uniffi.glean.UploadResult.RECOVERABLE_FAILURE, glean._uniffi.glean.UploadResult.HTTP_STATUS]

This function triggers the actual upload.

It logs the ping and calls the implementation-specific upload function.

Args

url : str
The URL path to upload the data to.
data : bytes
The serialized data to send.
headers (dict of (str, str)): Dictionary of header entries.
config : Configuration
The Glean Configuration object.

Returns

result (UploadResult): the status code of the upload response.

Expand source code
def do_upload(
    self,
    path: str,
    data: bytes,
    headers: Dict[str, str],
    config: "Configuration",
) -> Union[
    UploadResult,
    UploadResult.UNRECOVERABLE_FAILURE,
    UploadResult.RECOVERABLE_FAILURE,
    UploadResult.HTTP_STATUS,
]:
    """
    This function triggers the actual upload.

    It logs the ping and calls the implementation-specific upload function.

    Args:
        url (str): The URL path to upload the data to.
        data (bytes): The serialized data to send.
        headers (dict of (str, str)): Dictionary of header entries.
        config (glean.Configuration): The Glean Configuration object.

    Returns:
        result (UploadResult): the status code of the upload response.
    """

    return self.upload(
        url=config.server_endpoint + path,
        data=data,
        headers=headers,
    )

Inherited members

class HttpClientUploader (*args, **kwargs)

The logic for uploading pings. This leaves the actual upload implementation to the user-provided delegate.

Expand source code
class HttpClientUploader(base_uploader.BaseUploader):
    # The timeout, in seconds, to use for all operations with the server.
    _DEFAULT_TIMEOUT = 10

    @classmethod
    def upload(
        cls, url: str, data: bytes, headers: Dict[str, str]
    ) -> Union[
        UploadResult,
        UploadResult.UNRECOVERABLE_FAILURE,
        UploadResult.RECOVERABLE_FAILURE,
        UploadResult.HTTP_STATUS,
    ]:
        """
        Synchronously upload a ping to a server.

        Args:
            url (str): The URL path to upload the data to.
            data (str): The serialized text data to send.
            headers (dict of (str, str)): HTTP headers to send.
        """
        parsed_url = urllib.parse.urlparse(url)
        if parsed_url.scheme == "http":
            conn = http.client.HTTPConnection(
                parsed_url.hostname or "",
                port=parsed_url.port or 80,
                timeout=cls._DEFAULT_TIMEOUT,
            )
        elif parsed_url.scheme == "https":
            conn = http.client.HTTPSConnection(
                parsed_url.hostname or "",
                port=parsed_url.port or 443,
                timeout=cls._DEFAULT_TIMEOUT,
            )
        else:
            # If we don't know the URL scheme, log an error and mark this as an unrecoverable
            # error, like if it were a malformed URL.
            log.error(f"Unknown URL scheme {parsed_url.scheme}")
            return UploadResult.UNRECOVERABLE_FAILURE(0)

        try:
            conn.request(
                "POST",
                parsed_url.path,
                body=data,
                headers=headers,
            )
            response = conn.getresponse()
        except http.client.InvalidURL as e:
            log.error(f"Could not upload telemetry due to malformed URL: '{url}' {e}")
            return UploadResult.UNRECOVERABLE_FAILURE(0)
        except http.client.HTTPException as e:
            log.debug(f"http.client.HTTPException while uploading ping: '{url}' {e}")
            return UploadResult.RECOVERABLE_FAILURE(0)
        except socket.gaierror as e:
            log.debug(f"socket.gaierror while uploading ping: '{url}' {e}")
            return UploadResult.RECOVERABLE_FAILURE(0)
        except OSError as e:
            log.debug(f"OSError while uploading ping: '{url}' {e}")
            return UploadResult.RECOVERABLE_FAILURE(0)
        except Exception as e:
            log.error(f"Unknown Exception while uploading ping: '{url}' {e}")
            return UploadResult.RECOVERABLE_FAILURE(0)

        status_code = response.status

        conn.close()

        return UploadResult.HTTP_STATUS(status_code)

Ancestors

Static methods

def upload(url: str, data: bytes, headers: Dict[str, str]) ‑> Union[glean._uniffi.glean.UploadResult, glean._uniffi.glean.UploadResult.UNRECOVERABLE_FAILURE, glean._uniffi.glean.UploadResult.RECOVERABLE_FAILURE, glean._uniffi.glean.UploadResult.HTTP_STATUS]

Synchronously upload a ping to a server.

Args

url : str
The URL path to upload the data to.
data : str
The serialized text data to send.

headers (dict of (str, str)): HTTP headers to send.

Expand source code
@classmethod
def upload(
    cls, url: str, data: bytes, headers: Dict[str, str]
) -> Union[
    UploadResult,
    UploadResult.UNRECOVERABLE_FAILURE,
    UploadResult.RECOVERABLE_FAILURE,
    UploadResult.HTTP_STATUS,
]:
    """
    Synchronously upload a ping to a server.

    Args:
        url (str): The URL path to upload the data to.
        data (str): The serialized text data to send.
        headers (dict of (str, str)): HTTP headers to send.
    """
    parsed_url = urllib.parse.urlparse(url)
    if parsed_url.scheme == "http":
        conn = http.client.HTTPConnection(
            parsed_url.hostname or "",
            port=parsed_url.port or 80,
            timeout=cls._DEFAULT_TIMEOUT,
        )
    elif parsed_url.scheme == "https":
        conn = http.client.HTTPSConnection(
            parsed_url.hostname or "",
            port=parsed_url.port or 443,
            timeout=cls._DEFAULT_TIMEOUT,
        )
    else:
        # If we don't know the URL scheme, log an error and mark this as an unrecoverable
        # error, like if it were a malformed URL.
        log.error(f"Unknown URL scheme {parsed_url.scheme}")
        return UploadResult.UNRECOVERABLE_FAILURE(0)

    try:
        conn.request(
            "POST",
            parsed_url.path,
            body=data,
            headers=headers,
        )
        response = conn.getresponse()
    except http.client.InvalidURL as e:
        log.error(f"Could not upload telemetry due to malformed URL: '{url}' {e}")
        return UploadResult.UNRECOVERABLE_FAILURE(0)
    except http.client.HTTPException as e:
        log.debug(f"http.client.HTTPException while uploading ping: '{url}' {e}")
        return UploadResult.RECOVERABLE_FAILURE(0)
    except socket.gaierror as e:
        log.debug(f"socket.gaierror while uploading ping: '{url}' {e}")
        return UploadResult.RECOVERABLE_FAILURE(0)
    except OSError as e:
        log.debug(f"OSError while uploading ping: '{url}' {e}")
        return UploadResult.RECOVERABLE_FAILURE(0)
    except Exception as e:
        log.error(f"Unknown Exception while uploading ping: '{url}' {e}")
        return UploadResult.RECOVERABLE_FAILURE(0)

    status_code = response.status

    conn.close()

    return UploadResult.HTTP_STATUS(status_code)

Inherited members

class PingUploadWorker
Expand source code
class PingUploadWorker:
    @classmethod
    def process(cls, testing_mode: bool = False):
        """
        Function to deserialize and process all serialized ping files.

        This function will ignore files that don't match the UUID regex and
        just delete them to prevent files from polluting the ping storage
        directory.
        """
        if testing_mode:
            cls._test_process_sync()
            return

        cls._process()

    @classmethod
    def _process(cls, testing_mode: bool = False):
        from .. import Glean

        return ProcessDispatcher.dispatch(
            _process,
            (
                Glean._data_dir,
                Glean._application_id,
                Glean._configuration,
            ),
        )

    @classmethod
    def _test_process_sync(cls) -> bool:
        """
        This is a test-only function to process the ping uploads in a separate
        process, but blocks until it is complete.

        Returns:
            uploaded (bool): The success of the upload task.
        """
        p = cls._process()
        p.wait()
        return p.returncode == 0

Static methods

def process(testing_mode: bool = False)

Function to deserialize and process all serialized ping files.

This function will ignore files that don't match the UUID regex and just delete them to prevent files from polluting the ping storage directory.

Expand source code
@classmethod
def process(cls, testing_mode: bool = False):
    """
    Function to deserialize and process all serialized ping files.

    This function will ignore files that don't match the UUID regex and
    just delete them to prevent files from polluting the ping storage
    directory.
    """
    if testing_mode:
        cls._test_process_sync()
        return

    cls._process()
class PingUploader (*args, **kwargs)

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing), for example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto(Protocol[T]):
    def meth(self) -> T:
        ...
Expand source code
class PingUploader(Protocol):
    def upload(
        self, url: str, data: bytes, headers: Dict[str, str]
    ) -> Union[
        UploadResult,
        UploadResult.UNRECOVERABLE_FAILURE,
        UploadResult.RECOVERABLE_FAILURE,
        UploadResult.HTTP_STATUS,
    ]:
        """
        Upload a ping to a server.

        Args:
            url (str): The URL path to upload the data to.
            data (bytes): The serialized data to send.
            headers (dict of (str, str)): Dictionary of header entries.

        Returns:
            result (UploadResult): the status code of the upload response.
        """
        pass

Ancestors

  • typing.Protocol
  • typing.Generic

Subclasses

Methods

def upload(self, url: str, data: bytes, headers: Dict[str, str]) ‑> Union[glean._uniffi.glean.UploadResult, glean._uniffi.glean.UploadResult.UNRECOVERABLE_FAILURE, glean._uniffi.glean.UploadResult.RECOVERABLE_FAILURE, glean._uniffi.glean.UploadResult.HTTP_STATUS]

Upload a ping to a server.

Args

url : str
The URL path to upload the data to.
data : bytes
The serialized data to send.

headers (dict of (str, str)): Dictionary of header entries.

Returns

result (UploadResult): the status code of the upload response.

Expand source code
def upload(
    self, url: str, data: bytes, headers: Dict[str, str]
) -> Union[
    UploadResult,
    UploadResult.UNRECOVERABLE_FAILURE,
    UploadResult.RECOVERABLE_FAILURE,
    UploadResult.HTTP_STATUS,
]:
    """
    Upload a ping to a server.

    Args:
        url (str): The URL path to upload the data to.
        data (bytes): The serialized data to send.
        headers (dict of (str, str)): Dictionary of header entries.

    Returns:
        result (UploadResult): the status code of the upload response.
    """
    pass