Module pipelines.utils.dump_db.db

Database definitions for SQL pipelines.

Classes

class Database (hostname: str, port: int, user: str, password: str, database: str)

Database abstract class.

Initializes the database.

Args

hostname
The hostname of the database.
port
The port of the database.
user
The username of the database.
password
The password of the database.
database
The database name.
Expand source code
class Database(ABC):
    """
    Database abstract class.
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        hostname: str,
        port: int,
        user: str,
        password: str,
        database: str,
    ) -> None:
        """
        Initializes the database.

        Args:
            hostname: The hostname of the database.
            port: The port of the database.
            user: The username of the database.
            password: The password of the database.
            database: The database name.
        """
        self._hostname = hostname
        self._port = port
        self._user = user
        self._password = password
        self._database = database
        self._connection = self.connect()
        self._cursor = self.get_cursor()

    @abstractmethod
    def connect(self):
        """
        Connect to the database.
        """

    @abstractmethod
    def get_cursor(self):
        """
        Returns a cursor for the database.
        """

    @abstractmethod
    def execute_query(self, query: str) -> None:
        """
        Execute query on the database.

        Args:
            query: The query to execute.
        """

    @abstractmethod
    def get_columns(self) -> List[str]:
        """
        Returns the column names of the database.
        """

    @abstractmethod
    def fetch_batch(self, batch_size: int) -> List[List]:
        """
        Fetches a batch of rows from the database.
        """

    @abstractmethod
    def fetch_all(self) -> List[List]:
        """
        Fetches all rows from the database.
        """

Ancestors

  • abc.ABC

Subclasses

Methods

def connect(self)

Connect to the database.

def execute_query(self, query: str) ‑> None

Execute query on the database.

Args

query
The query to execute.
def fetch_all(self) ‑> List[List]

Fetches all rows from the database.

def fetch_batch(self, batch_size: int) ‑> List[List]

Fetches a batch of rows from the database.

def get_columns(self) ‑> List[str]

Returns the column names of the database.

def get_cursor(self)

Returns a cursor for the database.

class MySql (hostname: str, user: str, password: str, database: str, port: int = 3306)

MySQL database.

Initializes the MySQL database.

Args

hostname
The hostname of the database.
port
The port of the database.
user
The username of the database.
password
The password of the database.
database
The database name.
Expand source code
class MySql(Database):
    """
    MySQL database.
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        hostname: str,
        user: str,
        password: str,
        database: str,
        port: int = 3306,
    ) -> None:
        """
        Initializes the MySQL database.

        Args:
            hostname: The hostname of the database.
            port: The port of the database.
            user: The username of the database.
            password: The password of the database.
            database: The database name.
        """
        port = port if isinstance(port, int) else int(port)
        super().__init__(
            hostname,
            port,
            user,
            password,
            database,
        )

    def connect(self):
        """
        Connect to the MySQL.
        """
        # pylint: disable=E1101
        return pymysql.connect(
            host=self._hostname,
            port=self._port,
            user=self._user,
            password=self._password,
            database=self._database,
        )

    def get_cursor(self):
        """
        Returns a cursor for the MySQL.
        """
        return self._connection.cursor()

    def execute_query(self, query: str) -> None:
        """
        Execute query on the MySQL.

        Args:
            query: The query to execute.
        """
        self._cursor.execute(query)

    def get_columns(self) -> List[str]:
        """
        Returns the column names of the MySQL.
        """
        return [column[0] for column in self._cursor.description]

    def fetch_batch(self, batch_size: int) -> List[List]:
        """
        Fetches a batch of rows from the MySQL.
        """
        return [list(item) for item in self._cursor.fetchmany(batch_size)]

    def fetch_all(self) -> List[List]:
        """
        Fetches all rows from the MySQL.
        """
        return [list(item) for item in self._cursor.fetchall()]

Ancestors

Methods

def connect(self)

Connect to the MySQL.

def execute_query(self, query: str) ‑> None

Execute query on the MySQL.

Args

query
The query to execute.
def fetch_all(self) ‑> List[List]

Fetches all rows from the MySQL.

def fetch_batch(self, batch_size: int) ‑> List[List]

Fetches a batch of rows from the MySQL.

def get_columns(self) ‑> List[str]

Returns the column names of the MySQL.

def get_cursor(self)

Returns a cursor for the MySQL.

class Oracle (hostname: str, user: str, password: str, database: str, port: int = 1521)

Oracle Database

Initializes the Oracle database.

Args

