from __future__ import annotations
from collections.abc import Mapping
from datetime import datetime
from pathlib import Path
from typing import Any, List
import httpx
from astropy.table import Row, Table
from .conf import conf
from .exceptions import AuthenticationError, Grad300APIError, Grad300Error
from .models import ScanRecord, ScanType
from .scans import DownloadResult, ScansService
[docs]
class Grad300Client:
"""Python client for the GRAD300 backend API.
Wraps the ``/api/v1`` REST endpoints and returns results as
:class:`astropy.table.Table` (for listing/querying) or
:class:`astropy.io.fits.HDUList` / :class:`pathlib.Path` (for downloads).
Authentication is handled automatically: supply either a *token* or
a *user* + *password* pair and the client acquires a Bearer token on
your behalf.
Examples
--------
>>> from grad300_client import Grad300
>>> with Grad300(user="me@example.com", password="secret") as client:
... table = client.query_source("Sun")
"""
[docs]
def __init__(
self,
base_url: str | None = None,
*,
api_prefix: str | None = None,
timeout: float | None = None,
token: str | None = None,
user: str | None = None,
password: str | None = None,
transport: httpx.BaseTransport | None = None,
) -> None:
"""Initialise the client and optionally authenticate.
Parameters
----------
base_url : str | None, optional
Root URL of the GRAD300 server. Defaults to
:attr:`~grad300_client.conf.base_url`.
api_prefix : str | None, optional
Path prefix for all API calls (for example ``/api/v1``).
Defaults to :attr:`~grad300_client.conf.api_prefix`.
timeout : float | None, optional
HTTP timeout in seconds. Defaults to
:attr:`~grad300_client.conf.timeout`.
token : str | None, optional
Pre-obtained Bearer token. Mutually exclusive with ``user`` and
``password``.
user : str | None, optional
E-mail address used for username/password login.
password : str | None, optional
Password for username/password login.
transport : httpx.BaseTransport | None, optional
Custom HTTP transport, for example ``httpx.MockTransport`` in tests.
Raises
------
ValueError
If both token and user/password are provided, or if only one of
user/password is provided.
"""
if token and (user is not None or password is not None):
raise ValueError("Use either 'token' or both 'user'/'password', not both.")
if (user is None) != (password is None):
raise ValueError("Both 'user' and 'password' must be provided together.")
resolved_base_url = base_url or str(conf.base_url)
resolved_api_prefix = api_prefix or str(conf.api_prefix)
resolved_timeout = float(conf.timeout) if timeout is None else timeout
self._default_page_size = int(conf.default_page_size)
self._base_api_url = _build_api_base_url(
base_url=resolved_base_url,
api_prefix=resolved_api_prefix,
)
self._client = httpx.Client(
base_url=self._base_api_url,
timeout=resolved_timeout,
transport=transport,
headers={"Accept": "application/json"},
)
self._scans = ScansService(self)
if token:
self.set_token(token)
elif user is not None and password is not None:
self.login(user, password)
@property
def base_api_url(self) -> str:
"""Return the fully normalized API base URL.
Returns
-------
str
Base URL used by the underlying HTTP client.
"""
return self._base_api_url
[docs]
def set_token(self, token: str) -> None:
"""Set the Bearer token used for authenticated requests.
Parameters
----------
token : str
A valid Bearer token.
"""
self._client.headers["Authorization"] = f"Bearer {token}"
[docs]
def clear_token(self) -> None:
"""Remove the current Bearer token from the HTTP headers."""
self._client.headers.pop("Authorization", None)
[docs]
def login(self, email: str, password: str) -> str:
"""Authenticate with username and password.
Acquires a Bearer token from ``/login/access-token`` and stores it
automatically. Subsequent requests are authenticated.
Parameters
----------
email : str
User e-mail address.
password : str
User password.
Returns
-------
str
Raw Bearer token.
Raises
------
grad300_client.Grad300Error
If the response does not include ``access_token``.
grad300_client.AuthenticationError
If credentials are rejected by the server.
"""
response = self._request(
"POST",
"/login/access-token",
data={"username": email, "password": password},
)
payload = response.json()
token = payload.get("access_token")
if not isinstance(token, str) or not token:
raise Grad300Error("Login response did not include an access token")
self.set_token(token)
return token
[docs]
def test_token(self) -> dict[str, Any]:
"""Verify that the current token is valid.
Returns
-------
dict[str, Any]
JSON payload returned by ``/login/test-token``.
Raises
------
grad300_client.AuthenticationError
If the token is missing or rejected.
"""
payload = self.get_json("/login/test-token")
if not isinstance(payload, dict):
raise ValueError("Unexpected payload from /login/test-token")
return payload
[docs]
def list(
self,
scan_type: ScanType | str,
*,
offset: int = 0,
limit: int | None = None,
source: str | None = None,
scan_name: str | None = None,
date_from: datetime | None = None,
date_to: datetime | None = None,
) -> Table:
"""Return a single page of scans as an :class:`astropy.table.Table`.
Parameters
----------
scan_type : ScanType | str
Scan type to list (``"TPI"``, ``"Images"``, ``"Spectrum"``, or
``"OnOff"``).
offset : int, default=0
Number of records to skip.
limit : int | None, optional
Maximum number of records to return. Defaults to
:attr:`~grad300_client.conf.default_page_size`.
source : str | None, optional
Filter by source name.
scan_name : str | None, optional
Filter by partial filename.
date_from : datetime | None, optional
Lower bound for observation date (inclusive).
date_to : datetime | None, optional
Upper bound for observation date (inclusive).
Returns
-------
astropy.table.Table
One row per scan.
"""
resolved_limit = self._resolve_page_size(limit, field_name="limit")
return self._scans.list(
scan_type,
offset=offset,
limit=resolved_limit,
source=source,
scan_name=scan_name,
date_from=date_from,
date_to=date_to,
)
[docs]
def list_records(
self,
scan_type: ScanType | str,
*,
offset: int = 0,
limit: int | None = None,
source: str | None = None,
scan_name: str | None = None,
date_from: datetime | None = None,
date_to: datetime | None = None,
) -> List[ScanRecord]:
"""Like :meth:`list` but return structured Python records.
Parameters
----------
scan_type : ScanType | str
Scan type to list.
offset : int, default=0
Number of records to skip.
limit : int | None, optional
Maximum number of records to return.
source : str | None, optional
Filter by source name.
scan_name : str | None, optional
Filter by partial filename.
date_from : datetime | None, optional
Lower bound for observation date.
date_to : datetime | None, optional
Upper bound for observation date.
Returns
-------
list[ScanRecord]
Structured scan records.
"""
resolved_limit = self._resolve_page_size(limit, field_name="limit")
return self._scans.list_records(
scan_type,
offset=offset,
limit=resolved_limit,
source=source,
scan_name=scan_name,
date_from=date_from,
date_to=date_to,
)
[docs]
def query(
self,
*,
scan_type: ScanType | str | None = None,
source: str | None = None,
scan_name: str | None = None,
date_from: datetime | None = None,
date_to: datetime | None = None,
page_size: int | None = None,
max_results: int | None = None,
) -> Table:
"""Query scans across all pages with optional filters.
Iterates through paginated results until all matching records have been
collected or *max_results* is reached. Client-side date and name
filtering is applied as a safety net when the backend does not support
those parameters.
Parameters
----------
scan_type : ScanType | str | None, optional
Restrict to a single scan type. ``None`` queries all scan types.
source : str | None, optional
Filter by source name.
scan_name : str | None, optional
Filter by partial filename.
date_from : datetime | None, optional
Lower bound for observation date (inclusive).
date_to : datetime | None, optional
Upper bound for observation date (inclusive).
page_size : int | None, optional
Records fetched per HTTP request. Defaults to
:attr:`~grad300_client.conf.default_page_size`.
max_results : int | None, optional
Stop after collecting this many records.
Returns
-------
astropy.table.Table
One row per matching scan.
"""
resolved_page_size = self._resolve_page_size(page_size, field_name="page_size")
return self._scans.query(
scan_type=scan_type,
source=source,
scan_name=scan_name,
date_from=date_from,
date_to=date_to,
page_size=resolved_page_size,
max_results=max_results,
)
[docs]
def query_records(
self,
*,
scan_type: ScanType | str | None = None,
source: str | None = None,
scan_name: str | None = None,
date_from: datetime | None = None,
date_to: datetime | None = None,
page_size: int | None = None,
max_results: int | None = None,
) -> List[ScanRecord]:
"""Query scans across all pages and return structured records.
Parameters
----------
scan_type : ScanType | str | None, optional
Restrict to a single scan type. ``None`` queries all scan types.
source : str | None, optional
Filter by source name.
scan_name : str | None, optional
Filter by partial filename.
date_from : datetime | None, optional
Lower bound for observation date.
date_to : datetime | None, optional
Upper bound for observation date.
page_size : int | None, optional
Records fetched per HTTP request.
max_results : int | None, optional
Stop after collecting this many records.
Returns
-------
list[ScanRecord]
Matching scan records.
"""
resolved_page_size = self._resolve_page_size(page_size, field_name="page_size")
return self._scans.query_records(
scan_type=scan_type,
source=source,
scan_name=scan_name,
date_from=date_from,
date_to=date_to,
page_size=resolved_page_size,
max_results=max_results,
)
[docs]
def query_source(
self,
source: str,
*,
scan_type: ScanType | str | None = None,
page_size: int | None = None,
max_results: int | None = None,
) -> Table:
"""Query all scans for a given source name.
Convenience wrapper around :meth:`query`.
Parameters
----------
source : str
Source name to search for.
scan_type : ScanType | str | None, optional
Restrict to a single scan type.
page_size : int | None, optional
Records per HTTP request.
max_results : int | None, optional
Maximum number of records to return.
Returns
-------
astropy.table.Table
One row per matching scan.
"""
resolved_page_size = self._resolve_page_size(page_size, field_name="page_size")
return self._scans.query_source(
source,
scan_type=scan_type,
page_size=resolved_page_size,
max_results=max_results,
)
[docs]
def query_scan_name(
self,
scan_name: str,
*,
scan_type: ScanType | str | None = None,
page_size: int | None = None,
max_results: int | None = None,
) -> Table:
"""Query scans whose filename contains *scan_name*.
Convenience wrapper around :meth:`query`.
Parameters
----------
scan_name : str
Partial filename fragment to search for.
scan_type : ScanType | str | None, optional
Restrict to a single scan type.
page_size : int | None, optional
Records per HTTP request.
max_results : int | None, optional
Maximum number of records to return.
Returns
-------
astropy.table.Table
One row per matching scan.
"""
resolved_page_size = self._resolve_page_size(page_size, field_name="page_size")
return self._scans.query_scan_name(
scan_name,
scan_type=scan_type,
page_size=resolved_page_size,
max_results=max_results,
)
[docs]
def query_date_range(
self,
*,
date_from: datetime | None = None,
date_to: datetime | None = None,
scan_type: ScanType | str | None = None,
page_size: int | None = None,
max_results: int | None = None,
) -> Table:
"""Query scans observed within a date range.
Convenience wrapper around :meth:`query`. At least one of *date_from*
or *date_to* should be provided.
Parameters
----------
date_from : datetime | None, optional
Lower bound for observation date (inclusive).
date_to : datetime | None, optional
Upper bound for observation date (inclusive).
scan_type : ScanType | str | None, optional
Restrict to a single scan type.
page_size : int | None, optional
Records per HTTP request.
max_results : int | None, optional
Maximum number of records to return.
Returns
-------
astropy.table.Table
One row per matching scan.
"""
resolved_page_size = self._resolve_page_size(page_size, field_name="page_size")
return self._scans.query_date_range(
date_from=date_from,
date_to=date_to,
scan_type=scan_type,
page_size=resolved_page_size,
max_results=max_results,
)
[docs]
def download(
self,
record: ScanRecord | Mapping[str, Any] | Row,
*,
destination_dir: str | Path | None = None,
overwrite: bool = False,
) -> DownloadResult:
"""Download the FITS file associated with a scan record.
The return type depends on the scan type and whether *destination_dir*
is given:
* ``TPI`` without *destination_dir* → :class:`astropy.table.Table`
* Other types without *destination_dir* → :class:`astropy.io.fits.HDUList`
* With *destination_dir* → :class:`pathlib.Path` of the saved file
Parameters
----------
record : ScanRecord | Mapping[str, Any] | astropy.table.Row
Scan record with ``id``, ``scan_type`` and ``file_name``.
destination_dir : str | pathlib.Path | None, optional
Directory where the file is written. If ``None``, data is returned
in memory.
overwrite : bool, default=False
Overwrite existing destination file.
Returns
-------
DownloadResult
In-memory data or path to the saved file.
Raises
------
FileExistsError
If destination exists and ``overwrite`` is ``False``.
"""
return self._scans.download(
record,
destination_dir=destination_dir,
overwrite=overwrite,
)
[docs]
def download_by_id(
self,
*,
scan_type: ScanType | str,
scan_id: int,
destination_dir: str | Path | None = None,
file_name: str | None = None,
overwrite: bool = False,
) -> DownloadResult:
"""Download a scan by its database ID.
Parameters
----------
scan_type : ScanType | str
Scan type of the target record.
scan_id : int
Database identifier of the scan.
destination_dir : str | pathlib.Path | None, optional
Directory where the file is written.
file_name : str | None, optional
Override output filename when writing to disk.
overwrite : bool, default=False
Overwrite an existing file.
Returns
-------
DownloadResult
In-memory data or path to the saved file.
"""
return self._scans.download_by_id(
scan_type=scan_type,
scan_id=scan_id,
destination_dir=destination_dir,
file_name=file_name,
overwrite=overwrite,
)
[docs]
def download_by_scan_name(
self,
scan_name: str,
*,
scan_type: ScanType | str | None = None,
destination_dir: str | Path | None = None,
overwrite: bool = False,
) -> DownloadResult:
"""Download a scan by its exact filename.
Queries the API for *scan_name* first and expects exactly one result
with a matching ``file_name``.
Parameters
----------
scan_name : str
Exact filename of the scan.
scan_type : ScanType | str | None, optional
Narrow search to a specific scan type.
destination_dir : str | pathlib.Path | None, optional
Directory where the file is written.
overwrite : bool, default=False
Overwrite an existing file.
Returns
-------
DownloadResult
In-memory data or path to the saved file.
Raises
------
LookupError
If zero or multiple scans match ``scan_name``.
"""
return self._scans.download_by_scan_name(
scan_name,
scan_type=scan_type,
destination_dir=destination_dir,
overwrite=overwrite,
)
[docs]
def request(self, method: str, path: str, **kwargs: Any) -> httpx.Response:
"""Send a raw HTTP request to the GRAD300 API.
Parameters
----------
method : str
HTTP method (``"GET"``, ``"POST"``, ...).
path : str
Path relative to :attr:`base_api_url`.
**kwargs : Any
Additional keyword arguments passed to ``httpx.Client.request``.
Returns
-------
httpx.Response
Raw response object.
"""
return self._request(method, path, **kwargs)
[docs]
def get_json(self, path: str, *, params: dict[str, Any] | None = None) -> Any:
"""Send a GET request and decode JSON payload.
Parameters
----------
path : str
Path relative to :attr:`base_api_url`.
params : dict[str, Any] | None, optional
Query-string parameters.
Returns
-------
Any
Decoded JSON payload.
"""
response = self._request("GET", path, params=params)
return response.json()
[docs]
def close(self) -> None:
"""Close the underlying HTTP connection pool."""
self._client.close()
def __enter__(self) -> "Grad300Client":
return self
def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
self.close()
def _resolve_page_size(self, value: int | None, *, field_name: str) -> int:
resolved = self._default_page_size if value is None else value
if resolved <= 0:
raise ValueError(f"'{field_name}' must be greater than 0")
return resolved
def _request(self, method: str, path: str, **kwargs: Any) -> httpx.Response:
try:
response = self._client.request(method, path, **kwargs)
except httpx.HTTPError as exc:
raise Grad300Error(
f"Network error while calling GRAD300 API: {exc}"
) from exc
if response.status_code >= 400:
raise _build_api_error(response)
return response
[docs]
class Grad300(Grad300Client):
"""Alias class for a compact astroquery-like import style."""
def _build_api_base_url(*, base_url: str, api_prefix: str) -> str:
normalized_base = base_url.rstrip("/")
normalized_prefix = api_prefix if api_prefix.startswith("/") else f"/{api_prefix}"
normalized_prefix = normalized_prefix.rstrip("/")
if normalized_base.endswith(normalized_prefix):
return normalized_base
return f"{normalized_base}{normalized_prefix}"
def _build_api_error(response: httpx.Response) -> Grad300APIError:
message = f"Grad300 API error (HTTP {response.status_code})"
body: Any | None = None
try:
body = response.json()
except ValueError:
body = response.text
if isinstance(body, dict):
detail = body.get("detail")
if isinstance(detail, str) and detail:
message = detail
elif isinstance(body, str) and body:
message = body
path = response.request.url.path
is_auth_error = response.status_code in {401, 403} or (
path.endswith("/login/access-token") and response.status_code == 400
)
error_cls: type[Grad300APIError]
if is_auth_error:
error_cls = AuthenticationError
else:
error_cls = Grad300APIError
return error_cls(
response.status_code,
message,
response_body=body,
)