from typing import Optional

from sqlalchemy import create_engine
from towhee.operator import PyOperator


class SqlStorage(PyOperator):
    """
    Read rows from a SQL table through sqlalchemy (SQLite, PostgreSQL, MySQL,
    MariaDB, SQL Server, Oracle, ...).

    The operator builds a single ``SELECT`` statement at construction time and
    runs it every time the operator is called, yielding the result rows.

    NOTE: ``table_name``, ``cols`` and ``where`` are interpolated into the SQL
    text verbatim. They must come from trusted configuration, never from
    untrusted user input.

    Args:
        sql_url: the sqlalchemy database URL, in the form
            'dialect+driver://username:password@host:port/database', such as
            'sqlite:///./sqlite.db' for sqlite or
            'mysql+pymysql://root:123456@127.0.0.1:3306/mysql' for mysql.
        table_name: the name of the table to select from.
        cols: the column expression placed after ``SELECT``, e.g. ``'*'`` or
            ``'id, name'``.
        where: optional condition placed after ``WHERE``; skipped when empty
            or ``None``.
        limit: optional row limit placed after ``LIMIT``; skipped when ``0`` or
            ``None``. Dialects without a ``LIMIT`` clause (e.g. SQL Server)
            presumably need ``limit=None``.
    """

    def __init__(self, sql_url: str, table_name: str, cols: str = '*',
                 where: Optional[str] = None, limit: Optional[int] = 500):
        # Let the operator base class set up its own state.
        super().__init__()
        self._sql_url = sql_url

        sql = "SELECT {} FROM {}".format(cols, table_name)
        if where:
            sql = "{} WHERE {}".format(sql, where)
        if limit:
            sql = "{} LIMIT {}".format(sql, limit)
        self._sql = sql

    def __call__(self):
        """
        Execute the prepared query and yield its rows one by one.

        A fresh engine is created for each call and disposed once the rows
        have been consumed (or the generator is closed early), so no
        connection pool outlives the call.
        """
        engine = create_engine(self._sql_url)
        try:
            with engine.connect() as connection:
                # SQLAlchemy 2.0 no longer accepts a plain string in
                # Connection.execute(); exec_driver_sql (1.4+) sends the raw
                # statement to the DBAPI exactly like the legacy string
                # execution did. Fall back to execute() on older releases.
                run_raw = getattr(connection, "exec_driver_sql", connection.execute)
                yield from run_raw(self._sql)
        finally:
            engine.dispose()