hostname
The hostname of the database.
port
The port of the database.
user
The username of the database.
password
The password of the database.
database
The database name.
Expand source code
class Oracle(Database):
    """
    Oracle Database
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        hostname: str,
        user: str,
        password: str,
        database: str,
        port: int = 1521,
    ) -> None:
        """
        Initializes the Oracle database.

        Args:
            hostname: The hostname of the database.
            port: The port of the database.
            user: The username of the database.
            password: The password of the database.
            database: The database name.
        """
        super().__init__(
            hostname,
            port,
            user,
            password,
            database,
        )

    def connect(self):
        """
        Connect to the Oracle.
        """
        # pylint: disable=E1101
        return cx_Oracle.connect(
            f"{self._user}/{self._password}@"
            f"{self._hostname}:{self._port}/{self._database}"
        )

    def get_cursor(self):
        """
        Returns a cursor for the Oracle.
        """
        return self._connection.cursor()

    def execute_query(self, query: str) -> None:
        """
        Execute query on the Oracle.

        Args:
            query: The query to execute.
        """
        self._cursor.execute(query)

    def get_columns(self) -> List[str]:
        """
        Returns the column names of the Oracle.
        """
        return [column[0] for column in self._cursor.description]

    def fetch_batch(self, batch_size: int) -> List[List]:
        """
        Fetches a batch of rows from the Oracle.
        """
        return [list(item) for item in self._cursor.fetchmany(batch_size)]

    def fetch_all(self) -> List[List]:
        """
        Fetches all rows from the Oracle.
        """
        return [list(item) for item in self._cursor.fetchall()]

Ancestors

Methods

def connect(self)

Connect to the Oracle.

def execute_query(self, query: str) ‑> None

Execute query on the Oracle.

Args

query
The query to execute.
def fetch_all(self) ‑> List[List]

Fetches all rows from the Oracle.

def fetch_batch(self, batch_size: int) ‑> List[List]

Fetches a batch of rows from the Oracle.

def get_columns(self) ‑> List[str]

Returns the column names of the Oracle.

def get_cursor(self)

Returns a cursor for the Oracle.

class SqlServer (hostname: str, user: str, password: str, database: str, port: int = 1433)

SQL Server database.

Initializes the SQL Server database.

Args

hostname
The hostname of the SQL Server.
port
The port of the SQL Server.
user
The username of the SQL Server.
password
The password of the SQL Server.
database
The database name.
Expand source code
class SqlServer(Database):
    """
    SQL Server database.
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        hostname: str,
        user: str,
        password: str,
        database: str,
        port: int = 1433,
    ) -> None:
        """
        Initializes the SQL Server database.

        Args:
            hostname: The hostname of the SQL Server.
            port: The port of the SQL Server.
            user: The username of the SQL Server.
            password: The password of the SQL Server.
            database: The database name.
        """
        super().__init__(
            hostname,
            port,
            user,
            password,
            database,
        )

    def connect(self):
        """
        Connect to the SQL Server.
        """
        conn_str = (
            f"DRIVER={{ODBC Driver 17 for SQL Server}};"
            f"SERVER={self._hostname},{self._port};"
            f"DATABASE={self._database};"
            f"UID={self._user};"
            f"PWD={self._password};"
            "Encrypt=no;"
            "TrustServerCertificate=yes;"
        )
        return pyodbc.connect(conn_str)

    def get_cursor(self):
        """
        Returns a cursor for the SQL Server.
        """
        return self._connection.cursor()

    def execute_query(self, query: str) -> None:
        """
        Execute query on the SQL Server.

        Args:
            query: The query to execute.
        """
        self._cursor.execute(query)

    def get_columns(self) -> List[str]:
        """
        Returns the column names of the SQL Server.
        """
        return [column[0] for column in self._cursor.description]

    def fetch_batch(self, batch_size: int) -> List[List]:
        """
        Fetches a batch of rows from the SQL Server.
        """
        return [list(item) for item in self._cursor.fetchmany(batch_size)]

    def fetch_all(self) -> List[List]:
        """
        Fetches all rows from the SQL Server.
        """
        return [list(item) for item in self._cursor.fetchall()]

Ancestors

Methods

def connect(self)

Connect to the SQL Server.

def execute_query(self, query: str) ‑> None

Execute query on the SQL Server.

Args

query
The query to execute.
def fetch_all(self) ‑> List[List]

Fetches all rows from the SQL Server.

def fetch_batch(self, batch_size: int) ‑> List[List]

Fetches a batch of rows from the SQL Server.

def get_columns(self) ‑> List[str]

Returns the column names of the SQL Server.

def get_cursor(self)

Returns a cursor for the SQL Server.