
docketanalyzer

Docket Management

Pacer

Utility for downloading PACER data.

Convenience wrapper around [Free Law Project's juriscraper](https://github.com/freelawproject/juriscraper) for downloading dockets and documents from PACER.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `pacer_username` | `str` | PACER account username. If not provided, will use saved config or `PACER_USERNAME` from environment. | `None` |
| `pacer_password` | `str` | PACER account password. If not provided, will use saved config or `PACER_PASSWORD` from environment. | `None` |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `pacer_username` | `str` | The PACER account username |
| `pacer_password` | `str` | The PACER account password |
| `cache` | `dict` | Internal cache for storing session and driver instances |

Source code in docketanalyzer/pacer/pacer.py
class Pacer:
    """Utility for downloading PACER data.

    Convenience wrapper around
        [Free Law Project's juriscraper](https://github.com/freelawproject/juriscraper)
        for downloading dockets and documents from PACER.

    Args:
        pacer_username (str, optional): PACER account username.
            If not provided, will use saved config or PACER_USERNAME from environment.
        pacer_password (str, optional): PACER account password.
            If not provided, will use saved config or PACER_PASSWORD from environment.

    Attributes:
        pacer_username (str): The PACER account username
        pacer_password (str): The PACER account password
        cache (dict): Internal cache for storing session and driver instances
    """

    def __init__(
        self, pacer_username: str | None = None, pacer_password: str | None = None
    ):
        """Initializes the Pacer class with the provided PACER credentials."""
        self.pacer_username = pacer_username or env.PACER_USERNAME
        self.pacer_password = pacer_password or env.PACER_PASSWORD
        self.cache = {}

    @property
    def driver(self) -> webdriver:
        """Returns a Selenium WebDriver instance ."""
        if "driver" not in self.cache:
            options = Options()
            options.add_argument("--headless")
            service = Service()
            self.cache["driver"] = webdriver.Firefox(options=options, service=service)
        return self.cache["driver"]

    @property
    def session(self) -> "PacerSession":  # noqa: F821
        """Returns a PacerSession instance."""
        from juriscraper.pacer import PacerSession

        if "session" not in self.cache:
            self.cache["session"] = PacerSession(
                username=self.pacer_username, password=self.pacer_password
            )
            self.cache["session"].selenium = self.driver
        return self.cache["session"]

    def __del__(self):
        """Destructor to clean up the session and driver instances."""
        if self.cache.get("driver") is not None:
            with suppress(Exception):
                self.driver.quit()

    def find_candidate_cases(self, docket_id: str) -> list[dict[str, str]]:
        """Finds candidate PACER cases for a given docket ID.

        Args:
            docket_id (str): The docket ID to search for.

        Returns:
            list: A list of candidate cases.
        """
        from juriscraper.lib.string_utils import force_unicode
        from juriscraper.pacer import PossibleCaseNumberApi

        court, docket_number = parse_docket_id(docket_id)
        case_numbers = PossibleCaseNumberApi(court, self.session)
        case_numbers.query(docket_number)

        return [
            {
                "docket_number": force_unicode(node.xpath("./@number")[0]),
                "pacer_case_id": force_unicode(node.xpath("./@id")[0]),
                "title": force_unicode(node.xpath("./@title")[0]),
            }
            for node in case_numbers.tree.xpath("//case")
        ]

    def purchase_docket(self, docket_id: str, **kwargs: Any) -> tuple[str, dict]:
        """Purchases a docket for a given docket ID.

        Args:
            docket_id (str): The docket ID to purchase.
            **kwargs: Additional query arguments to pass to juriscraper.

        Returns:
            tuple: A tuple containing the raw HTML and the parsed docket JSON.
        """
        court, _ = parse_docket_id(docket_id)
        pacer_case_id = self.find_candidate_cases(docket_id)[0]["pacer_case_id"]
        return self.purchase_docket_with_pacer_case_id(court, pacer_case_id, **kwargs)

    def purchase_docket_with_pacer_case_id(
        self,
        court: str,
        pacer_case_id: str,
        date_start: date | None = None,
        date_end: date | None = None,
        show_parties_and_counsel: bool = True,
        show_terminated_parties: bool = True,
        show_list_of_member_cases: bool = True,
        **kwargs: Any,
    ) -> tuple[str, dict]:
        """Purchases a docket for a given PACER case ID.

        Args:
            court (str): The court to purchase the docket from.
            pacer_case_id (str): The PACER case ID to purchase.
            date_start (date, optional): The start date for the docket search.
            date_end (date, optional): The end date for the docket search.
            show_parties_and_counsel (bool, optional): Whether to show parties
                and counsel.
            show_terminated_parties (bool, optional): Whether to show
                terminated parties.
            show_list_of_member_cases (bool, optional): Whether to show
                list of member cases.
            **kwargs: Additional query arguments to pass to juriscraper.

        Returns:
            tuple: A tuple containing the raw HTML and the parsed docket JSON.
        """
        from juriscraper.pacer import DocketReport

        docket_report = DocketReport(court, self.session)
        docket_report.query(
            pacer_case_id,
            date_start=date_start,
            date_end=date_end,
            show_parties_and_counsel=show_parties_and_counsel,
            show_terminated_parties=show_terminated_parties,
            show_list_of_member_cases=show_list_of_member_cases,
            **kwargs,
        )
        docket_html = docket_report.response.text
        docket_html = self.add_pacer_case_id_to_docket_html(docket_html, pacer_case_id)
        docket_json = docket_report.data
        docket_json["docket_id"] = construct_docket_id(
            court, docket_json["docket_number"]
        )
        docket_json["pacer_case_id"] = pacer_case_id
        return docket_html, docket_json

    def parse(self, docket_html: str, court: str) -> dict:
        """Parses the raw HTML of a docket and returns the parsed docket JSON.

        Args:
            docket_html (str): The raw HTML of the docket.
            court (str): The court to parse the docket from.

        Returns:
            dict: The parsed docket JSON.
        """
        from juriscraper.pacer import DocketReport

        parser = DocketReport(court)
        parser._parse_text(docket_html)
        docket_json = parser.data
        docket_json["docket_id"] = construct_docket_id(
            court, docket_json["docket_number"]
        )
        match = re.search(r"<!--PACER CASE ID: (.*?)-->", docket_html)
        if match:
            docket_json["pacer_case_id"] = match.group(1)
        return docket_json

    def get_attachments(self, pacer_doc_id: str, court: str) -> dict:
        """Retrieves the attachments for a given PACER document ID."""
        from juriscraper.pacer import AttachmentPage

        attachment_report = AttachmentPage(court, self.session)
        attachment_report.query(pacer_doc_id)
        return attachment_report.data

    def purchase_document(
        self, pacer_case_id: str, pacer_doc_id: str, court: str
    ) -> tuple[bytes, str]:
        """Purchases a document for a given PACER case ID and document ID.

        Args:
            pacer_case_id (str): The PACER case ID to purchase the document from.
            pacer_doc_id (str): The PACER document ID to purchase.
            court (str): The court to purchase the document from.

        Returns:
            tuple: A tuple containing the PDF content and the status of the purchase.
        """
        from juriscraper.pacer import DocketReport

        docket_report = DocketReport(court, self.session)
        r, status = docket_report.download_pdf(pacer_case_id, pacer_doc_id)
        pdf = r.content if r else None
        status = status if status else "success"
        return pdf, status

    def purchase_attachment(
        self, pacer_case_id: str, pacer_doc_id: str, attachment_number: str, court: str
    ) -> tuple[bytes, str]:
        """Purchases an attachment for a given PACER case ID and document ID.

        Args:
            pacer_case_id (str): The PACER case ID to purchase the attachment from.
            pacer_doc_id (str): The PACER document ID to purchase the attachment from.
            attachment_number (str): The attachment number to purchase.
            court (str): The court to purchase the attachment from.

        Returns:
            tuple: A tuple containing the PDF content and the status of the purchase.
        """
        from juriscraper.pacer import AttachmentPage

        attachments = AttachmentPage(court, self.session)
        attachments.query(pacer_doc_id)
        attachments = attachments.data["attachments"]
        for attachment in attachments:
            if int(attachment["attachment_number"]) == int(attachment_number):
                return self.purchase_document(
                    pacer_case_id, attachment["pacer_doc_id"], court
                )
        return None, "error"

    def add_pacer_case_id_to_docket_html(
        self, docket_html: str, pacer_case_id: str
    ) -> str:
        """Adds the PACER case ID to the docket HTML if it is not already present.

        Args:
            docket_html (str): The raw HTML of the docket.
            pacer_case_id (str): The PACER case ID to add to the docket HTML.

        Returns:
            str: The docket HTML with the PACER case ID added.
        """
        if not re.search(r"<!--PACER CASE ID: (.*?)-->", docket_html):
            docket_html += f"<!--PACER CASE ID: {pacer_case_id}-->"
        return docket_html
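
A minimal usage sketch (assumptions: valid PACER credentials passed directly, saved via config, or set as `PACER_USERNAME`/`PACER_PASSWORD` in the environment; the import path and docket ID format are illustrative):

```python
from docketanalyzer.pacer import Pacer  # import path assumed

# Credentials fall back to saved config or environment variables if omitted.
pacer = Pacer(pacer_username="user", pacer_password="pass")

# Hypothetical docket ID; use whatever court + docket number format
# parse_docket_id expects.
docket_id = "..."
docket_html, docket_json = pacer.purchase_docket(docket_id)
```

The first use of `session` lazily creates both the juriscraper `PacerSession` and the headless Firefox driver, and both are cached on the instance for reuse.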

purchase_docket(docket_id, **kwargs)

Purchases a docket for a given docket ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `docket_id` | `str` | The docket ID to purchase. | *required* |
| `**kwargs` | `Any` | Additional query arguments to pass to juriscraper. | `{}` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `tuple` | `tuple[str, dict]` | A tuple containing the raw HTML and the parsed docket JSON. |

Source code in docketanalyzer/pacer/pacer.py
def purchase_docket(self, docket_id: str, **kwargs: Any) -> tuple[str, dict]:
    """Purchases a docket for a given docket ID.

    Args:
        docket_id (str): The docket ID to purchase.
        **kwargs: Additional query arguments to pass to juriscraper.

    Returns:
        tuple: A tuple containing the raw HTML and the parsed docket JSON.
    """
    court, _ = parse_docket_id(docket_id)
    pacer_case_id = self.find_candidate_cases(docket_id)[0]["pacer_case_id"]
    return self.purchase_docket_with_pacer_case_id(court, pacer_case_id, **kwargs)
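
Because `**kwargs` are forwarded to `purchase_docket_with_pacer_case_id`, date filters and display options pass straight through. A sketch continuing from the instance above (the docket ID remains a placeholder):

```python
import json
from datetime import date
from pathlib import Path

# Restrict the purchased docket sheet to a date range.
docket_html, docket_json = pacer.purchase_docket(
    docket_id,
    date_start=date(2023, 1, 1),
    date_end=date(2023, 12, 31),
)

# Persist both artifacts; default=str handles date objects in the JSON.
Path("docket.html").write_text(docket_html)
Path("docket.json").write_text(json.dumps(docket_json, default=str))
```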

purchase_document(pacer_case_id, pacer_doc_id, court)

Purchases a document for a given PACER case ID and document ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `pacer_case_id` | `str` | The PACER case ID to purchase the document from. | *required* |
| `pacer_doc_id` | `str` | The PACER document ID to purchase. | *required* |
| `court` | `str` | The court to purchase the document from. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `tuple` | `tuple[bytes, str]` | A tuple containing the PDF content and the status of the purchase. |

Source code in docketanalyzer/pacer/pacer.py
def purchase_document(
    self, pacer_case_id: str, pacer_doc_id: str, court: str
) -> tuple[bytes, str]:
    """Purchases a document for a given PACER case ID and document ID.

    Args:
        pacer_case_id (str): The PACER case ID to purchase the document from.
        pacer_doc_id (str): The PACER document ID to purchase.
        court (str): The court to purchase the document from.

    Returns:
        tuple: A tuple containing the PDF content and the status of the purchase.
    """
    from juriscraper.pacer import DocketReport

    docket_report = DocketReport(court, self.session)
    r, status = docket_report.download_pdf(pacer_case_id, pacer_doc_id)
    pdf = r.content if r else None
    status = status if status else "success"
    return pdf, status
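
A sketch of purchasing a single PDF, assuming `pacer_case_id` and `pacer_doc_id` were taken from a parsed docket (all IDs and the court are placeholder values):

```python
from pathlib import Path

pdf, status = pacer.purchase_document(
    pacer_case_id="12345",
    pacer_doc_id="0123456789",
    court="insd",
)
if pdf is not None:
    Path("document.pdf").write_bytes(pdf)
else:
    print(f"Purchase failed: {status}")
```

Note the normalization in the source above: juriscraper reports a falsy status on success, which this wrapper rewrites to `"success"`, and `pdf` is `None` whenever no response came back.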

purchase_attachment(pacer_case_id, pacer_doc_id, attachment_number, court)

Purchases an attachment for a given PACER case ID and document ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `pacer_case_id` | `str` | The PACER case ID to purchase the attachment from. | *required* |
| `pacer_doc_id` | `str` | The PACER document ID to purchase the attachment from. | *required* |
| `attachment_number` | `str` | The attachment number to purchase. | *required* |
| `court` | `str` | The court to purchase the attachment from. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `tuple` | `tuple[bytes, str]` | A tuple containing the PDF content and the status of the purchase. |

Source code in docketanalyzer/pacer/pacer.py
def purchase_attachment(
    self, pacer_case_id: str, pacer_doc_id: str, attachment_number: str, court: str
) -> tuple[bytes, str]:
    """Purchases an attachment for a given PACER case ID and document ID.

    Args:
        pacer_case_id (str): The PACER case ID to purchase the attachment from.
        pacer_doc_id (str): The PACER document ID to purchase the attachment from.
        attachment_number (str): The attachment number to purchase.
        court (str): The court to purchase the attachment from.

    Returns:
        tuple: A tuple containing the PDF content and the status of the purchase.
    """
    from juriscraper.pacer import AttachmentPage

    attachments = AttachmentPage(court, self.session)
    attachments.query(pacer_doc_id)
    attachments = attachments.data["attachments"]
    for attachment in attachments:
        if int(attachment["attachment_number"]) == int(attachment_number):
            return self.purchase_document(
                pacer_case_id, attachment["pacer_doc_id"], court
            )
    return None, "error"

parse(docket_html, court)

Parses the raw HTML of a docket and returns the parsed docket JSON.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `docket_html` | `str` | The raw HTML of the docket. | *required* |
| `court` | `str` | The court to parse the docket from. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `dict` | `dict` | The parsed docket JSON. |

Source code in docketanalyzer/pacer/pacer.py
def parse(self, docket_html: str, court: str) -> dict:
    """Parses the raw HTML of a docket and returns the parsed docket JSON.

    Args:
        docket_html (str): The raw HTML of the docket.
        court (str): The court to parse the docket from.

    Returns:
        dict: The parsed docket JSON.
    """
    from juriscraper.pacer import DocketReport

    parser = DocketReport(court)
    parser._parse_text(docket_html)
    docket_json = parser.data
    docket_json["docket_id"] = construct_docket_id(
        court, docket_json["docket_number"]
    )
    match = re.search(r"<!--PACER CASE ID: (.*?)-->", docket_html)
    if match:
        docket_json["pacer_case_id"] = match.group(1)
    return docket_json
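
Because `purchase_docket` embeds the PACER case ID in an HTML comment, a previously purchased docket can be re-parsed offline without another purchase. A minimal sketch, assuming `docket.html` was saved earlier:

```python
from pathlib import Path

docket_html = Path("docket.html").read_text()
docket_json = pacer.parse(docket_html, court="insd")  # example court ID
print(docket_json["docket_id"], docket_json.get("pacer_case_id"))
```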

find_candidate_cases(docket_id)

Finds candidate PACER cases for a given docket ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `docket_id` | `str` | The docket ID to search for. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `list` | `list[dict[str, str]]` | A list of candidate cases. |

Source code in docketanalyzer/pacer/pacer.py
def find_candidate_cases(self, docket_id: str) -> list[dict[str, str]]:
    """Finds candidate PACER cases for a given docket ID.

    Args:
        docket_id (str): The docket ID to search for.

    Returns:
        list: A list of candidate cases.
    """
    from juriscraper.lib.string_utils import force_unicode
    from juriscraper.pacer import PossibleCaseNumberApi

    court, docket_number = parse_docket_id(docket_id)
    case_numbers = PossibleCaseNumberApi(court, self.session)
    case_numbers.query(docket_number)

    return [
        {
            "docket_number": force_unicode(node.xpath("./@number")[0]),
            "pacer_case_id": force_unicode(node.xpath("./@id")[0]),
            "title": force_unicode(node.xpath("./@title")[0]),
        }
        for node in case_numbers.tree.xpath("//case")
    ]
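
When a docket number matches several PACER cases, inspecting the candidates first avoids purchasing the wrong docket sheet. A sketch with a placeholder docket ID:

```python
candidates = pacer.find_candidate_cases(docket_id)  # hypothetical docket_id
for case in candidates:
    print(case["pacer_case_id"], case["docket_number"], case["title"])
```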

Services

services

Database

A PostgreSQL database manager that provides high-level database operations.

This class handles database connections, table management, model registration, and provides an interface for table operations with schemaless tables through the Tables class.

Source code in docketanalyzer/services/psql.py
class Database:
    """A PostgreSQL database manager that provides high-level database operations.

    This class handles database connections, table management, model registration,
        and provides an interface for table operations with schemaless tables through
        the Tables class.
    """

    def __init__(
        self,
        connection: str | None = None,
        registered_models: list[type[DatabaseModel]] | None = None,
    ) -> None:
        """Initialize the database manager.

        Args:
            connection (str, optional): PostgreSQL connection URL
            registered_models (list): List of model classes to register with
                the database
        """
        self.connection = connection or env.POSTGRES_URL
        self.db: PostgresqlExtDatabase | None = None
        self.connect()
        self.registered_models: dict[str, type[DatabaseModel]] = {}
        if registered_models is not None:
            for model in registered_models:
                self.register_model(model)
        self.t = Tables(self)
        self.cache: dict[str, Any] = {}

    def connect(self) -> None:
        """Establish connection to the PostgreSQL database using the connection URL."""
        url = urlparse(self.connection)
        self.db = PostgresqlExtDatabase(
            database=url.path[1:],
            user=url.username,
            password=url.password,
            host=url.hostname,
            port=url.port,
        )

    def status(self) -> bool:
        """Check if the database connection is working.

        Returns:
            bool: True if connection is successful, False otherwise
        """
        return self.db.connect()

    @property
    def meta(self) -> dict[str, dict[str, Any]]:
        """Get database metadata including table and column information.

        Returns:
            dict: Database metadata including table schemas and foreign keys
        """
        if "meta" not in self.cache:
            meta = {}
            introspector = Introspector.from_database(self.db)
            metadata = introspector.introspect()
            fks = metadata.foreign_keys
            for table_name, columns in metadata.columns.items():
                meta[table_name] = {
                    "name": metadata.model_names[table_name],
                    "columns": columns,
                }
                if table_name in fks:
                    meta[table_name]["foreign_keys"] = {
                        x.column: x for x in fks[table_name]
                    }
            self.cache["meta"] = meta
        return self.cache["meta"]

    def reload(self):
        """Reload the database metadata and registered models."""
        self.close()
        self.__init__(
            connection=self.connection,
            registered_models=list(self.registered_models.values()),
        )

    def register_model(self, model: type[DatabaseModel]) -> None:
        """Register a model class with the database manager.

        Args:
            model: Peewee model class to register
        """
        self.registered_models[model._meta.table_name] = model
        model.db_manager = self
        model._meta.database = self.db

    def load_table_class(self, name: str, new: bool = False) -> type[DatabaseModel]:
        """Dynamically create a model class for a database table.

        Args:
            name (str): Name of the table
            new (bool): Whether this is a new table being created

        Returns:
            type: A new DatabaseModel subclass representing the table

        Raises:
            KeyError: If table doesn't exist and new=False
        """
        if not new and name not in self.meta:
            raise KeyError(
                f"Table {name} does not exist. Use db.create_table to create it."
            )

        class Meta:
            database = self.db
            table_name = name

        attrs = {"Meta": Meta}

        if not new:
            table_meta = self.meta[name]
            for column_name, column in table_meta["columns"].items():
                keeps = [
                    "column_name",
                    "index",
                    "primary_key",
                    "unique",
                    "default",
                    "model",
                ]
                rename = {"nullable": "null"}
                column_args = {
                    k: v
                    for k, v in column.__dict__.items()
                    if k in keeps + list(rename.keys())
                }
                fk = table_meta["foreign_keys"].get(column_args["column_name"])
                if fk:
                    column_args["model"] = self.load_table_class(fk.dest_table)
                for k, v in rename.items():
                    column_args[v] = column_args.pop(k)
                attrs[column_name] = column.field_class(**column_args)
        table_class = type(name, (DatabaseModel,), attrs)
        table_class.db_manager = self
        return table_class

    def create_table(
        self, name_or_model: str | type[DatabaseModel], exists_ok: bool = True
    ) -> None:
        """Create a new table in the database.

        Args:
            name_or_model (Union[str, Type[DatabaseModel]]): Name of the table to
                create or model class
            exists_ok (bool): Whether to silently continue if table exists

        Raises:
            ValueError: If table exists and exists_ok=False
        """
        if isinstance(name_or_model, str):
            name = name_or_model
            if name in self.meta:
                if not exists_ok:
                    raise ValueError(f"Table {name} already exists.")
                return
            table = self.load_table_class(name, new=True)
        else:
            table = name_or_model
        self.db.create_tables([table])
        self.reload()

    def drop_table(self, name: str, confirm: bool = True):
        """Drop a table from the database.

        Args:
            name (str): Name of the table to drop
            confirm (bool): Whether to prompt for confirmation before dropping

        Raises:
            Exception: If confirmation is required and user does not confirm
        """
        if confirm:
            response = input(
                notabs(f"""
                Are you sure you want to drop table '{name}'?
                This will DELETE ALL TABLE DATA.

                Are you sure you want to proceed? (y/n):
            """)
            ).lower()
            if response != "y":
                raise Exception("Aborted")
        table = self.t[name]
        self.db.drop_tables([table])
        self.reload()

    def close(self) -> None:
        """Close the database connection."""
        self.db.close()
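
A minimal usage sketch, assuming a PostgreSQL connection URL is available (passed directly or via `POSTGRES_URL`) and that `Database` is importable from `docketanalyzer.services.psql`; the table name is a placeholder:

```python
from docketanalyzer.services.psql import Database  # import path assumed

db = Database()            # falls back to env.POSTGRES_URL
db.create_table("cases")   # no-op if the table already exists
Cases = db.t["cases"]      # dynamically loaded model class
print(list(db.meta["cases"]["columns"]))
db.close()
```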

meta property

Get database metadata including table and column information.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `dict` | `dict[str, dict[str, Any]]` | Database metadata including table schemas and foreign keys |

__init__(connection=None, registered_models=None)

Initialize the database manager.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `connection` | `str` | PostgreSQL connection URL | `None` |
| `registered_models` | `list` | List of model classes to register with the database | `None` |
Source code in docketanalyzer/services/psql.py
def __init__(
    self,
    connection: str | None = None,
    registered_models: list[type[DatabaseModel]] | None = None,
) -> None:
    """Initialize the database manager.

    Args:
        connection (str, optional): PostgreSQL connection URL
        registered_models (list): List of model classes to register with
            the database
    """
    self.connection = connection or env.POSTGRES_URL
    self.db: PostgresqlExtDatabase | None = None
    self.connect()
    self.registered_models: dict[str, type[DatabaseModel]] = {}
    if registered_models is not None:
        for model in registered_models:
            self.register_model(model)
    self.t = Tables(self)
    self.cache: dict[str, Any] = {}

connect()

Establish connection to the PostgreSQL database using the connection URL.

Source code in docketanalyzer/services/psql.py
def connect(self) -> None:
    """Establish connection to the PostgreSQL database using the connection URL."""
    url = urlparse(self.connection)
    self.db = PostgresqlExtDatabase(
        database=url.path[1:],
        user=url.username,
        password=url.password,
        host=url.hostname,
        port=url.port,
    )

status()

Check if the database connection is working.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `bool` | `bool` | True if connection is successful, False otherwise |

Source code in docketanalyzer/services/psql.py
def status(self) -> bool:
    """Check if the database connection is working.

    Returns:
        bool: True if connection is successful, False otherwise
    """
    return self.db.connect()

reload()

Reload the database metadata and registered models.

Source code in docketanalyzer/services/psql.py
def reload(self):
    """Reload the database metadata and registered models."""
    self.close()
    self.__init__(
        connection=self.connection,
        registered_models=list(self.registered_models.values()),
    )

register_model(model)

Register a model class with the database manager.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model` | `type[DatabaseModel]` | Peewee model class to register | *required* |
Source code in docketanalyzer/services/psql.py
def register_model(self, model: type[DatabaseModel]) -> None:
    """Register a model class with the database manager.

    Args:
        model: Peewee model class to register
    """
    self.registered_models[model._meta.table_name] = model
    model.db_manager = self
    model._meta.database = self.db

load_table_class(name, new=False)

Dynamically create a model class for a database table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Name of the table | *required* |
| `new` | `bool` | Whether this is a new table being created | `False` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `type` | `type[DatabaseModel]` | A new DatabaseModel subclass representing the table |

Raises:

| Type | Description |
| --- | --- |
| `KeyError` | If table doesn't exist and new=False |

Source code in docketanalyzer/services/psql.py
def load_table_class(self, name: str, new: bool = False) -> type[DatabaseModel]:
    """Dynamically create a model class for a database table.

    Args:
        name (str): Name of the table
        new (bool): Whether this is a new table being created

    Returns:
        type: A new DatabaseModel subclass representing the table

    Raises:
        KeyError: If table doesn't exist and new=False
    """
    if not new and name not in self.meta:
        raise KeyError(
            f"Table {name} does not exist. Use db.create_table to create it."
        )

    class Meta:
        database = self.db
        table_name = name

    attrs = {"Meta": Meta}

    if not new:
        table_meta = self.meta[name]
        for column_name, column in table_meta["columns"].items():
            keeps = [
                "column_name",
                "index",
                "primary_key",
                "unique",
                "default",
                "model",
            ]
            rename = {"nullable": "null"}
            column_args = {
                k: v
                for k, v in column.__dict__.items()
                if k in keeps + list(rename.keys())
            }
            fk = table_meta["foreign_keys"].get(column_args["column_name"])
            if fk:
                column_args["model"] = self.load_table_class(fk.dest_table)
            for k, v in rename.items():
                column_args[v] = column_args.pop(k)
            attrs[column_name] = column.field_class(**column_args)
    table_class = type(name, (DatabaseModel,), attrs)
    table_class.db_manager = self
    return table_class

create_table(name_or_model, exists_ok=True)

Create a new table in the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name_or_model` | `Union[str, Type[DatabaseModel]]` | Name of the table to create or model class | *required* |
| `exists_ok` | `bool` | Whether to silently continue if table exists | `True` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If table exists and exists_ok=False |

Source code in docketanalyzer/services/psql.py
def create_table(
    self, name_or_model: str | type[DatabaseModel], exists_ok: bool = True
) -> None:
    """Create a new table in the database.

    Args:
        name_or_model (Union[str, Type[DatabaseModel]]): Name of the table to
            create or model class
        exists_ok (bool): Whether to silently continue if table exists

    Raises:
        ValueError: If table exists and exists_ok=False
    """
    if isinstance(name_or_model, str):
        name = name_or_model
        if name in self.meta:
            if not exists_ok:
                raise ValueError(f"Table {name} already exists.")
            return
        table = self.load_table_class(name, new=True)
    else:
        table = name_or_model
    self.db.create_tables([table])
    self.reload()
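
Both forms are accepted: a bare name creates an empty table whose columns can be added later, while a `DatabaseModel` subclass creates the table with its declared fields (the `exists_ok`/`ValueError` check applies only to the string form). A sketch with a hypothetical model:

```python
import peewee
from docketanalyzer.services.psql import DatabaseModel  # import path assumed

db.create_table("filings")    # by name: empty, schemaless table

class Filing(DatabaseModel):  # hypothetical model with declared fields
    title = peewee.CharField(null=True)

db.register_model(Filing)     # bind the model to this database
db.create_table(Filing)       # by model: creates the declared columns
```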

drop_table(name, confirm=True)

Drop a table from the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | Name of the table to drop | *required* |
| `confirm` | `bool` | Whether to prompt for confirmation before dropping | `True` |

Raises:

| Type | Description |
| --- | --- |
| `Exception` | If confirmation is required and user does not confirm |

Source code in docketanalyzer/services/psql.py
def drop_table(self, name: str, confirm: bool = True):
    """Drop a table from the database.

    Args:
        name (str): Name of the table to drop
        confirm (bool): Whether to prompt for confirmation before dropping

    Raises:
        Exception: If confirmation is required and user does not confirm
    """
    if confirm:
        response = input(
            notabs(f"""
            Are you sure you want to drop table '{name}'?
            This will DELETE ALL TABLE DATA.

            Are you sure you want to proceed? (y/n):
        """)
        ).lower()
        if response != "y":
            raise Exception("Aborted")
    table = self.t[name]
    self.db.drop_tables([table])
    self.reload()

close()

Close the database connection.

Source code in docketanalyzer/services/psql.py
def close(self) -> None:
    """Close the database connection."""
    self.db.close()

DatabaseModel

Bases: DatabaseModelQueryMixin, Model

A base model class that extends Peewee's Model with additional functionality.

This class provides enhanced database operations including pandas DataFrame conversion, batch processing, column management, and model reloading capabilities.

Source code in docketanalyzer/services/psql.py
class DatabaseModel(DatabaseModelQueryMixin, Model):
    """A base model class that extends Peewee's Model with additional functionality.

    This class provides enhanced database operations including pandas DataFrame
        conversion, batch processing, column management, and model reloading
        capabilities.
    """

    db_manager = None

    @classmethod
    def drop_column(cls, column_name: str, confirm: bool = True) -> None:
        """Drop a column from the database table.

        Args:
            column_name (str): Name of the column to drop
            confirm (bool): Whether to prompt for confirmation before dropping
        """
        table_name = cls._meta.table_name
        if confirm:
            response = input(
                notabs(f"""
                Are you sure you want to drop '{column_name}' from '{table_name}'?
                This will DELETE ALL COLUMN DATA.

                Are you sure you want to proceed? (y/n):
            """)
            ).lower()
            if response != "y":
                raise Exception("Aborted")
        migrator = PostgresqlMigrator(cls._meta.database)
        migrate(migrator.drop_column(table_name, column_name))
        cls.reload()

    @classmethod
    def add_column(
        cls,
        column_name: str,
        column_type: str,
        null: bool = True,
        overwrite: bool = False,
        exists_ok: bool = True,
        **kwargs: Any,
    ) -> None:
        """Add a new column to the database table.

        Args:
            column_name (str): Name of the new column
            column_type (str): Peewee field type for the column
            null (bool, optional): Whether the column can contain NULL values
            overwrite (bool): Whether to overwrite if column exists
            exists_ok (bool): Whether to silently continue if column exists
            **kwargs: Additional field parameters passed to Peewee
        """
        table_name = cls._meta.table_name
        table_meta = cls.db_manager.meta[table_name]
        migrator = PostgresqlMigrator(cls._meta.database)
        if column_name in table_meta["columns"]:
            if not exists_ok:
                raise ValueError(
                    f"Column {column_name} already exists in table {table_name}."
                )
            if not overwrite:
                return
            cls.drop_column(column_name)

        kwargs["null"] = null
        migrate(
            migrator.add_column(
                table_name, column_name, getattr(peewee, column_type)(**kwargs)
            )
        )
        cls.reload()

    @classmethod
    def add_data(
        cls, data: pd.DataFrame, copy: bool = False, batch_size: int = 1000
    ) -> None:
        """Add data to the table from a pandas DataFrame.

        Args:
            data (pd.DataFrame): DataFrame containing the data to insert
            copy (bool): Whether to use Postgres COPY command for faster insertion
            batch_size (int): Number of records to insert in each batch
                when not using COPY
        """
        if copy:
            conn = cls._meta.database.connection()
            with conn.cursor() as cursor:
                buffer = StringIO()
                csv_writer = csv.writer(
                    buffer, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
                )
                for _, row in data.iterrows():
                    csv_writer.writerow(
                        [
                            "\\N" if pd.isna(value) or value == "" else str(value)
                            for value in row
                        ]
                    )
                buffer.seek(0)

                cols = ",".join(data.columns)
                table_name = cls._meta.table_name
                cursor.copy_expert(
                    f"COPY {table_name} ({cols}) FROM STDIN WITH CSV NULL AS '\\N'",
                    buffer,
                )
        else:
            data = data.to_dict(orient="records")
            with cls._meta.database.atomic():
                for i in range(0, len(data), batch_size):
                    batch = data[i : i + batch_size]
                    cls.insert_many(batch).execute()

    @classmethod
    def reload(cls):
        """Reload the model class to reflect any changes in the database schema."""
        cls.db_manager.reload()
        new_table = cls.db_manager.load_table_class(cls._meta.table_name)
        new_attrs = dir(new_table)
        attrs = dir(cls)
        for attr in attrs:
            if attr not in new_attrs:
                delattr(cls, attr)
        for attr in new_attrs:
            if not attr.startswith("__"):
                setattr(cls, attr, getattr(new_table, attr))

drop_column(column_name, confirm=True) classmethod

Drop a column from the database table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `column_name` | `str` | Name of the column to drop | *required* |
| `confirm` | `bool` | Whether to prompt for confirmation before dropping | `True` |
Source code in docketanalyzer/services/psql.py
@classmethod
def drop_column(cls, column_name: str, confirm: bool = True) -> None:
    """Drop a column from the database table.

    Args:
        column_name (str): Name of the column to drop
        confirm (bool): Whether to prompt for confirmation before dropping
    """
    table_name = cls._meta.table_name
    if confirm:
        response = input(
            notabs(f"""
            Are you sure you want to drop '{column_name}' from '{table_name}'?
            This will DELETE ALL COLUMN DATA.

            Are you sure you want to proceed? (y/n):
        """)
        ).lower()
        if response != "y":
            raise Exception("Aborted")
    migrator = PostgresqlMigrator(cls._meta.database)
    migrate(migrator.drop_column(table_name, column_name))
    cls.reload()

add_column(column_name, column_type, null=True, overwrite=False, exists_ok=True, **kwargs) classmethod

Add a new column to the database table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `column_name` | `str` | Name of the new column | *required* |
| `column_type` | `str` | Peewee field type for the column | *required* |
| `null` | `bool` | Whether the column can contain NULL values | `True` |
| `overwrite` | `bool` | Whether to overwrite if column exists | `False` |
| `exists_ok` | `bool` | Whether to silently continue if column exists | `True` |
| `**kwargs` | `Any` | Additional field parameters passed to Peewee | `{}` |
Source code in docketanalyzer/services/psql.py
@classmethod
def add_column(
    cls,
    column_name: str,
    column_type: str,
    null: bool = True,
    overwrite: bool = False,
    exists_ok: bool = True,
    **kwargs: Any,
) -> None:
    """Add a new column to the database table.

    Args:
        column_name (str): Name of the new column
        column_type (str): Peewee field type for the column
        null (bool, optional): Whether the column can contain NULL values
        overwrite (bool): Whether to overwrite if column exists
        exists_ok (bool): Whether to silently continue if column exists
        **kwargs: Additional field parameters passed to Peewee
    """
    table_name = cls._meta.table_name
    table_meta = cls.db_manager.meta[table_name]
    migrator = PostgresqlMigrator(cls._meta.database)
    if column_name in table_meta["columns"]:
        if not exists_ok:
            raise ValueError(
                f"Column {column_name} already exists in table {table_name}."
            )
        if not overwrite:
            return
        cls.drop_column(column_name)

    kwargs["null"] = null
    migrate(
        migrator.add_column(
            table_name, column_name, getattr(peewee, column_type)(**kwargs)
        )
    )
    cls.reload()
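
`column_type` names a Peewee field class and is resolved with `getattr(peewee, column_type)`, so any standard field (`TextField`, `IntegerField`, `DateTimeField`, ...) works, with remaining keyword arguments passed to the field. A sketch against the hypothetical `Cases` model from the `Database` example:

```python
Cases.add_column("summary", "TextField")  # nullable by default

# Extra kwargs go to the Peewee field; a default is needed when adding a
# NOT NULL column to a table that may already contain rows.
Cases.add_column("n_entries", "IntegerField", null=False, default=0)
```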

add_data(data, copy=False, batch_size=1000) classmethod

Add data to the table from a pandas DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `DataFrame` | DataFrame containing the data to insert | *required* |
| `copy` | `bool` | Whether to use Postgres COPY command for faster insertion | `False` |
| `batch_size` | `int` | Number of records to insert in each batch when not using COPY | `1000` |
Source code in docketanalyzer/services/psql.py
@classmethod
def add_data(
    cls, data: pd.DataFrame, copy: bool = False, batch_size: int = 1000
) -> None:
    """Add data to the table from a pandas DataFrame.

    Args:
        data (pd.DataFrame): DataFrame containing the data to insert
        copy (bool): Whether to use Postgres COPY command for faster insertion
        batch_size (int): Number of records to insert in each batch
            when not using COPY
    """
    if copy:
        conn = cls._meta.database.connection()
        with conn.cursor() as cursor:
            buffer = StringIO()
            csv_writer = csv.writer(
                buffer, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
            )
            for _, row in data.iterrows():
                csv_writer.writerow(
                    [
                        "\\N" if pd.isna(value) or value == "" else str(value)
                        for value in row
                    ]
                )
            buffer.seek(0)

            cols = ",".join(data.columns)
            table_name = cls._meta.table_name
            cursor.copy_expert(
                f"COPY {table_name} ({cols}) FROM STDIN WITH CSV NULL AS '\\N'",
                buffer,
            )
    else:
        data = data.to_dict(orient="records")
        with cls._meta.database.atomic():
            for i in range(0, len(data), batch_size):
                batch = data[i : i + batch_size]
                cls.insert_many(batch).execute()
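
A sketch of both insertion paths; the DataFrame columns must match the table's column names (here, the hypothetical columns added above). The COPY path streams through an in-memory CSV buffer and treats NaN and empty strings as NULL.

```python
import pandas as pd

df = pd.DataFrame({"summary": ["short", "long"], "n_entries": [3, 7]})

Cases.add_data(df)             # batched INSERTs inside a transaction
Cases.add_data(df, copy=True)  # Postgres COPY, faster for large frames
```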

reload() classmethod

Reload the model class to reflect any changes in the database schema.

Source code in docketanalyzer/services/psql.py
@classmethod
def reload(cls):
    """Reload the model class to reflect any changes in the database schema."""
    cls.db_manager.reload()
    new_table = cls.db_manager.load_table_class(cls._meta.table_name)
    new_attrs = dir(new_table)
    attrs = dir(cls)
    for attr in attrs:
        if attr not in new_attrs:
            delattr(cls, attr)
    for attr in new_attrs:
        if not attr.startswith("__"):
            setattr(cls, attr, getattr(new_table, attr))

S3

A class for syncing local data with an S3 bucket.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `data_dir` | `Path` | Local directory for data storage. |
| `bucket` | `Path` | S3 bucket name. |
| `endpoint_url` | `Optional[str]` | Custom S3 endpoint URL. |
| `client` | `boto3.client` | Boto3 S3 client for direct API interactions. |

Source code in docketanalyzer/services/s3.py
class S3:
    """A class for syncing local data with an S3 bucket.

    Attributes:
        data_dir (Path): Local directory for data storage.
        bucket (Path): S3 bucket name.
        endpoint_url (Optional[str]): Custom S3 endpoint URL.
        client (boto3.client): Boto3 S3 client for direct API interactions.
    """

    def __init__(self, data_dir: str | None = None) -> None:
        """Initialize the S3 service.

        Args:
            data_dir (Optional[str]): Path to local data directory.
                If None, uses env.DATA_DIR.
        """
        export_env()
        self.data_dir = Path(data_dir or env.DATA_DIR)
        self.bucket = env.AWS_S3_BUCKET_NAME
        self.endpoint_url = env.AWS_S3_ENDPOINT_URL
        self.client = boto3.client(
            "s3",
            endpoint_url=self.endpoint_url,
            aws_access_key_id=env.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=env.AWS_SECRET_ACCESS_KEY,
            config=Config(signature_version="s3v4"),
        )

    def _sync(
        self,
        from_path: str | Path,
        to_path: str | Path,
        confirm: bool = False,
        exclude_hidden: bool = True,
        exact_timestamps: bool = True,
        **kwargs: Any,
    ) -> None:
        """Execute an AWS S3 sync command between two paths.

        This is a private helper method that constructs and executes an AWS CLI command
        for syncing files between local and S3 storage.

        Args:
            from_path (Union[str, Path]): Source path to sync from.
            to_path (Union[str, Path]): Destination path to sync to.
            confirm (bool): If True, asks for confirmation before executing the command.
            exclude_hidden (bool): If True, excludes hidden files and directories.
            exact_timestamps (bool): If True, compares timestamps.
            **kwargs: Additional arguments to pass to the AWS CLI s3 sync command.
        """
        cmd = f"aws s3 sync {from_path} {to_path}"

        if self.endpoint_url is not None:
            cmd += f" --endpoint-url {self.endpoint_url}"

        if exclude_hidden:
            cmd += ' --exclude "*/.*" --exclude ".*"'

        kwargs["exact_timestamps"] = exact_timestamps
        kwargs = {k: v for k, v in kwargs.items() if v is not None}

        for k, v in kwargs.items():
            k = k.replace("_", "-")
            if isinstance(v, bool):
                if v:
                    cmd += f" --{k}"
            elif isinstance(v, list | tuple):
                for item in v:
                    cmd += f' --{k} "{item}"'
            else:
                cmd += f' --{k} "{v}"'

        if confirm:
            print("Run the following command?")
            print(cmd)
            response = input("y/n: ")
            if response == "y":
                os.system(cmd)
        else:
            os.system(cmd)

    def _prepare_paths(
        self,
        path: str | Path | None,
        from_path: str | Path | None,
        to_path: str | Path | None,
    ) -> tuple[Path, Path]:
        """Prepare source and destination paths for sync operations.

        This method handles path normalization and ensures paths are properly
        formatted for sync operations.

        Args:
            path (Optional[Union[str, Path]]): If provided, used as both
                from_path and to_path.
            from_path (Optional[Union[str, Path]]): Source path for sync operation.
            to_path (Optional[Union[str, Path]]): Destination path for sync operation.

        Returns:
            Tuple[Path, Path]: Normalized from_path and to_path.
        """
        if path is not None:
            path = Path(path)

            with suppress(ValueError):
                path = path.relative_to(self.data_dir)
            from_path = to_path = path

        if path is None and from_path is None and to_path is None:
            raise ValueError("Must provide at least one path argument")

        from_path = Path() if from_path is None else Path(from_path)
        to_path = Path() if to_path is None else Path(to_path)

        return from_path, to_path

    def push(
        self,
        path: str | Path | None = None,
        from_path: str | Path | None = None,
        to_path: str | Path | None = None,
        **kwargs: Any,
    ) -> None:
        """Push data from local storage to S3.

        Syncs files from a local directory to an S3 bucket path.

        Args:
            path (Optional[Union[str, Path]]): If provided, used as both
                from_path and to_path.
            from_path (Optional[Union[str, Path]]): Local source path to sync from.
            to_path (Optional[Union[str, Path]]): S3 destination path to sync to.
            **kwargs: Additional arguments to pass to the AWS CLI s3 sync command.
        """
        from_path, to_path = self._prepare_paths(path, from_path, to_path)
        if self.data_dir is not None:
            from_path = self.data_dir / from_path
        to_path = f"s3://{Path(self.bucket) / to_path}"
        self._sync(from_path, to_path, **kwargs)

    def pull(
        self,
        path: str | Path | None = None,
        from_path: str | Path | None = None,
        to_path: str | Path | None = None,
        **kwargs: Any,
    ) -> None:
        """Pull data from S3 to local storage.

        Syncs files from an S3 bucket path to a local directory.

        Args:
            path (Optional[Union[str, Path]]): If provided, used as both
                from_path and to_path.
            from_path (Optional[Union[str, Path]]): S3 source path to sync from.
            to_path (Optional[Union[str, Path]]): Local destination path to sync to.
            **kwargs: Additional arguments to pass to the AWS CLI s3 sync command.
        """
        from_path, to_path = self._prepare_paths(path, from_path, to_path)
        if self.data_dir is not None:
            to_path = self.data_dir / to_path
        from_path = f"s3://{Path(self.bucket) / from_path}"
        self._sync(from_path, to_path, **kwargs)

    def download(self, s3_key: str, local_path: str | Path | None = None) -> Path:
        """Download a single file from S3 using the boto3 client.

        This method downloads a specific file from S3 to a local path.
        If local_path is not provided, it will mirror the S3 path structure
        in the data directory.

        Args:
            s3_key (str): The key of the file in the S3 bucket.
            local_path (Optional[Union[str, Path]]): The local path to save the file to.
                If None, the file will be saved to data_dir/s3_key.

        Returns:
            Path: The path to the downloaded file.

        Raises:
            botocore.exceptions.ClientError: If the download fails.
        """
        local_path = self.data_dir / s3_key if local_path is None else Path(local_path)
        local_path.parent.mkdir(parents=True, exist_ok=True)

        self.client.download_file(
            Bucket=self.bucket, Key=s3_key, Filename=str(local_path)
        )

        return local_path

    def upload(self, local_path: str | Path, s3_key: str | None = None) -> str:
        """Upload a single file to S3 using the boto3 client.

        This method uploads a specific file from a local path to S3.
        If s3_key is not provided, it will use the relative path from data_dir
        as the S3 key.

        Args:
            local_path (Union[str, Path]): The local path of the file to upload.
            s3_key (Optional[str]): The key to use in the S3 bucket.
                If None, the relative path from data_dir will be used.

        Returns:
            str: The S3 key of the uploaded file.

        Raises:
            FileNotFoundError: If the local file does not exist.
            botocore.exceptions.ClientError: If the upload fails.
        """
        local_path = Path(local_path)

        if not local_path.exists():
            raise FileNotFoundError(f"File not found: {local_path}")

        if s3_key is None:
            try:
                s3_key = str(local_path.relative_to(self.data_dir))
            except ValueError:
                s3_key = local_path.name

        self.client.upload_file(
            Filename=str(local_path), Bucket=self.bucket, Key=s3_key
        )

        return s3_key

    def delete(self, s3_key: str) -> None:
        """Delete a single file from S3 using the boto3 client.

        Args:
            s3_key (str): The key of the file in the S3 bucket to delete.

        Raises:
            botocore.exceptions.ClientError: If the deletion fails.
        """
        self.client.delete_object(Bucket=self.bucket, Key=s3_key)

    def status(self) -> bool:
        """Check if S3 connection is working."""
        try:
            self.client.list_buckets()
            return True
        except Exception:
            return False
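
A minimal usage sketch, assuming the bucket and credentials are configured in the environment (`AWS_S3_BUCKET_NAME`, `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and optionally `AWS_S3_ENDPOINT_URL`) and that the AWS CLI is installed for the sync-based methods; the file path is a placeholder:

```python
from docketanalyzer.services.s3 import S3  # import path assumed

s3 = S3()  # data_dir defaults to env.DATA_DIR

# Single-file transfers via boto3.
path = s3.data_dir / "reports" / "2024.csv"  # hypothetical file
key = s3.upload(path)     # key is the path relative to data_dir
local = s3.download(key)  # saved back to data_dir/<key> by default

# Directory syncs shell out to `aws s3 sync`.
s3.push("reports")  # data_dir/reports -> s3://<bucket>/reports
s3.pull("reports")  # and back again
```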

__init__(data_dir=None)

Initialize the S3 service.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data_dir` | `Optional[str]` | Path to local data directory. If None, uses env.DATA_DIR. | `None` |
Source code in docketanalyzer/services/s3.py
def __init__(self, data_dir: str | None = None) -> None:
    """Initialize the S3 service.

    Args:
        data_dir (Optional[str]): Path to local data directory.
            If None, uses env.DATA_DIR.
    """
    export_env()
    self.data_dir = Path(data_dir or env.DATA_DIR)
    self.bucket = env.AWS_S3_BUCKET_NAME
    self.endpoint_url = env.AWS_S3_ENDPOINT_URL
    self.client = boto3.client(
        "s3",
        endpoint_url=self.endpoint_url,
        aws_access_key_id=env.AWS_ACCESS_KEY_ID,
        aws_secret_access_key=env.AWS_SECRET_ACCESS_KEY,
        config=Config(signature_version="s3v4"),
    )

push(path=None, from_path=None, to_path=None, **kwargs)

Push data from local storage to S3.

Syncs files from a local directory to an S3 bucket path.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `Optional[Union[str, Path]]` | If provided, used as both from_path and to_path. | `None` |
| `from_path` | `Optional[Union[str, Path]]` | Local source path to sync from. | `None` |
| `to_path` | `Optional[Union[str, Path]]` | S3 destination path to sync to. | `None` |
| `**kwargs` | `Any` | Additional arguments to pass to the AWS CLI s3 sync command. | `{}` |
Source code in docketanalyzer/services/s3.py
def push(
    self,
    path: str | Path | None = None,
    from_path: str | Path | None = None,
    to_path: str | Path | None = None,
    **kwargs: Any,
) -> None:
    """Push data from local storage to S3.

    Syncs files from a local directory to an S3 bucket path.

    Args:
        path (Optional[Union[str, Path]]): If provided, used as both
            from_path and to_path.
        from_path (Optional[Union[str, Path]]): Local source path to sync from.
        to_path (Optional[Union[str, Path]]): S3 destination path to sync to.
        **kwargs: Additional arguments to pass to the AWS CLI s3 sync command.
    """
    from_path, to_path = self._prepare_paths(path, from_path, to_path)
    if self.data_dir is not None:
        from_path = self.data_dir / from_path
    to_path = f"s3://{Path(self.bucket) / to_path}"
    self._sync(from_path, to_path, **kwargs)

pull(path=None, from_path=None, to_path=None, **kwargs)

Pull data from S3 to local storage.

Syncs files from an S3 bucket path to a local directory.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `Optional[Union[str, Path]]` | If provided, used as both from_path and to_path. | `None` |
| `from_path` | `Optional[Union[str, Path]]` | S3 source path to sync from. | `None` |
| `to_path` | `Optional[Union[str, Path]]` | Local destination path to sync to. | `None` |
| `**kwargs` | `Any` | Additional arguments to pass to the AWS CLI s3 sync command. | `{}` |
Source code in docketanalyzer/services/s3.py
def pull(
    self,
    path: str | Path | None = None,
    from_path: str | Path | None = None,
    to_path: str | Path | None = None,
    **kwargs: Any,
) -> None:
    """Pull data from S3 to local storage.

    Syncs files from an S3 bucket path to a local directory.

    Args:
        path (Optional[Union[str, Path]]): If provided, used as both
            from_path and to_path.
        from_path (Optional[Union[str, Path]]): S3 source path to sync from.
        to_path (Optional[Union[str, Path]]): Local destination path to sync to.
        **kwargs: Additional arguments to pass to the AWS CLI s3 sync command.
    """
    from_path, to_path = self._prepare_paths(path, from_path, to_path)
    if self.data_dir is not None:
        to_path = self.data_dir / to_path
    from_path = f"s3://{Path(self.bucket) / from_path}"
    self._sync(from_path, to_path, **kwargs)
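
Both `push` and `pull` forward extra keyword arguments to the underlying `aws s3 sync` command: underscores become hyphens, `True` booleans become bare flags, and lists become repeated flags, while `confirm=True` prints the command and asks before running it. A sketch:

```python
s3.pull("reports", dryrun=True)                 # adds --dryrun
s3.push("reports", exclude=["*.tmp", "*.log"])  # repeated --exclude flags
s3.push("reports", confirm=True)                # review the command first
```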

download(s3_key, local_path=None)

Download a single file from S3 using the boto3 client.

This method downloads a specific file from S3 to a local path. If local_path is not provided, it will mirror the S3 path structure in the data directory.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `s3_key` | `str` | The key of the file in the S3 bucket. | *required* |
| `local_path` | `Optional[Union[str, Path]]` | The local path to save the file to. If None, the file will be saved to data_dir/s3_key. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Path` | `Path` | The path to the downloaded file. |

Raises:

| Type | Description |
| --- | --- |
| `ClientError` | If the download fails. |

Source code in docketanalyzer/services/s3.py
def download(self, s3_key: str, local_path: str | Path | None = None) -> Path:
    """Download a single file from S3 using the boto3 client.

    This method downloads a specific file from S3 to a local path.
    If local_path is not provided, it will mirror the S3 path structure
    in the data directory.

    Args:
        s3_key (str): The key of the file in the S3 bucket.
        local_path (Optional[Union[str, Path]]): The local path to save the file to.
            If None, the file will be saved to data_dir/s3_key.

    Returns:
        Path: The path to the downloaded file.

    Raises:
        botocore.exceptions.ClientError: If the download fails.
    """
    local_path = self.data_dir / s3_key if local_path is None else Path(local_path)
    local_path.parent.mkdir(parents=True, exist_ok=True)

    self.client.download_file(
        Bucket=self.bucket, Key=s3_key, Filename=str(local_path)
    )

    return local_path

upload(local_path, s3_key=None)

Upload a single file to S3 using the boto3 client.

This method uploads a specific file from a local path to S3. If s3_key is not provided, it will use the relative path from data_dir as the S3 key.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `local_path` | `Union[str, Path]` | The local path of the file to upload. | *required* |
| `s3_key` | `Optional[str]` | The key to use in the S3 bucket. If None, the relative path from data_dir will be used. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `str` | `str` | The S3 key of the uploaded file. |

Raises:

| Type | Description |
| --- | --- |
| `FileNotFoundError` | If the local file does not exist. |
| `ClientError` | If the upload fails. |

Source code in docketanalyzer/services/s3.py
def upload(self, local_path: str | Path, s3_key: str | None = None) -> str:
    """Upload a single file to S3 using the boto3 client.

    This method uploads a specific file from a local path to S3.
    If s3_key is not provided, it will use the relative path from data_dir
    as the S3 key.

    Args:
        local_path (Union[str, Path]): The local path of the file to upload.
        s3_key (Optional[str]): The key to use in the S3 bucket.
            If None, the relative path from data_dir will be used.

    Returns:
        str: The S3 key of the uploaded file.

    Raises:
        FileNotFoundError: If the local file does not exist.
        botocore.exceptions.ClientError: If the upload fails.
    """
    local_path = Path(local_path)

    if not local_path.exists():
        raise FileNotFoundError(f"File not found: {local_path}")

    if s3_key is None:
        try:
            s3_key = str(local_path.relative_to(self.data_dir))
        except ValueError:
            s3_key = local_path.name

    self.client.upload_file(
        Filename=str(local_path), Bucket=self.bucket, Key=s3_key
    )

    return s3_key
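
A usage sketch (the paths are illustrative):

s3 = load_s3()

# Key inferred from the path relative to data_dir: "reports/2024.pdf"
key = s3.upload(s3.data_dir / "reports" / "2024.pdf")

# Or set the key explicitly
key = s3.upload("/tmp/2024.pdf", s3_key="backups/2024.pdf")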

delete(s3_key)

Delete a single file from S3 using the boto3 client.

Parameters:

Name Type Description Default
s3_key str

The key of the file in the S3 bucket to delete.

required

Raises:

Type Description
ClientError

If the deletion fails.

Source code in docketanalyzer/services/s3.py
def delete(self, s3_key: str) -> None:
    """Delete a single file from S3 using the boto3 client.

    Args:
        s3_key (str): The key of the file in the S3 bucket to delete.

    Raises:
        botocore.exceptions.ClientError: If the deletion fails.
    """
    self.client.delete_object(Bucket=self.bucket, Key=s3_key)
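
A usage sketch (the key is illustrative); status() below can serve as a cheap connection check first:

s3 = load_s3()
if s3.status():
    s3.delete("backups/2024.pdf")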

status()

Check if S3 connection is working.

Source code in docketanalyzer/services/s3.py
def status(self) -> bool:
    """Check if S3 connection is working."""
    try:
        self.client.list_buckets()
        return True
    except Exception:
        return False

load_elastic(**kwargs)

Load an Elasticsearch client with the configured connection URL.

Run da configure elastic to set the connection URL.

Source code in docketanalyzer/services/elastic.py
def load_elastic(**kwargs):
    """Load an Elasticsearch client with the configured connection URL.

    Run `da configure elastic` to set the connection URL.
    """
    es = Elasticsearch(env.ELASTIC_URL, **kwargs)
    return es
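
For example (request_timeout is a standard Elasticsearch client keyword, passed through via **kwargs):

from docketanalyzer import load_elastic

es = load_elastic(request_timeout=30)
print(es.ping())  # True if the cluster responds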

load_psql()

Load a Database object using the connection URL in your config.

Run da configure postgres to set your PostgreSQL connection URL.

Source code in docketanalyzer/services/psql.py
def load_psql() -> Database:
    """Load a Database object using the connection url in your config.

    Run `da configure postgres` to set your PostgreSQL connection URL.
    """
    return Database(env.POSTGRES_URL)
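
For example:

from docketanalyzer import load_psql

db = load_psql()
print(db.status())    # True if the connection works
print(list(db.meta))  # table names discovered via introspection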

load_redis(**kwargs)

Load a Redis client with the configured connection URL.

Run da configure redis to set the connection URL.

Source code in docketanalyzer/services/redis.py
def load_redis(**kwargs):
    """Load a Redis client with the configured connection URL.

    Run `da configure redis` to set the connection URL.
    """
    redis = Redis.from_url(env.REDIS_URL, **kwargs)
    return redis
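
For example:

from docketanalyzer import load_redis

r = load_redis()
print(r.ping())  # True if the server responds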

load_s3(data_dir=None)

Load the S3 service.

Parameters:

Name Type Description Default
data_dir Optional[Union[str, Path]]

Path to local data directory. If None, uses env.DATA_DIR.

None

Returns:

Name Type Description
S3 S3

An instance of the S3 class.

Source code in docketanalyzer/services/s3.py
def load_s3(data_dir: str | Path | None = None) -> S3:
    """Load the S3 service.

    Args:
        data_dir (Optional[Union[str, Path]]): Path to local data directory.
            If None, uses env.DATA_DIR.

    Returns:
        S3: An instance of the S3 class.
    """
    return S3(data_dir)
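
For example:

from docketanalyzer import load_s3

s3 = load_s3("data")  # explicit local data directory
s3 = load_s3()        # or fall back to env.DATA_DIR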

Database

A PostgreSQL database manager that provides high-level database operations.

This class handles database connections, table management, model registration, and provides an interface for table operations with schemaless tables through the Tables class.

Source code in docketanalyzer/services/psql.py
class Database:
    """A PostgreSQL database manager that provides high-level database operations.

    This class handles database connections, table management, model registration,
        and provides an interface for table operations with schemaless tables through
        the Tables class.
    """

    def __init__(
        self,
        connection: str | None = None,
        registered_models: list[type[DatabaseModel]] | None = None,
    ) -> None:
        """Initialize the database manager.

        Args:
            connection (str, optional): PostgreSQL connection URL
            registered_models (list): List of model classes to register with
                the database
        """
        self.connection = connection or env.POSTGRES_URL
        self.db: PostgresqlExtDatabase | None = None
        self.connect()
        self.registered_models: dict[str, type[DatabaseModel]] = {}
        if registered_models is not None:
            for model in registered_models:
                self.register_model(model)
        self.t = Tables(self)
        self.cache: dict[str, Any] = {}

    def connect(self) -> None:
        """Establish connection to the PostgreSQL database using the connection URL."""
        url = urlparse(self.connection)
        self.db = PostgresqlExtDatabase(
            database=url.path[1:],
            user=url.username,
            password=url.password,
            host=url.hostname,
            port=url.port,
        )

    def status(self) -> bool:
        """Check if the database connection is working.

        Returns:
            bool: True if connection is successful, False otherwise
        """
        return self.db.connect()

    @property
    def meta(self) -> dict[str, dict[str, Any]]:
        """Get database metadata including table and column information.

        Returns:
            dict: Database metadata including table schemas and foreign keys
        """
        if "meta" not in self.cache:
            meta = {}
            introspector = Introspector.from_database(self.db)
            metadata = introspector.introspect()
            fks = metadata.foreign_keys
            for table_name, columns in metadata.columns.items():
                meta[table_name] = {
                    "name": metadata.model_names[table_name],
                    "columns": columns,
                }
                if table_name in fks:
                    meta[table_name]["foreign_keys"] = {
                        x.column: x for x in fks[table_name]
                    }
            self.cache["meta"] = meta
        return self.cache["meta"]

    def reload(self):
        """Reload the database metadata and registered models."""
        self.close()
        self.__init__(
            connection=self.connection,
            registered_models=list(self.registered_models.values()),
        )

    def register_model(self, model: type[DatabaseModel]) -> None:
        """Register a model class with the database manager.

        Args:
            model: Peewee model class to register
        """
        self.registered_models[model._meta.table_name] = model
        model.db_manager = self
        model._meta.database = self.db

    def load_table_class(self, name: str, new: bool = False) -> type[DatabaseModel]:
        """Dynamically create a model class for a database table.

        Args:
            name (str): Name of the table
            new (bool): Whether this is a new table being created

        Returns:
            type: A new DatabaseModel subclass representing the table

        Raises:
            KeyError: If table doesn't exist and new=False
        """
        if not new and name not in self.meta:
            raise KeyError(
                f"Table {name} does not exist. Use db.create_table to create it."
            )

        class Meta:
            database = self.db
            table_name = name

        attrs = {"Meta": Meta}

        if not new:
            table_meta = self.meta[name]
            for column_name, column in table_meta["columns"].items():
                keeps = [
                    "column_name",
                    "index",
                    "primary_key",
                    "unique",
                    "default",
                    "model",
                ]
                rename = {"nullable": "null"}
                column_args = {
                    k: v
                    for k, v in column.__dict__.items()
                    if k in keeps + list(rename.keys())
                }
                fk = table_meta["foreign_keys"].get(column_args["column_name"])
                if fk:
                    column_args["model"] = self.load_table_class(fk.dest_table)
                for k, v in rename.items():
                    column_args[v] = column_args.pop(k)
                attrs[column_name] = column.field_class(**column_args)
        table_class = type(name, (DatabaseModel,), attrs)
        table_class.db_manager = self
        return table_class

    def create_table(
        self, name_or_model: str | type[DatabaseModel], exists_ok: bool = True
    ) -> None:
        """Create a new table in the database.

        Args:
            name_or_model (Union[str, Type[DatabaseModel]]): Name of the table to
                create or model class
            exists_ok (bool): Whether to silently continue if table exists

        Raises:
            ValueError: If table exists and exists_ok=False
        """
        if isinstance(name_or_model, str):
            name = name_or_model
            if name in self.meta:
                if not exists_ok:
                    raise ValueError(f"Table {name} already exists.")
                return
            table = self.load_table_class(name, new=True)
        else:
            table = name_or_model
        self.db.create_tables([table])
        self.reload()

    def drop_table(self, name: str, confirm: bool = True):
        """Drop a table from the database.

        Args:
            name (str): Name of the table to drop
            confirm (bool): Whether to prompt for confirmation before dropping

        Raises:
            Exception: If confirmation is required and user does not confirm
        """
        if confirm:
            response = input(
                notabs(f"""
                Are you sure you want to drop table '{name}'?
                This will DELETE ALL TABLE DATA.

                Are you sure you want to proceed? (y/n):
            """)
            ).lower()
            if response != "y":
                raise Exception("Aborted")
        table = self.t[name]
        self.db.drop_tables([table])
        self.reload()

    def close(self) -> None:
        """Close the database connection."""
        self.db.close()
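
A sketch of the typical workflow (the table and column names are illustrative; db.t exposes tables by name, as in drop_table above):

from docketanalyzer import load_psql

db = load_psql()

db.create_table("cases")                # no-op if it already exists
cases = db.t["cases"]                   # dynamically built model class
cases.add_column("court", "CharField")  # see DatabaseModel.add_column below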

__init__(connection=None, registered_models=None)

Initialize the database manager.

Parameters:

Name Type Description Default
connection str

PostgreSQL connection URL

None
registered_models list

List of model classes to register with the database

None
Source code in docketanalyzer/services/psql.py
def __init__(
    self,
    connection: str | None = None,
    registered_models: list[type[DatabaseModel]] | None = None,
) -> None:
    """Initialize the database manager.

    Args:
        connection (str, optional): PostgreSQL connection URL
        registered_models (list): List of model classes to register with
            the database
    """
    self.connection = connection or env.POSTGRES_URL
    self.db: PostgresqlExtDatabase | None = None
    self.connect()
    self.registered_models: dict[str, type[DatabaseModel]] = {}
    if registered_models is not None:
        for model in registered_models:
            self.register_model(model)
    self.t = Tables(self)
    self.cache: dict[str, Any] = {}

connect()

Establish connection to the PostgreSQL database using the connection URL.

Source code in docketanalyzer/services/psql.py
def connect(self) -> None:
    """Establish connection to the PostgreSQL database using the connection URL."""
    url = urlparse(self.connection)
    self.db = PostgresqlExtDatabase(
        database=url.path[1:],
        user=url.username,
        password=url.password,
        host=url.hostname,
        port=url.port,
    )

create_table(name_or_model, exists_ok=True)

Create a new table in the database.

Parameters:

Name Type Description Default
name_or_model Union[str, Type[DatabaseModel]]

Name of the table to create or model class

required
exists_ok bool

Whether to silently continue if table exists

True

Raises:

Type Description
ValueError

If table exists and exists_ok=False

Source code in docketanalyzer/services/psql.py
def create_table(
    self, name_or_model: str | type[DatabaseModel], exists_ok: bool = True
) -> None:
    """Create a new table in the database.

    Args:
        name_or_model (Union[str, Type[DatabaseModel]]): Name of the table to
            create or model class
        exists_ok (bool): Whether to silently continue if table exists

    Raises:
        ValueError: If table exists and exists_ok=False
    """
    if isinstance(name_or_model, str):
        name = name_or_model
        if name in self.meta:
            if not exists_ok:
                raise ValueError(f"Table {name} already exists.")
            return
        table = self.load_table_class(name, new=True)
    else:
        table = name_or_model
    self.db.create_tables([table])
    self.reload()

register_model(model)

Register a model class with the database manager.

Parameters:

Name Type Description Default
model type[DatabaseModel]

Peewee model class to register

required
Source code in docketanalyzer/services/psql.py
def register_model(self, model: type[DatabaseModel]) -> None:
    """Register a model class with the database manager.

    Args:
        model: Peewee model class to register
    """
    self.registered_models[model._meta.table_name] = model
    model.db_manager = self
    model._meta.database = self.db
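
A sketch of registering a hand-written model (the model, its fields, and the DatabaseModel import path are illustrative assumptions):

import peewee
from docketanalyzer import load_psql
from docketanalyzer.services import DatabaseModel  # assumed import path

class Case(DatabaseModel):
    docket_id = peewee.CharField(unique=True)

    class Meta:
        table_name = "cases"

db = load_psql()
db.register_model(Case)
db.create_table(Case)  # create_table also accepts a model class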

DatabaseModel

Bases: DatabaseModelQueryMixin, Model

A base model class that extends Peewee's Model with additional functionality.

This class provides enhanced database operations including pandas DataFrame conversion, batch processing, column management, and model reloading capabilities.

Source code in docketanalyzer/services/psql.py
class DatabaseModel(DatabaseModelQueryMixin, Model):
    """A base model class that extends Peewee's Model with additional functionality.

    This class provides enhanced database operations including pandas DataFrame
        conversion, batch processing, column management, and model reloading
        capabilities.
    """

    db_manager = None

    @classmethod
    def drop_column(cls, column_name: str, confirm: bool = True) -> None:
        """Drop a column from the database table.

        Args:
            column_name (str): Name of the column to drop
            confirm (bool): Whether to prompt for confirmation before dropping
        """
        table_name = cls._meta.table_name
        if confirm:
            response = input(
                notabs(f"""
                Are you sure you want to drop '{column_name}' from '{table_name}'?
                This will DELETE ALL COLUMN DATA.

                Are you sure you want to proceed? (y/n):
            """)
            ).lower()
            if response != "y":
                raise Exception("Aborted")
        migrator = PostgresqlMigrator(cls._meta.database)
        migrate(migrator.drop_column(table_name, column_name))
        cls.reload()

    @classmethod
    def add_column(
        cls,
        column_name: str,
        column_type: str,
        null: bool = True,
        overwrite: bool = False,
        exists_ok: bool = True,
        **kwargs: Any,
    ) -> None:
        """Add a new column to the database table.

        Args:
            column_name (str): Name of the new column
            column_type (str): Peewee field type for the column
            null (bool, optional): Whether the column can contain NULL values
            overwrite (bool): Whether to overwrite if column exists
            exists_ok (bool): Whether to silently continue if column exists
            **kwargs: Additional field parameters passed to Peewee
        """
        table_name = cls._meta.table_name
        table_meta = cls.db_manager.meta[table_name]
        migrator = PostgresqlMigrator(cls._meta.database)
        if column_name in table_meta["columns"]:
            if not exists_ok:
                raise ValueError(
                    f"Column {column_name} already exists in table {table_name}."
                )
            if not overwrite:
                return
            cls.drop_column(column_name)

        kwargs["null"] = null
        migrate(
            migrator.add_column(
                table_name, column_name, getattr(peewee, column_type)(**kwargs)
            )
        )
        cls.reload()

    @classmethod
    def add_data(
        cls, data: pd.DataFrame, copy: bool = False, batch_size: int = 1000
    ) -> None:
        """Add data to the table from a pandas DataFrame.

        Args:
            data (pd.DataFrame): DataFrame containing the data to insert
            copy (bool): Whether to use Postgres COPY command for faster insertion
            batch_size (int): Number of records to insert in each batch
                when not using COPY
        """
        if copy:
            conn = cls._meta.database.connection()
            with conn.cursor() as cursor:
                buffer = StringIO()
                csv_writer = csv.writer(
                    buffer, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
                )
                for _, row in data.iterrows():
                    csv_writer.writerow(
                        [
                            "\\N" if pd.isna(value) or value == "" else str(value)
                            for value in row
                        ]
                    )
                buffer.seek(0)

                cols = ",".join(data.columns)
                table_name = cls._meta.table_name
                cursor.copy_expert(
                    f"COPY {table_name} ({cols}) FROM STDIN WITH CSV NULL AS '\\N'",
                    buffer,
                )
        else:
            data = data.to_dict(orient="records")
            with cls._meta.database.atomic():
                for i in range(0, len(data), batch_size):
                    batch = data[i : i + batch_size]
                    cls.insert_many(batch).execute()

    @classmethod
    def reload(cls):
        """Reload the model class to reflect any changes in the database schema."""
        cls.db_manager.reload()
        new_table = cls.db_manager.load_table_class(cls._meta.table_name)
        new_attrs = dir(new_table)
        attrs = dir(cls)
        for attr in attrs:
            if attr not in new_attrs:
                delattr(cls, attr)
        for attr in new_attrs:
            if not attr.startswith("__"):
                setattr(cls, attr, getattr(new_table, attr))

add_column(column_name, column_type, null=True, overwrite=False, exists_ok=True, **kwargs) classmethod

Add a new column to the database table.

Parameters:

Name Type Description Default
column_name str

Name of the new column

required
column_type str

Peewee field type for the column

required
null bool

Whether the column can contain NULL values

True
overwrite bool

Whether to overwrite if column exists

False
exists_ok bool

Whether to silently continue if column exists

True
**kwargs Any

Additional field parameters passed to Peewee

{}
Source code in docketanalyzer/services/psql.py
@classmethod
def add_column(
    cls,
    column_name: str,
    column_type: str,
    null: bool = True,
    overwrite: bool = False,
    exists_ok: bool = True,
    **kwargs: Any,
) -> None:
    """Add a new column to the database table.

    Args:
        column_name (str): Name of the new column
        column_type (str): Peewee field type for the column
        null (bool, optional): Whether the column can contain NULL values
        overwrite (bool): Whether to overwrite if column exists
        exists_ok (bool): Whether to silently continue if column exists
        **kwargs: Additional field parameters passed to Peewee
    """
    table_name = cls._meta.table_name
    table_meta = cls.db_manager.meta[table_name]
    migrator = PostgresqlMigrator(cls._meta.database)
    if column_name in table_meta["columns"]:
        if not exists_ok:
            raise ValueError(
                f"Column {column_name} already exists in table {table_name}."
            )
        if not overwrite:
            return
        cls.drop_column(column_name)

    kwargs["null"] = null
    migrate(
        migrator.add_column(
            table_name, column_name, getattr(peewee, column_type)(**kwargs)
        )
    )
    cls.reload()
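
For example (the cases table is illustrative; column_type is the name of a Peewee field class, resolved via getattr(peewee, column_type)):

cases = db.t["cases"]

# Nullable text column
cases.add_column("summary", "TextField")

# A NOT NULL column needs a default for existing rows
cases.add_column("year", "IntegerField", null=False, default=0)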

add_data(data, copy=False, batch_size=1000) classmethod

Add data to the table from a pandas DataFrame.

Parameters:

Name Type Description Default
data DataFrame

DataFrame containing the data to insert

required
copy bool

Whether to use Postgres COPY command for faster insertion

False
batch_size int

Number of records to insert in each batch when not using COPY

1000
Source code in docketanalyzer/services/psql.py
@classmethod
def add_data(
    cls, data: pd.DataFrame, copy: bool = False, batch_size: int = 1000
) -> None:
    """Add data to the table from a pandas DataFrame.

    Args:
        data (pd.DataFrame): DataFrame containing the data to insert
        copy (bool): Whether to use Postgres COPY command for faster insertion
        batch_size (int): Number of records to insert in each batch
            when not using COPY
    """
    if copy:
        conn = cls._meta.database.connection()
        with conn.cursor() as cursor:
            buffer = StringIO()
            csv_writer = csv.writer(
                buffer, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
            )
            for _, row in data.iterrows():
                csv_writer.writerow(
                    [
                        "\\N" if pd.isna(value) or value == "" else str(value)
                        for value in row
                    ]
                )
            buffer.seek(0)

            cols = ",".join(data.columns)
            table_name = cls._meta.table_name
            cursor.copy_expert(
                f"COPY {table_name} ({cols}) FROM STDIN WITH CSV NULL AS '\\N'",
                buffer,
            )
    else:
        data = data.to_dict(orient="records")
        with cls._meta.database.atomic():
            for i in range(0, len(data), batch_size):
                batch = data[i : i + batch_size]
                cls.insert_many(batch).execute()
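
For example (the cases table and columns are illustrative):

import pandas as pd

df = pd.DataFrame({"court": ["nysd", "cand"], "year": [2023, 2024]})

db.t["cases"].add_data(df)             # batched INSERTs (default)
db.t["cases"].add_data(df, copy=True)  # Postgres COPY for large frames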

drop_column(column_name, confirm=True) classmethod

Drop a column from the database table.

Parameters:

Name Type Description Default
column_name str

Name of the column to drop

required
confirm bool

Whether to prompt for confirmation before dropping

True
Source code in docketanalyzer/services/psql.py
@classmethod
def drop_column(cls, column_name: str, confirm: bool = True) -> None:
    """Drop a column from the database table.

    Args:
        column_name (str): Name of the column to drop
        confirm (bool): Whether to prompt for confirmation before dropping
    """
    table_name = cls._meta.table_name
    if confirm:
        response = input(
            notabs(f"""
            Are you sure you want to drop '{column_name}' from '{table_name}'?
            This will DELETE ALL COLUMN DATA.

            Are you sure you want to proceed? (y/n):
        """)
        ).lower()
        if response != "y":
            raise Exception("Aborted")
    migrator = PostgresqlMigrator(cls._meta.database)
    migrate(migrator.drop_column(table_name, column_name))
    cls.reload()

reload() classmethod

Reload the model class to reflect any changes in the database schema.

Source code in docketanalyzer/services/psql.py
@classmethod
def reload(cls):
    """Reload the model class to reflect any changes in the database schema."""
    cls.db_manager.reload()
    new_table = cls.db_manager.load_table_class(cls._meta.table_name)
    new_attrs = dir(new_table)
    attrs = dir(cls)
    for attr in attrs:
        if attr not in new_attrs:
            delattr(cls, attr)
    for attr in new_attrs:
        if not attr.startswith("__"):
            setattr(cls, attr, getattr(new_table, attr))

S3

A class for syncing local data with an S3 bucket.

Attributes:

Name Type Description
data_dir Path

Local directory for data storage.

bucket str

S3 bucket name.

endpoint_url Optional[str]

Custom S3 endpoint URL.

client boto3.client

Boto3 S3 client for direct API interactions.

Source code in docketanalyzer/services/s3.py
class S3:
    """A class for syncing local data with an S3 bucket.

    Attributes:
        data_dir (Path): Local directory for data storage.
        bucket (str): S3 bucket name.
        endpoint_url (Optional[str]): Custom S3 endpoint URL.
        client (boto3.client): Boto3 S3 client for direct API interactions.
    """

    def __init__(self, data_dir: str | None = None) -> None:
        """Initialize the S3 service.

        Args:
            data_dir (Optional[str]): Path to local data directory.
                If None, uses env.DATA_DIR.
        """
        export_env()
        self.data_dir = Path(data_dir or env.DATA_DIR)
        self.bucket = env.AWS_S3_BUCKET_NAME
        self.endpoint_url = env.AWS_S3_ENDPOINT_URL
        self.client = boto3.client(
            "s3",
            endpoint_url=self.endpoint_url,
            aws_access_key_id=env.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=env.AWS_SECRET_ACCESS_KEY,
            config=Config(signature_version="s3v4"),
        )

    def _sync(
        self,
        from_path: str | Path,
        to_path: str | Path,
        confirm: bool = False,
        exclude_hidden: bool = True,
        exact_timestamps: bool = True,
        **kwargs: Any,
    ) -> None:
        """Execute an AWS S3 sync command between two paths.

        This is a private helper method that constructs and executes an AWS CLI command
        for syncing files between local and S3 storage.

        Args:
            from_path (Union[str, Path]): Source path to sync from.
            to_path (Union[str, Path]): Destination path to sync to.
            confirm (bool): If True, asks for confirmation before executing the command.
            exclude_hidden (bool): If True, excludes hidden files and directories.
            exact_timestamps (bool): If True, compares timestamps.
            **kwargs: Additional arguments to pass to the AWS CLI s3 sync command.
        """
        cmd = f"aws s3 sync {from_path} {to_path}"

        if self.endpoint_url is not None:
            cmd += f" --endpoint-url {self.endpoint_url}"

        if exclude_hidden:
            cmd += ' --exclude "*/.*" --exclude ".*"'

        kwargs["exact_timestamps"] = exact_timestamps
        kwargs = {k: v for k, v in kwargs.items() if v is not None}

        for k, v in kwargs.items():
            k = k.replace("_", "-")
            if isinstance(v, bool):
                if v:
                    cmd += f" --{k}"
            elif isinstance(v, list | tuple):
                for item in v:
                    cmd += f' --{k} "{item}"'
            else:
                cmd += f' --{k} "{v}"'

        if confirm:
            print("Run the following command?")
            print(cmd)
            response = input("y/n: ")
            if response == "y":
                os.system(cmd)
        else:
            os.system(cmd)

    def _prepare_paths(
        self,
        path: str | Path | None,
        from_path: str | Path | None,
        to_path: str | Path | None,
    ) -> tuple[Path, Path]:
        """Prepare source and destination paths for sync operations.

        This method handles path normalization and ensures paths are properly
        formatted for sync operations.

        Args:
            path (Optional[Union[str, Path]]): If provided, used as both
                from_path and to_path.
            from_path (Optional[Union[str, Path]]): Source path for sync operation.
            to_path (Optional[Union[str, Path]]): Destination path for sync operation.

        Returns:
            Tuple[Path, Path]: Normalized from_path and to_path.
        """
        if path is not None:
            path = Path(path)

            with suppress(ValueError):
                path = path.relative_to(self.data_dir)
            from_path = to_path = path

        if path is None and from_path is None and to_path is None:
            raise ValueError("Must provide at least one path argument")

        from_path = Path() if from_path is None else Path(from_path)
        to_path = Path() if to_path is None else Path(to_path)

        return from_path, to_path

    def push(
        self,
        path: str | Path | None = None,
        from_path: str | Path | None = None,
        to_path: str | Path | None = None,
        **kwargs: Any,
    ) -> None:
        """Push data from local storage to S3.

        Syncs files from a local directory to an S3 bucket path.

        Args:
            path (Optional[Union[str, Path]]): If provided, used as both
                from_path and to_path.
            from_path (Optional[Union[str, Path]]): Local source path to sync from.
            to_path (Optional[Union[str, Path]]): S3 destination path to sync to.
            **kwargs: Additional arguments to pass to the AWS CLI s3 sync command.
        """
        from_path, to_path = self._prepare_paths(path, from_path, to_path)
        if self.data_dir is not None:
            from_path = self.data_dir / from_path
        to_path = f"s3://{Path(self.bucket) / to_path}"
        self._sync(from_path, to_path, **kwargs)

    def pull(
        self,
        path: str | Path | None = None,
        from_path: str | Path | None = None,
        to_path: str | Path | None = None,
        **kwargs: Any,
    ) -> None:
        """Pull data from S3 to local storage.

        Syncs files from an S3 bucket path to a local directory.

        Args:
            path (Optional[Union[str, Path]]): If provided, used as both
                from_path and to_path.
            from_path (Optional[Union[str, Path]]): S3 source path to sync from.
            to_path (Optional[Union[str, Path]]): Local destination path to sync to.
            **kwargs: Additional arguments to pass to the AWS CLI s3 sync command.
        """
        from_path, to_path = self._prepare_paths(path, from_path, to_path)
        if self.data_dir is not None:
            to_path = self.data_dir / to_path
        from_path = f"s3://{Path(self.bucket) / from_path}"
        self._sync(from_path, to_path, **kwargs)

    def download(self, s3_key: str, local_path: str | Path | None = None) -> Path:
        """Download a single file from S3 using the boto3 client.

        This method downloads a specific file from S3 to a local path.
        If local_path is not provided, it will mirror the S3 path structure
        in the data directory.

        Args:
            s3_key (str): The key of the file in the S3 bucket.
            local_path (Optional[Union[str, Path]]): The local path to save the file to.
                If None, the file will be saved to data_dir/s3_key.

        Returns:
            Path: The path to the downloaded file.

        Raises:
            botocore.exceptions.ClientError: If the download fails.
        """
        local_path = self.data_dir / s3_key if local_path is None else Path(local_path)
        local_path.parent.mkdir(parents=True, exist_ok=True)

        self.client.download_file(
            Bucket=self.bucket, Key=s3_key, Filename=str(local_path)
        )

        return local_path

    def upload(self, local_path: str | Path, s3_key: str | None = None) -> str:
        """Upload a single file to S3 using the boto3 client.

        This method uploads a specific file from a local path to S3.
        If s3_key is not provided, it will use the relative path from data_dir
        as the S3 key.

        Args:
            local_path (Union[str, Path]): The local path of the file to upload.
            s3_key (Optional[str]): The key to use in the S3 bucket.
                If None, the relative path from data_dir will be used.

        Returns:
            str: The S3 key of the uploaded file.

        Raises:
            FileNotFoundError: If the local file does not exist.
            botocore.exceptions.ClientError: If the upload fails.
        """
        local_path = Path(local_path)

        if not local_path.exists():
            raise FileNotFoundError(f"File not found: {local_path}")

        if s3_key is None:
            try:
                s3_key = str(local_path.relative_to(self.data_dir))
            except ValueError:
                s3_key = local_path.name

        self.client.upload_file(
            Filename=str(local_path), Bucket=self.bucket, Key=s3_key
        )

        return s3_key

    def delete(self, s3_key: str) -> None:
        """Delete a single file from S3 using the boto3 client.

        Args:
            s3_key (str): The key of the file in the S3 bucket to delete.

        Raises:
            botocore.exceptions.ClientError: If the deletion fails.
        """
        self.client.delete_object(Bucket=self.bucket, Key=s3_key)

    def status(self) -> bool:
        """Check if S3 connection is working."""
        try:
            self.client.list_buckets()
            return True
        except Exception:
            return False

__init__(data_dir=None)

Initialize the S3 service.

Parameters:

Name Type Description Default
data_dir Optional[str]

Path to local data directory. If None, uses env.DATA_DIR.

None
Source code in docketanalyzer/services/s3.py
def __init__(self, data_dir: str | None = None) -> None:
    """Initialize the S3 service.

    Args:
        data_dir (Optional[str]): Path to local data directory.
            If None, uses env.DATA_DIR.
    """
    export_env()
    self.data_dir = Path(data_dir or env.DATA_DIR)
    self.bucket = env.AWS_S3_BUCKET_NAME
    self.endpoint_url = env.AWS_S3_ENDPOINT_URL
    self.client = boto3.client(
        "s3",
        endpoint_url=self.endpoint_url,
        aws_access_key_id=env.AWS_ACCESS_KEY_ID,
        aws_secret_access_key=env.AWS_SECRET_ACCESS_KEY,
        config=Config(signature_version="s3v4"),
    )

push(path=None, from_path=None, to_path=None, **kwargs)

Push data from local storage to S3.

Syncs files from a local directory to an S3 bucket path.

Parameters:

Name Type Description Default
path Optional[Union[str, Path]]

If provided, used as both from_path and to_path.

None
from_path Optional[Union[str, Path]]

Local source path to sync from.

None
to_path Optional[Union[str, Path]]

S3 destination path to sync to.

None
**kwargs Any

Additional arguments to pass to the AWS CLI s3 sync command.

{}
Source code in docketanalyzer/services/s3.py
def push(
    self,
    path: str | Path | None = None,
    from_path: str | Path | None = None,
    to_path: str | Path | None = None,
    **kwargs: Any,
) -> None:
    """Push data from local storage to S3.

    Syncs files from a local directory to an S3 bucket path.

    Args:
        path (Optional[Union[str, Path]]): If provided, used as both
            from_path and to_path.
        from_path (Optional[Union[str, Path]]): Local source path to sync from.
        to_path (Optional[Union[str, Path]]): S3 destination path to sync to.
        **kwargs: Additional arguments to pass to the AWS CLI s3 sync command.
    """
    from_path, to_path = self._prepare_paths(path, from_path, to_path)
    if self.data_dir is not None:
        from_path = self.data_dir / from_path
    to_path = f"s3://{Path(self.bucket) / to_path}"
    self._sync(from_path, to_path, **kwargs)
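
A usage sketch (the prefix is illustrative):

s3 = load_s3()

# Mirror <data_dir>/dockets to s3://<bucket>/dockets
s3.push("dockets")

# Extra kwargs become AWS CLI flags, e.g. --delete
s3.push("dockets", delete=True)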

pull(path=None, from_path=None, to_path=None, **kwargs)

Pull data from S3 to local storage.

Syncs files from an S3 bucket path to a local directory.

Parameters:

Name Type Description Default
path Optional[Union[str, Path]]

If provided, used as both from_path and to_path.

None
from_path Optional[Union[str, Path]]

S3 source path to sync from.

None
to_path Optional[Union[str, Path]]

Local destination path to sync to.

None
**kwargs Any

Additional arguments to pass to the AWS CLI s3 sync command.

{}
Source code in docketanalyzer/services/s3.py
def pull(
    self,
    path: str | Path | None = None,
    from_path: str | Path | None = None,
    to_path: str | Path | None = None,
    **kwargs: Any,
) -> None:
    """Pull data from S3 to local storage.

    Syncs files from an S3 bucket path to a local directory.

    Args:
        path (Optional[Union[str, Path]]): If provided, used as both
            from_path and to_path.
        from_path (Optional[Union[str, Path]]): S3 source path to sync from.
        to_path (Optional[Union[str, Path]]): Local destination path to sync to.
        **kwargs: Additional arguments to pass to the AWS CLI s3 sync command.
    """
    from_path, to_path = self._prepare_paths(path, from_path, to_path)
    if self.data_dir is not None:
        to_path = self.data_dir / to_path
    from_path = f"s3://{Path(self.bucket) / from_path}"
    self._sync(from_path, to_path, **kwargs)

upload(local_path, s3_key=None)

Upload a single file to S3 using the boto3 client.

This method uploads a specific file from a local path to S3. If s3_key is not provided, it will use the relative path from data_dir as the S3 key.

Parameters:

Name Type Description Default
local_path Union[str, Path]

The local path of the file to upload.

required
s3_key Optional[str]

The key to use in the S3 bucket. If None, the relative path from data_dir will be used.

None

Returns:

Name Type Description
str str

The S3 key of the uploaded file.

Raises:

Type Description
FileNotFoundError

If the local file does not exist.

ClientError

If the upload fails.

Source code in docketanalyzer/services/s3.py
def upload(self, local_path: str | Path, s3_key: str | None = None) -> str:
    """Upload a single file to S3 using the boto3 client.

    This method uploads a specific file from a local path to S3.
    If s3_key is not provided, it will use the relative path from data_dir
    as the S3 key.

    Args:
        local_path (Union[str, Path]): The local path of the file to upload.
        s3_key (Optional[str]): The key to use in the S3 bucket.
            If None, the relative path from data_dir will be used.

    Returns:
        str: The S3 key of the uploaded file.

    Raises:
        FileNotFoundError: If the local file does not exist.
        botocore.exceptions.ClientError: If the upload fails.
    """
    local_path = Path(local_path)

    if not local_path.exists():
        raise FileNotFoundError(f"File not found: {local_path}")

    if s3_key is None:
        try:
            s3_key = str(local_path.relative_to(self.data_dir))
        except ValueError:
            s3_key = local_path.name

    self.client.upload_file(
        Filename=str(local_path), Bucket=self.bucket, Key=s3_key
    )

    return s3_key

download(s3_key, local_path=None)

Download a single file from S3 using the boto3 client.

This method downloads a specific file from S3 to a local path. If local_path is not provided, it will mirror the S3 path structure in the data directory.

Parameters:

Name Type Description Default
s3_key str

The key of the file in the S3 bucket.

required
local_path Optional[Union[str, Path]]

The local path to save the file to. If None, the file will be saved to data_dir/s3_key.

None

Returns:

Name Type Description
Path Path

The path to the downloaded file.

Raises:

Type Description
ClientError

If the download fails.

Source code in docketanalyzer/services/s3.py
def download(self, s3_key: str, local_path: str | Path | None = None) -> Path:
    """Download a single file from S3 using the boto3 client.

    This method downloads a specific file from S3 to a local path.
    If local_path is not provided, it will mirror the S3 path structure
    in the data directory.

    Args:
        s3_key (str): The key of the file in the S3 bucket.
        local_path (Optional[Union[str, Path]]): The local path to save the file to.
            If None, the file will be saved to data_dir/s3_key.

    Returns:
        Path: The path to the downloaded file.

    Raises:
        botocore.exceptions.ClientError: If the download fails.
    """
    local_path = self.data_dir / s3_key if local_path is None else Path(local_path)
    local_path.parent.mkdir(parents=True, exist_ok=True)

    self.client.download_file(
        Bucket=self.bucket, Key=s3_key, Filename=str(local_path)
    )

    return local_path

delete(s3_key)

Delete a single file from S3 using the boto3 client.

Parameters:

Name Type Description Default
s3_key str

The key of the file in the S3 bucket to delete.

required

Raises:

Type Description
ClientError

If the deletion fails.

Source code in docketanalyzer/services/s3.py
def delete(self, s3_key: str) -> None:
    """Delete a single file from S3 using the boto3 client.

    Args:
        s3_key (str): The key of the file in the S3 bucket to delete.

    Raises:
        botocore.exceptions.ClientError: If the deletion fails.
    """
    self.client.delete_object(Bucket=self.bucket, Key=s3_key)

status()

Check if S3 connection is working.

Source code in docketanalyzer/services/s3.py
def status(self) -> bool:
    """Check if S3 connection is working."""
    try:
        self.client.list_buckets()
        return True
    except Exception:
        return False

utils

extension_required

Context manager for optional extension imports.

Source code in docketanalyzer/utils/utils.py
class extension_required:
    """Context manager extension imports."""

    def __init__(self, extension: str):
        """Initialize context manager."""
        self.extension = extension

    def __enter__(self):
        """Enter the context."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Handle import errors with helpful messages."""
        if exc_type is not None and issubclass(exc_type, ImportError):
            raise ImportError(
                f"\n\n{self.extension} extension not installed. "
                f"Use `pip install 'docketanalyzer[{self.extension}]'` to install."
            ) from exc_val

        return False
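
A usage sketch (the extension name and the guarded import are illustrative):

with extension_required("ocr"):
    import pytesseract  # raises a helpful ImportError if missing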

__init__(extension)

Initialize context manager.

Source code in docketanalyzer/utils/utils.py
def __init__(self, extension: str):
    """Initialize context manager."""
    self.extension = extension

__enter__()

Enter the context.

Source code in docketanalyzer/utils/utils.py
def __enter__(self):
    """Enter the context."""
    return self

__exit__(exc_type, exc_val, exc_tb)

Handle import errors with helpful messages.

Source code in docketanalyzer/utils/utils.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Handle import errors with helpful messages."""
    if exc_type is not None and issubclass(exc_type, ImportError):
        raise ImportError(
            f"\n\n{self.extension} extension not installed. "
            f"Use `pip install 'docketanalyzer[{self.extension}]'` to install."
        ) from exc_val

    return False

timeit

Context manager for timing things.

Usage:

with timeit("Task"):
    # do something
    do_something()

This will print the time taken to execute the block of code.

Source code in docketanalyzer/utils/utils.py
class timeit:
    """Context manager for timing things.

    Usage:
    with timeit("Task"):
        # do something
        do_something()

    This will print the time taken to execute the block of code.
    """

    def __init__(self, description: str = "Task"):
        """Initialize the timeit context manager with a description."""
        self.description = description
        self.start = None

    def __enter__(self):
        """Start the timer."""
        self.start = datetime.now()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Print the execution time."""
        end = datetime.now()
        execution_time = (end - self.start).total_seconds()
        print(f"{self.description} took {execution_time:.4f} seconds")

__init__(description='Task')

Initialize the timeit context manager with a description.

Source code in docketanalyzer/utils/utils.py
def __init__(self, description: str = "Task"):
    """Initialize the timeit context manager with a description."""
    self.description = description
    self.start = None

__enter__()

Start the timer.

Source code in docketanalyzer/utils/utils.py
def __enter__(self):
    """Start the timer."""
    self.start = datetime.now()
    return self

__exit__(exc_type, exc_val, exc_tb)

Print the execution time.

Source code in docketanalyzer/utils/utils.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Print the execution time."""
    end = datetime.now()
    execution_time = (end - self.start).total_seconds()
    print(f"{self.description} took {execution_time:.4f} seconds")

parse_docket_id(docket_id)

Parse a docket ID into a court and docket number.

Source code in docketanalyzer/utils/utils.py
def parse_docket_id(docket_id: str) -> tuple[str, str]:
    """Parse a docket ID into a court and docket number."""
    court, docket_number = docket_id.split("__")
    docket_number = docket_number.replace("_", ":")
    return court, docket_number

construct_docket_id(court, docket_number)

Construct a docket ID from a court and docket number.

Source code in docketanalyzer/utils/utils.py
def construct_docket_id(court: str, docket_number: str) -> str:
    """Construct a docket ID from a court and docket number."""
    formatted_number = docket_number.replace(":", "_")
    return f"{court}__{formatted_number}"

json_default(obj)

Default JSON serializer for datetime and date objects.

Source code in docketanalyzer/utils/utils.py
def json_default(obj: Any) -> Any:
    """Default JSON serializer for datetime and date objects."""
    if isinstance(obj, datetime | date):
        return obj.isoformat()
    raise TypeError(f"Type {type(obj)} not serializable")
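
For example:

import json
from datetime import date

json.dumps({"filed": date(2024, 1, 15)}, default=json_default)
# '{"filed": "2024-01-15"}'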

notabs(text)

Remove leading/trailing whitespace on each line.

Source code in docketanalyzer/utils/utils.py
def notabs(text: str) -> str:
    """Remove leading/trailing whitespace on each line."""
    return "\n".join([x.strip() for x in text.split("\n")]).strip()

download_file(url, path, description='Downloading')

Download file from URL to local path with progress bar.

Source code in docketanalyzer/utils/utils.py
def download_file(url: str, path: str | Path, description: str = "Downloading"):
    """Download file from URL to local path with progress bar."""
    path = Path(path)
    response = requests.get(url, stream=True)
    response.raise_for_status()
    total_size = int(response.headers.get("content-length", 0))

    with (
        path.open("wb") as file,
        tqdm(
            desc=description,
            total=total_size,
            unit="iB",
            unit_scale=True,
            unit_divisor=1024,
        ) as progress,
    ):
        for data in response.iter_content(chunk_size=1024):
            size = file.write(data)
            progress.update(size)

generate_hash(data, salt=None, length=None)

Generate a hash for some data with optional salt.

Source code in docketanalyzer/utils/utils.py
def generate_hash(data: Any, salt: str | None = None, length: int | None = None) -> str:
    """Generate a hash for some data with optional salt."""
    data = json.dumps({"data": data}, sort_keys=True, default=json_default)
    if salt:
        data += salt
    hash = hashlib.sha256(data.encode()).hexdigest()
    if length:
        hash = hash[:length]
    return hash
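
For example (the inputs are illustrative):

generate_hash({"court": "nysd", "docket": "1:23-cv-04567"}, length=12)
# deterministic 12-character prefix of a SHA-256 hex digest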

generate_code(length=16)

Generate a random code of specified length.

Source code in docketanalyzer/utils/utils.py
def generate_code(length: int = 16) -> str:
    """Generate a random code of specified length."""
    abc = string.ascii_uppercase + string.digits
    return "".join(secrets.choice(abc) for _ in range(length))

pd_save_or_append(data, path, **kwargs)

Save or append a DataFrame to a CSV file.

Source code in docketanalyzer/utils/utils.py
def pd_save_or_append(data: pd.DataFrame, path: str | Path, **kwargs):
    """Save or append a DataFrame to a CSV file."""
    path = Path(path)
    if path.exists():
        data.to_csv(path, mode="a", header=False, index=False, **kwargs)
    else:
        data.to_csv(path, index=False, **kwargs)
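
For example:

import pandas as pd

pd_save_or_append(pd.DataFrame({"id": [1]}), "cases.csv")  # writes header
pd_save_or_append(pd.DataFrame({"id": [2]}), "cases.csv")  # appends rows only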

datetime_utcnow()

Get the current UTC datetime.

Source code in docketanalyzer/utils/utils.py
def datetime_utcnow() -> datetime:
    """Get the current UTC datetime."""
    from datetime import UTC

    return datetime.now(UTC)

list_to_array(data)

Convert a list of lists to a numpy array of float32.

Source code in docketanalyzer/utils/utils.py
def list_to_array(data: list[list[float | int]]) -> np.ndarray:
    """Convert a list of lists to a numpy array of float32."""
    return np.array([np.array(x) for x in data]).astype("float32")

to_date(value)

Convert a value to a date if possible.

Source code in docketanalyzer/utils/utils.py
def to_date(value: Any) -> date | None:
    """Convert a value to a date if possible."""
    if value:
        with suppress(ValueError, TypeError, ParserError, OutOfBoundsDatetime):
            return pd.to_datetime(value).date()

to_int(value)

Convert a value to an integer if possible.

Source code in docketanalyzer/utils/utils.py
def to_int(value: Any) -> int | None:
    """Convert a value to an integer if possible."""
    if value is not None:
        with suppress(ValueError):
            return int(value)
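
For example:

to_date("2024-01-15")   # datetime.date(2024, 1, 15)
to_date("not a date")   # None (parse errors are suppressed)
to_int("42")            # 42
to_int("forty-two")     # None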