Module pipelines.utils.dump_db.db
Database definitions for SQL pipelines.
Classes
class Database (hostname: str, port: int, user: str, password: str, database: str)
-
Database abstract class.
Initializes the database.
Args
hostname
- The hostname of the database.
port
- The port of the database.
user
- The username of the database.
password
- The password of the database.
database
- The database name.
Expand source code
class Database(ABC): """ Database abstract class. """ # pylint: disable=too-many-arguments def __init__( self, hostname: str, port: int, user: str, password: str, database: str, ) -> None: """ Initializes the database. Args: hostname: The hostname of the database. port: The port of the database. user: The username of the database. password: The password of the database. database: The database name. """ self._hostname = hostname self._port = port self._user = user self._password = password self._database = database self._connection = self.connect() self._cursor = self.get_cursor() @abstractmethod def connect(self): """ Connect to the database. """ @abstractmethod def get_cursor(self): """ Returns a cursor for the database. """ @abstractmethod def execute_query(self, query: str) -> None: """ Execute query on the database. Args: query: The query to execute. """ @abstractmethod def get_columns(self) -> List[str]: """ Returns the column names of the database. """ @abstractmethod def fetch_batch(self, batch_size: int) -> List[List]: """ Fetches a batch of rows from the database. """ @abstractmethod def fetch_all(self) -> List[List]: """ Fetches all rows from the database. """
Ancestors
- abc.ABC
Subclasses
Methods
def connect(self)
-
Connect to the database.
def execute_query(self, query: str) ‑> None
-
Execute query on the database.
Args
query
- The query to execute.
def fetch_all(self) ‑> List[List]
-
Fetches all rows from the database.
def fetch_batch(self, batch_size: int) ‑> List[List]
-
Fetches a batch of rows from the database.
def get_columns(self) ‑> List[str]
-
Returns the column names of the database.
def get_cursor(self)
-
Returns a cursor for the database.
class MySql (hostname: str, user: str, password: str, database: str, port: int = 3306)
-
MySQL database.
Initializes the MySQL database.
Args
hostname
- The hostname of the database.
port
- The port of the database.
user
- The username of the database.
password
- The password of the database.
database
- The database name.
Expand source code
class MySql(Database): """ MySQL database. """ # pylint: disable=too-many-arguments def __init__( self, hostname: str, user: str, password: str, database: str, port: int = 3306, ) -> None: """ Initializes the MySQL database. Args: hostname: The hostname of the database. port: The port of the database. user: The username of the database. password: The password of the database. database: The database name. """ port = port if isinstance(port, int) else int(port) super().__init__( hostname, port, user, password, database, ) def connect(self): """ Connect to the MySQL. """ # pylint: disable=E1101 return pymysql.connect( host=self._hostname, port=self._port, user=self._user, password=self._password, database=self._database, ) def get_cursor(self): """ Returns a cursor for the MySQL. """ return self._connection.cursor() def execute_query(self, query: str) -> None: """ Execute query on the MySQL. Args: query: The query to execute. """ self._cursor.execute(query) def get_columns(self) -> List[str]: """ Returns the column names of the MySQL. """ return [column[0] for column in self._cursor.description] def fetch_batch(self, batch_size: int) -> List[List]: """ Fetches a batch of rows from the MySQL. """ return [list(item) for item in self._cursor.fetchmany(batch_size)] def fetch_all(self) -> List[List]: """ Fetches all rows from the MySQL. """ return [list(item) for item in self._cursor.fetchall()]
Ancestors
- Database
- abc.ABC
Methods
def connect(self)
-
Connect to the MySQL.
def execute_query(self, query: str) ‑> None
-
Execute query on the MySQL.
Args
query
- The query to execute.
def fetch_all(self) ‑> List[List]
-
Fetches all rows from the MySQL.
def fetch_batch(self, batch_size: int) ‑> List[List]
-
Fetches a batch of rows from the MySQL.
def get_columns(self) ‑> List[str]
-
Returns the column names of the MySQL.
def get_cursor(self)
-
Returns a cursor for the MySQL.
class Oracle (hostname: str, user: str, password: str, database: str, port: int = 1521)
-
Oracle Database
Initializes the Oracle database.
Args
hostname
- The hostname of the database.
port
- The port of the database.
user
- The username of the database.
password
- The password of the database.
database
- The database name.
Expand source code
class Oracle(Database): """ Oracle Database """ # pylint: disable=too-many-arguments def __init__( self, hostname: str, user: str, password: str, database: str, port: int = 1521, ) -> None: """ Initializes the Oracle database. Args: hostname: The hostname of the database. port: The port of the database. user: The username of the database. password: The password of the database. database: The database name. """ super().__init__( hostname, port, user, password, database, ) def connect(self): """ Connect to the Oracle. """ # pylint: disable=E1101 return cx_Oracle.connect( f"{self._user}/{self._password}@" f"{self._hostname}:{self._port}/{self._database}" ) def get_cursor(self): """ Returns a cursor for the Oracle. """ return self._connection.cursor() def execute_query(self, query: str) -> None: """ Execute query on the Oracle. Args: query: The query to execute. """ self._cursor.execute(query) def get_columns(self) -> List[str]: """ Returns the column names of the Oracle. """ return [column[0] for column in self._cursor.description] def fetch_batch(self, batch_size: int) -> List[List]: """ Fetches a batch of rows from the Oracle. """ return [list(item) for item in self._cursor.fetchmany(batch_size)] def fetch_all(self) -> List[List]: """ Fetches all rows from the Oracle. """ return [list(item) for item in self._cursor.fetchall()]
Ancestors
- Database
- abc.ABC
Methods
def connect(self)
-
Connect to the Oracle.
def execute_query(self, query: str) ‑> None
-
Execute query on the Oracle.
Args
query
- The query to execute.
def fetch_all(self) ‑> List[List]
-
Fetches all rows from the Oracle.
def fetch_batch(self, batch_size: int) ‑> List[List]
-
Fetches a batch of rows from the Oracle.
def get_columns(self) ‑> List[str]
-
Returns the column names of the Oracle.
def get_cursor(self)
-
Returns a cursor for the Oracle.
class SqlServer (hostname: str, user: str, password: str, database: str, port: int = 1433)
-
SQL Server database.
Initializes the SQL Server database.
Args
hostname
- The hostname of the SQL Server.
port
- The port of the SQL Server.
user
- The username of the SQL Server.
password
- The password of the SQL Server.
database
- The database name.
Expand source code
class SqlServer(Database): """ SQL Server database. """ # pylint: disable=too-many-arguments def __init__( self, hostname: str, user: str, password: str, database: str, port: int = 1433, ) -> None: """ Initializes the SQL Server database. Args: hostname: The hostname of the SQL Server. port: The port of the SQL Server. user: The username of the SQL Server. password: The password of the SQL Server. database: The database name. """ super().__init__( hostname, port, user, password, database, ) def connect(self): """ Connect to the SQL Server. """ conn_str = ( f"DRIVER={{ODBC Driver 17 for SQL Server}};" f"SERVER={self._hostname},{self._port};" f"DATABASE={self._database};" f"UID={self._user};" f"PWD={self._password};" "Encrypt=no;" "TrustServerCertificate=yes;" ) return pyodbc.connect(conn_str) def get_cursor(self): """ Returns a cursor for the SQL Server. """ return self._connection.cursor() def execute_query(self, query: str) -> None: """ Execute query on the SQL Server. Args: query: The query to execute. """ self._cursor.execute(query) def get_columns(self) -> List[str]: """ Returns the column names of the SQL Server. """ return [column[0] for column in self._cursor.description] def fetch_batch(self, batch_size: int) -> List[List]: """ Fetches a batch of rows from the SQL Server. """ return [list(item) for item in self._cursor.fetchmany(batch_size)] def fetch_all(self) -> List[List]: """ Fetches all rows from the SQL Server. """ return [list(item) for item in self._cursor.fetchall()]
Ancestors
- Database
- abc.ABC
Methods
def connect(self)
-
Connect to the SQL Server.
def execute_query(self, query: str) ‑> None
-
Execute query on the SQL Server.
Args
query
- The query to execute.
def fetch_all(self) ‑> List[List]
-
Fetches all rows from the SQL Server.
def fetch_batch(self, batch_size: int) ‑> List[List]
-
Fetches a batch of rows from the SQL Server.
def get_columns(self) ‑> List[str]
-
Returns the column names of the SQL Server.
def get_cursor(self)
-
Returns a cursor for the SQL Server.