"""Client implementation for managing spans and traces in the Arize platform."""
from __future__ import annotations
import json
import logging
import re
from datetime import datetime, timezone
from functools import partial
from typing import TYPE_CHECKING, Any
import numpy as np
import pandas as pd
import pyarrow as pa
from google.protobuf import json_format, message
from arize._exporter.client import ArizeExportClient
from arize._flight.client import ArizeFlightClient, FlightPostArrowFileResponse
from arize._flight.types import FlightRequestType
from arize.constants.spans import DEFAULT_DATETIME_FMT
from arize.exceptions.base import (
INVALID_ARROW_CONVERSION_MSG,
ValidationError,
ValidationFailure,
)
from arize.exceptions.models import MissingProjectNameError
from arize.exceptions.spaces import MissingSpaceIDError
from arize.logging import CtxAdapter
from arize.ml.types import Environments
from arize.pre_releases import ReleaseStage, prerelease_endpoint
from arize.spans.validation.metadata.value_validation import (
InvalidPatchDocumentFormat,
)
from arize.utils.arrow import post_arrow_table
from arize.utils.dataframe import (
remove_extraneous_columns,
reset_dataframe_index,
)
from arize.utils.proto import get_pb_schema_tracing
from arize.utils.resolve import _find_project_id
if TYPE_CHECKING:
# builtins is needed to use builtins.list in type annotations because
# the class has a list() method that shadows the built-in list type
import builtins
import requests
from arize._generated.api_client.api_client import ApiClient
from arize._generated.protocol.flight import flight_pb2
from arize.config import SDKConfiguration
from arize.spans.types import SpansDelete200Response, SpansList200Response
logger = logging.getLogger(__name__)
[docs]
class SpansClient:
"""Client for logging LLM tracing spans and evaluations to Arize.
This class is primarily intended for internal use within the SDK. Users are
highly encouraged to access resource-specific functionality via
:class:`arize.ArizeClient`.
"""
def __init__(
    self,
    *,
    sdk_config: SDKConfiguration,
    generated_client: ApiClient,
) -> None:
    """Store the SDK configuration and build the REST API wrappers.

    Args:
        sdk_config: Resolved SDK configuration.
        generated_client: Shared generated API client instance; it backs
            both the spans API and the projects API wrappers.
    """
    self._sdk_config = sdk_config

    # Deferred import: keeps the generated client lazy and extras-gated by
    # the parent client.
    from arize._generated import api_client as generated

    # Both wrappers reuse the caller-provided client as-is.
    self._api = generated.SpansApi(generated_client)
    self._projects_api = generated.ProjectsApi(generated_client)
[docs]
@prerelease_endpoint(key="spans.delete", stage=ReleaseStage.ALPHA)
def delete(
    self,
    *,
    project: str,
    span_ids: builtins.list[str],
    space: str | None = None,
) -> SpansDelete200Response | None:
    """Permanently remove the spans identified by ``span_ids``.

    The deletion cannot be undone. Only spans inside the supported lookback
    window (2 years) are considered; older spans are left untouched. Span
    IDs that cannot be found are silently ignored.

    Args:
        project: Project name or global ID (base64) that owns the spans.
            When a name is given, ``space`` must be supplied as well.
        span_ids: IDs of the spans to delete. Must not be empty.
        space: Optional space name or ID used to disambiguate the project
            lookup. Required when ``project`` is a name.

    Returns:
        ``None`` when every span was deleted (HTTP 204). A response object
        carrying ``deleted_span_ids`` when the server reports a partial
        deletion (HTTP 200) — retry the original request for a complete
        result.

    Raises:
        ValueError: If ``span_ids`` is empty.
        ApiException: If the REST API returns an error response
            (e.g. 401/403/429).
    """
    # Reject an empty request before any lookup is attempted.
    if not span_ids:
        raise ValueError("span_ids must not be empty")

    resolved_project_id = _find_project_id(
        api=self._projects_api, project=project, space=space
    )

    from arize._generated import api_client as generated

    return self._api.spans_delete(
        spans_delete_request=generated.SpansDeleteRequest(
            project_id=resolved_project_id,
            span_ids=span_ids,
        )
    )
[docs]
@prerelease_endpoint(key="spans.list", stage=ReleaseStage.ALPHA)
def list(
    self,
    *,
    project: str,
    space: str | None = None,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    filter: str | None = None,
    limit: int = 100,
    cursor: str | None = None,
) -> SpansList200Response:
    """Return one page of spans for a project within a time range.

    Spans come back in descending start-time order (most recent first).
    When ``start_time`` and ``end_time`` are omitted, the query covers the
    last seven days relative to the time of the request.

    Args:
        project: Project name or global ID (base64) to list spans for.
            When a name is given, ``space`` must be supplied as well.
        space: Optional space name or ID used to disambiguate the project
            lookup. Required when ``project`` is a name.
        start_time: Inclusive lower bound of the time window. Defaults to
            seven days before the request time.
        end_time: Exclusive upper bound of the time window. Defaults to the
            request time.
        filter: Optional filter expression to narrow results. Supports
            equality, comparison, and SQL-style ``AND``/``OR`` operators.
            Examples::

                "status_code = 'ERROR'"
                "eval.Custom_eval_correctness.label = 'correct'"
                "annotation.Correctness.label = 'Correct'"
                "latency_ms > 1789"
                "status_code = 'ERROR' AND eval.Custom_eval_correctness.label = 'correct'"
                "status_code = 'ERROR' OR eval.Custom_eval_correctness.label = 'correct'"
        limit: Maximum number of spans to return. The server enforces an
            upper bound. Defaults to 100.
        cursor: Opaque pagination cursor returned from a previous response.

    Returns:
        A response object with the spans and pagination information.

    Raises:
        ApiException: If the REST API returns an error response
            (e.g. 401/403/429).
    """
    resolved_project_id = _find_project_id(
        api=self._projects_api, project=project, space=space
    )

    # Emitted on every call, after the project has been resolved.
    logger.warning(
        "The spans.list endpoint is currently in active development and may "
        "not be suitable to download large amounts of spans. Use `export_to_df` instead."
    )

    from arize._generated import api_client as generated

    # The time window and filter travel in the request body; paging
    # controls are passed alongside it.
    return self._api.spans_list(
        spans_list_request=generated.SpansListRequest(
            project_id=resolved_project_id,
            start_time=start_time,
            end_time=end_time,
            filter=filter,
        ),
        limit=limit,
        cursor=cursor,
    )
[docs]
def log(
    self,
    *,
    space_id: str,
    project_name: str,
    dataframe: pd.DataFrame,
    evals_dataframe: pd.DataFrame | None = None,
    datetime_format: str = DEFAULT_DATETIME_FMT,
    validate: bool = True,
    timeout: float | None = None,
    tmp_dir: str = "",
) -> requests.Response:
    """Logs a pandas dataframe containing LLM tracing data to Arize via a POST request.

    The input dataframes are copied before any processing, so the caller's
    objects are not modified. The spans copy is validated, trimmed to the
    known span/attribute columns, timestamp-converted, JSON-encoded, joined
    with the optional evaluations, converted to an Arrow table and uploaded
    with ``post_arrow_table``.

    Returns a :class:`Response` object from the Requests HTTP library to ensure
    successful delivery of records.

    Args:
        space_id: The space ID where the project resides.
        project_name: A unique name to identify your project in the Arize platform.
        dataframe (:class:`pandas.DataFrame`): The dataframe containing the LLM traces.
        evals_dataframe (:class:`pandas.DataFrame` | :obj:`None`): A dataframe containing
            LLM evaluations data. The evaluations are joined to their corresponding spans
            via a left outer join, i.e., using only `context.span_id` from the spans
            dataframe. Defaults to None.
        datetime_format: format for the timestamp captured in the LLM traces.
            Defaults to "%Y-%m-%dT%H:%M:%S.%f+00:00".
        validate: When set to True, dataframe form and value validation is run
            before sending data. Argument type validation always runs,
            regardless of this flag. Defaults to True.
        timeout: You can stop waiting for a response after a given number
            of seconds with the timeout parameter. Defaults to None.
        tmp_dir: Temporary directory/file to store the serialized data in binary
            before sending to Arize.

    Returns:
        Response object from the HTTP request (only returned on HTTP 2xx).

    Raises:
        MissingSpaceIDError: If space_id is not provided or empty.
        MissingProjectNameError: If project_name is not provided or empty.
        ValidationFailure: If argument type validation fails, or if
            validate=True and the form/value validation checks fail.
        pa.ArrowInvalid: If the dataframe cannot be converted to Arrow format.
        AuthenticationError: If the server returns HTTP 401 or 403 (invalid API key or
            space ID). Raised immediately to prevent further uploads with bad credentials.
        APIError: If the server returns any other non-2xx response (e.g. 400, 422, 429,
            5xx). Raised immediately to prevent further uploads when the server signals
            an error.
    """
    # Function-scope imports keep the spans helpers lazily loaded.
    from arize.spans.columns import (
        EVAL_COLUMN_PATTERN,
        ROOT_LEVEL_SPAN_KIND_COL,
        SPAN_KIND_COL,
        SPAN_OPENINFERENCE_COLUMNS,
        SPAN_SPAN_ID_COL,
    )
    from arize.spans.conversion import (
        convert_timestamps,
        jsonify_dictionaries,
    )
    from arize.spans.validation.evals import evals_validation
    from arize.spans.validation.spans import spans_validation

    # This method requires a space_id and project_name
    if not space_id:
        raise MissingSpaceIDError()
    if not project_name:
        raise MissingProjectNameError()

    # We need our own copy since we will manipulate the underlying data and
    # do not want side effects
    spans_df = dataframe.copy()
    evals_df = (
        evals_dataframe.copy() if evals_dataframe is not None else None
    )

    # Bind common context for this operation
    # (the local name `log` shadows this method only inside its own body)
    log = CtxAdapter(
        logger,
        {
            "resource": "spans",
            "operation": "log",
            "space_id": space_id,
            "project": project_name,
            "validate": validate,
            "spans_df_rows": len(spans_df),
            "evals_df_rows": len(evals_df) if evals_df is not None else 0,
        },
    )

    # We expect the index to be 0,1,2,3..., len(df)-1. Phoenix, for example, will give us a dataframe
    # with context_id as the index
    reset_dataframe_index(dataframe=spans_df)
    if evals_df is not None:
        reset_dataframe_index(dataframe=evals_df)

    # Step 1: argument type validation. This runs even when validate=False.
    log.debug("Performing direct input type validation")
    errors = spans_validation.validate_argument_types(
        spans_dataframe=spans_df,
        project_name=project_name,
        dt_fmt=datetime_format,
    )
    if evals_df is not None:
        eval_errors = evals_validation.validate_argument_types(
            evals_dataframe=evals_df,
            project_name=project_name,
        )
        errors += eval_errors
    # Every error is logged individually before the aggregate failure is raised.
    for e in errors:
        log.error(e)
    if errors:
        raise ValidationFailure(errors)

    # Step 2 (optional): dataframe form validation, done before columns are dropped.
    if validate:
        log.debug("Performing dataframe form validation")
        errors = spans_validation.validate_dataframe_form(
            spans_dataframe=spans_df
        )
        if evals_df is not None:
            eval_errors = evals_validation.validate_dataframe_form(
                evals_dataframe=evals_df
            )
            errors += eval_errors
        for e in errors:
            log.error(e)
        if errors:
            raise ValidationFailure(errors)

    # Step 3: keep only the known span columns plus anything under "attributes.";
    # for evals keep the span ID column plus columns matching the eval pattern.
    log.debug("Removing unnecessary columns")
    spans_df = remove_extraneous_columns(
        df=spans_df,
        column_list=[col.name for col in SPAN_OPENINFERENCE_COLUMNS],
        regex=r"^attributes\.",
    )
    if evals_df is not None:
        evals_df = remove_extraneous_columns(
            df=evals_df,
            column_list=[SPAN_SPAN_ID_COL.name],
            regex=EVAL_COLUMN_PATTERN,
        )

    # Step 4: timestamp conversion happens before value validation, so the
    # value checks below see the converted spans frame.
    log.debug("Converting timestamps")
    spans_df = convert_timestamps(df=spans_df, fmt=datetime_format)

    # Step 5 (optional): value validation on the trimmed, converted frames.
    if validate:
        log.debug("Performing values validation")
        errors = spans_validation.validate_values(
            spans_dataframe=spans_df,
            project_name=project_name,
        )
        if evals_df is not None:
            eval_errors = evals_validation.validate_values(
                evals_dataframe=evals_df,
                project_name=project_name,
            )
            errors += eval_errors
        for e in errors:
            log.error(e)
        if errors:
            raise ValidationFailure(errors)

    # Step 6: serialize dictionary values.
    log.debug("Converting dictionaries to JSON objects")
    spans_df = jsonify_dictionaries(spans_df)

    # If only the root-level span kind column is present, rename it to the
    # attribute-level column name; an existing attribute column is never overwritten.
    if (
        ROOT_LEVEL_SPAN_KIND_COL.name in spans_df.columns
        and SPAN_KIND_COL.name not in spans_df.columns
    ):
        log.debug("Moving span kind to atributes")
        spans_df.rename(
            columns={ROOT_LEVEL_SPAN_KIND_COL.name: SPAN_KIND_COL.name},
            inplace=True,
        )

    # Step 7: left-join evaluations onto spans by span ID; spans without
    # evaluations are kept.
    df = (
        pd.merge(spans_df, evals_df, on=SPAN_SPAN_ID_COL.name, how="left")
        if evals_df is not None
        else spans_df
    )

    # Convert to Arrow table
    try:
        log.debug("Converting data to Arrow format")
        pa_table = pa.Table.from_pandas(df, preserve_index=False)
    except pa.ArrowInvalid as e:
        log.exception(INVALID_ARROW_CONVERSION_MSG)
        # Re-raised as the same exception type with a prefixed message;
        # the original error stays attached as the cause.
        raise pa.ArrowInvalid(
            f"Error converting to Arrow format: {e!s}"
        ) from e
    except Exception:
        log.exception("Unexpected error creating Arrow table")
        raise

    proto_schema = get_pb_schema_tracing(project_name=project_name)
    # Create headers copy for the spans client
    # Safe to mutate, returns a deep copy
    headers = self._sdk_config.headers
    # Send the number of rows in the dataframe as a header
    # This helps the Arize server to return appropriate feedback, specially for async logging
    # NOTE(review): the count is taken from spans_df (before the evals merge),
    # while the uploaded table is built from the merged frame. The two differ
    # if the evals frame repeats a span ID — confirm which count the server expects.
    headers.update(
        {
            "arize-space-id": space_id,
            "arize-interface": "batch",
            "number-of-rows": str(len(spans_df)),
        }
    )
    return post_arrow_table(
        files_url=self._sdk_config.files_url,
        pa_table=pa_table,
        proto_schema=proto_schema,
        headers=headers,
        timeout=timeout,
        verify=self._sdk_config.request_verify,
        max_chunksize=self._sdk_config.pyarrow_max_chunksize,
        tmp_dir=tmp_dir,
    )
[docs]
def update_evaluations(
    self,
    *,
    space_id: str,
    project_name: str,
    dataframe: pd.DataFrame,
    validate: bool = True,
    force_http: bool = False,
    timeout: float | None = None,
    tmp_dir: str = "",
) -> flight_pb2.WriteSpanEvaluationResponse:
    """Logs a pandas dataframe containing LLM evaluations data to Arize.

    The upload goes through a Flight gRPC request by default, or through an
    HTTP POST when ``force_http`` is True. The dataframe must contain a
    column `context.span_id` such that Arize can assign each evaluation to
    its respective span. The caller's dataframe is copied and never mutated.

    Args:
        space_id: The space ID where the project resides.
        project_name: A unique name to identify your project in the Arize platform.
        dataframe (:class:`pandas.DataFrame`): A dataframe containing LLM evaluations data.
        validate: When set to True, dataframe form and value validation is run
            before sending data. Argument type validation always runs.
            Defaults to True.
        force_http: Force the use of HTTP for data upload. Defaults to False.
        timeout: You can stop waiting for a response after a given number
            of seconds with the timeout parameter. Only used on the HTTP
            path. Defaults to None.
        tmp_dir: Temporary directory/file to store the serialized data in binary
            before sending to Arize. Only used on the HTTP path.

    Returns:
        On the HTTP path, whatever ``post_arrow_table`` returns. On the
        Flight path, the Flight response after conversion by
        ``_message_to_dict``.
        NOTE(review): the declared return annotation names the protobuf
        response type, which does not obviously match either path —
        confirm and align.

    Raises:
        MissingSpaceIDError: If space_id is not provided or empty.
        MissingProjectNameError: If project_name is not provided or empty.
        ValidationFailure: If argument type validation fails, or if
            validate=True and the form/value validation checks fail.
        pa.ArrowInvalid: If the dataframe cannot be converted to Arrow format.
        RuntimeError: If the Flight request raises or yields no response.
        AuthenticationError: If the server returns HTTP 401 or 403.
            Raised immediately to prevent further uploads with bad credentials.
        APIError: If the server returns any other non-2xx response.
            Raised immediately to prevent further uploads when the server signals an error.
    """
    from arize.spans.columns import EVAL_COLUMN_PATTERN, SPAN_SPAN_ID_COL
    from arize.spans.validation.evals import evals_validation

    # This method requires a space_id and project_name
    if not space_id:
        raise MissingSpaceIDError()
    if not project_name:
        raise MissingProjectNameError()

    # Bind common context for this operation. The operation label names this
    # method so its log lines can be told apart from those of `log`.
    log = CtxAdapter(
        logger,
        {
            "resource": "spans",
            "operation": "update_evaluations",
            "space_id": space_id,
            "project": project_name,
            "validate": validate,
            "evals_df_rows": len(dataframe),
        },
    )

    # We need our own copy since we will manipulate the underlying data and
    # do not want side effects
    evals_df = dataframe.copy()

    # We expect the index to be 0,1,2,3..., len(df)-1. Phoenix, for example, will give us a dataframe
    # with context_id as the index; the old index is not meaningful in our copy of the original dataframe
    # so we can drop it.
    reset_dataframe_index(dataframe=evals_df)

    # Argument type validation runs regardless of `validate`.
    log.debug("Performing direct input type validation")
    errors = evals_validation.validate_argument_types(
        evals_dataframe=evals_df,
        project_name=project_name,
    )
    for e in errors:
        log.error(e)
    if errors:
        raise ValidationFailure(errors)

    if validate:
        log.debug("Performing dataframe form validation")
        errors = evals_validation.validate_dataframe_form(
            evals_dataframe=evals_df
        )
        for e in errors:
            log.error(e)
        if errors:
            raise ValidationFailure(errors)

    # Keep only the span ID column and the columns matching the eval pattern.
    log.debug("Removing unnecessary columns")
    evals_df = remove_extraneous_columns(
        df=evals_df,
        column_list=[SPAN_SPAN_ID_COL.name],
        regex=EVAL_COLUMN_PATTERN,
    )

    if validate:
        log.debug("Performing values validation")
        errors = evals_validation.validate_values(
            evals_dataframe=evals_df,
            project_name=project_name,
        )
        for e in errors:
            log.error(e)
        if errors:
            raise ValidationFailure(errors)

    # Convert to Arrow table
    try:
        log.debug("Converting data to Arrow format")
        pa_table = pa.Table.from_pandas(evals_df, preserve_index=False)
    except pa.ArrowInvalid as e:
        log.exception(INVALID_ARROW_CONVERSION_MSG)
        raise pa.ArrowInvalid(
            f"Error converting to Arrow format: {e!s}"
        ) from e
    except Exception:
        log.exception("Unexpected error creating Arrow table")
        raise

    if force_http:
        proto_schema = get_pb_schema_tracing(project_name=project_name)
        # Create headers copy for the spans client
        # Safe to mutate, returns a deep copy
        headers = self._sdk_config.headers
        # Send the number of rows in the dataframe as a header
        # This helps the Arize server to return appropriate feedback, specially for async logging
        headers.update(
            {
                "arize-space-id": space_id,
                "arize-interface": "batch",
                "number-of-rows": str(len(dataframe)),
            }
        )
        return post_arrow_table(
            files_url=self._sdk_config.files_url,
            pa_table=pa_table,
            proto_schema=proto_schema,
            headers=headers,
            timeout=timeout,
            verify=self._sdk_config.request_verify,
            max_chunksize=self._sdk_config.pyarrow_max_chunksize,
            tmp_dir=tmp_dir,
        )

    request_type = FlightRequestType.EVALUATION
    response = None
    with ArizeFlightClient(
        api_key=self._sdk_config.api_key,
        host=self._sdk_config.flight_host,
        port=self._sdk_config.flight_port,
        scheme=self._sdk_config.flight_scheme,
        request_verify=self._sdk_config.request_verify,
        max_chunksize=self._sdk_config.pyarrow_max_chunksize,
    ) as flight_client:
        try:
            response = flight_client.log_arrow_table(
                space_id=space_id,
                project_name=project_name,
                pa_table=pa_table,
                request_type=request_type,
            )
        except Exception as e:
            # Any Flight failure is surfaced as a RuntimeError with the
            # original exception attached as the cause.
            msg = f"Error during update request: {e!s}"
            log.exception(msg)
            raise RuntimeError(msg) from e

    if response is None:
        # This should not happen with proper Flight client implementation,
        # but we handle it defensively
        msg = "No response received from flight server during update"
        log.error(msg)
        raise RuntimeError(msg)

    _log_flight_update_summary(
        project_name=project_name,
        total_spans=len(pa_table),
        request_type=request_type,
        response=response,
    )

    # Convert Protocol Buffer SpanError objects to dictionaries for easier access
    return _message_to_dict(response)
[docs]
def update_annotations(
    self,
    *,
    space_id: str,
    project_name: str,
    dataframe: pd.DataFrame,
    validate: bool = True,
) -> flight_pb2.WriteSpanAnnotationResponse:
    """Logs a pandas dataframe containing LLM span annotations to Arize via a Flight gRPC request.

    The dataframe must contain a column `context.span_id` such that Arize can assign
    each annotation to its respective span. Annotation columns should follow the pattern
    `annotation.<name>.<suffix>` where suffix is `label`, `score`, or `text`. An optional
    `annotation.notes` column can be included for free-form text notes.

    The caller's dataframe is copied and never mutated. For every annotation
    that has a label, score, or text column, missing ``updated_by`` and
    ``updated_at`` columns are generated (``"SDK"`` and the current UTC time
    in milliseconds). Notes are rewritten by ``_format_note_for_storage``.

    Args:
        space_id: The space ID where the project resides.
        project_name: A unique name to identify your project in the Arize platform.
        dataframe (:class:`pandas.DataFrame`): A dataframe containing LLM annotation data.
        validate: When set to True, dataframe form and value validation is run
            before sending data. Argument type validation always runs.
            Defaults to True.

    Returns:
        The Flight response after conversion by ``_message_to_dict``.
        NOTE(review): the declared return annotation names the protobuf
        response type — confirm and align with the converted value.

    Raises:
        MissingSpaceIDError: If space_id is not provided or empty.
        MissingProjectNameError: If project_name is not provided or empty.
        ValidationFailure: If argument type validation fails, or if
            validate=True and the form/value validation checks fail.
        pa.ArrowInvalid: If the dataframe cannot be converted to Arrow format.
        RuntimeError: If the Flight request raises or yields no response.
    """
    from arize.spans.columns import (
        ANNOTATION_COLUMN_PATTERN,
        ANNOTATION_LABEL_SUFFIX,
        ANNOTATION_NOTES_COLUMN_NAME,
        ANNOTATION_SCORE_SUFFIX,
        ANNOTATION_TEXT_SUFFIX,
        ANNOTATION_UPDATED_AT_SUFFIX,
        ANNOTATION_UPDATED_BY_SUFFIX,
        SPAN_SPAN_ID_COL,
    )
    from arize.spans.validation.annotations import annotations_validation

    # This method requires a space_id and project_name
    if not space_id:
        raise MissingSpaceIDError()
    if not project_name:
        raise MissingProjectNameError()

    # Bind common context for this operation. The labels name this method
    # and its annotations frame rather than reusing the ones from `log`.
    log = CtxAdapter(
        logger,
        {
            "resource": "spans",
            "operation": "update_annotations",
            "space_id": space_id,
            "project": project_name,
            "validate": validate,
            "annotations_df_rows": len(dataframe),
        },
    )

    anno_df = dataframe.copy()
    # We expect the index to be 0,1,2,3..., len(df)-1. Phoenix, for example, will give us a dataframe
    # with context_id as the index; the old index is not meaningful in our copy of the original dataframe
    # so we can drop it.
    reset_dataframe_index(dataframe=anno_df)

    log.debug(
        "Checking for and autogenerating missing updated_by/updated_at annotation columns"
    )
    # Extract unique annotation names (e.g., "quality" from "annotation.quality.label")
    name_pattern = re.compile(r"^annotation\.([a-zA-Z0-9_\s]+?)(\..+)$")
    annotation_names = set()
    for col in anno_df.columns:
        # Non-string labels cannot be annotation columns. Skipping them here
        # lets the validation below report them instead of `re` raising a
        # TypeError.
        if not isinstance(col, str):
            continue
        if not re.match(ANNOTATION_COLUMN_PATTERN, col):
            continue
        match = name_pattern.match(col)
        if match:
            annotation_names.add(match.group(1))
    log.debug(f"Found annotation names: {annotation_names}")

    current_time_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
    # Sorted iteration keeps the order of autogenerated columns stable from
    # run to run (plain set iteration order is not).
    for name in sorted(annotation_names):
        updated_by_col = f"annotation.{name}{ANNOTATION_UPDATED_BY_SUFFIX}"
        updated_at_col = f"annotation.{name}{ANNOTATION_UPDATED_AT_SUFFIX}"
        label_col = f"annotation.{name}{ANNOTATION_LABEL_SUFFIX}"
        score_col = f"annotation.{name}{ANNOTATION_SCORE_SUFFIX}"
        text_col = f"annotation.{name}{ANNOTATION_TEXT_SUFFIX}"

        # Check if *any* part of this annotation exists (label, score, or text)
        # Only add metadata if the annotation itself is present
        if (
            label_col in anno_df.columns
            or score_col in anno_df.columns
            or text_col in anno_df.columns
        ):
            if updated_by_col not in anno_df.columns:
                log.debug(f"Autogenerating column: {updated_by_col}")
                anno_df[updated_by_col] = "SDK"
            if updated_at_col not in anno_df.columns:
                log.debug(f"Autogenerating column: {updated_at_col}")
                anno_df[updated_at_col] = current_time_ms
        else:
            log.debug(
                f"Skipping metadata generation for '{name}' as no label, score, or text column found."
            )

    if ANNOTATION_NOTES_COLUMN_NAME in anno_df.columns:
        log.debug(
            f"Formatting {ANNOTATION_NOTES_COLUMN_NAME} column to JSON strings within lists."
        )
        anno_df[ANNOTATION_NOTES_COLUMN_NAME] = anno_df[
            ANNOTATION_NOTES_COLUMN_NAME
        ].apply(
            partial(
                _format_note_for_storage,
                current_time_ms=current_time_ms,
            )
        )

    # Argument type validation runs regardless of `validate`.
    log.debug("Performing direct input type validation for annotations")
    errors = annotations_validation.validate_argument_types(
        annotations_dataframe=anno_df,
        project_name=project_name,
    )
    for e in errors:
        log.error(e)
    if errors:
        raise ValidationFailure(errors)

    if validate:
        log.debug("Performing dataframe form validation for annotations")
        errors = annotations_validation.validate_dataframe_form(
            annotations_dataframe=anno_df
        )
        for e in errors:
            log.error(e)
        if errors:
            raise ValidationFailure(errors)

    log.debug("Removing unnecessary annotation columns")
    # Update columns to keep: span_id, annotation.notes, and annotation pattern
    columns_to_keep = [SPAN_SPAN_ID_COL.name]
    if ANNOTATION_NOTES_COLUMN_NAME in anno_df.columns:
        columns_to_keep.append(ANNOTATION_NOTES_COLUMN_NAME)
    anno_df = remove_extraneous_columns(
        df=anno_df,
        column_list=columns_to_keep,
        regex=ANNOTATION_COLUMN_PATTERN,
    )

    if validate:
        log.debug("Performing annotation values validation")
        errors = annotations_validation.validate_values(
            annotations_dataframe=anno_df,
            project_name=project_name,
        )
        for e in errors:
            log.error(e)
        if errors:
            raise ValidationFailure(errors)

    # Convert to Arrow table
    try:
        log.debug("Converting data to Arrow format")
        pa_table = pa.Table.from_pandas(anno_df, preserve_index=False)
    except pa.ArrowInvalid as e:
        log.exception(INVALID_ARROW_CONVERSION_MSG)
        raise pa.ArrowInvalid(
            f"Error converting to Arrow format: {e!s}"
        ) from e
    except Exception:
        log.exception("Unexpected error creating Arrow table")
        raise

    # Advisory check only: an unexpected notes type is logged, not rejected.
    if ANNOTATION_NOTES_COLUMN_NAME in anno_df.columns:
        notes_field = pa_table.schema.field(ANNOTATION_NOTES_COLUMN_NAME)
        if not (
            isinstance(notes_field.type, pa.ListType)
            and notes_field.type.value_type == pa.string()
        ):
            log.warning(
                f"Warning: Inferred type for {ANNOTATION_NOTES_COLUMN_NAME} is "
                f"{notes_field.type}, expected list<string>."
            )

    request_type = FlightRequestType.ANNOTATION
    response = None
    with ArizeFlightClient(
        api_key=self._sdk_config.api_key,
        host=self._sdk_config.flight_host,
        port=self._sdk_config.flight_port,
        scheme=self._sdk_config.flight_scheme,
        request_verify=self._sdk_config.request_verify,
        max_chunksize=self._sdk_config.pyarrow_max_chunksize,
    ) as flight_client:
        try:
            response = flight_client.log_arrow_table(
                space_id=space_id,
                project_name=project_name,
                pa_table=pa_table,
                request_type=request_type,
            )
        except Exception as e:
            # Any Flight failure is surfaced as a RuntimeError with the
            # original exception attached as the cause.
            msg = f"Error during update request: {e!s}"
            log.exception(msg)
            raise RuntimeError(msg) from e

    if response is None:
        # This should not happen with proper Flight client implementation,
        # but we handle it defensively
        msg = "No response received from flight server during update"
        log.error(msg)
        raise RuntimeError(msg)

    _log_flight_update_summary(
        project_name=project_name,
        total_spans=len(pa_table),
        request_type=request_type,
        response=response,
    )

    # Convert Protocol Buffer SpanError objects to dictionaries for easier access
    return _message_to_dict(response)
[docs]
def export_to_df(
    self,
    *,
    space_id: str,
    project_name: str,
    start_time: datetime,
    end_time: datetime,
    where: str = "",
    columns: builtins.list | None = None,
    stream_chunk_size: int | None = None,
) -> pd.DataFrame:
    """Export span data from Arize to a :class:`pandas.DataFrame`.

    Opens a Flight client from the SDK configuration and delegates to
    :class:`ArizeExportClient`, querying the tracing environment of the
    given project for the requested time range.

    Args:
        space_id: The space ID where the project resides.
        project_name: The name of the project to export span data from
            (passed to the exporter as the model ID).
        start_time: Start of the time range as a datetime object.
        end_time: End of the time range as a datetime object.
        where: Optional SQL-like WHERE clause to filter rows.
        columns: Optional list of column names to include.
        stream_chunk_size: Optional chunk size for streaming large result sets.

    Returns:
        :class:`pandas.DataFrame`: DataFrame containing the requested span data with columns
        for span metadata, attributes, events, and any custom fields.
    """
    config = self._sdk_config
    flight_options = {
        "api_key": config.api_key,
        "host": config.flight_host,
        "port": config.flight_port,
        "scheme": config.flight_scheme,
        "request_verify": config.request_verify,
        "max_chunksize": config.pyarrow_max_chunksize,
    }
    # The Flight client is only needed for the duration of the export.
    with ArizeFlightClient(**flight_options) as client:
        return ArizeExportClient(flight_client=client).export_to_df(
            space_id=space_id,
            model_id=project_name,
            environment=Environments.TRACING,
            start_time=start_time,
            end_time=end_time,
            where=where,
            columns=columns,
            stream_chunk_size=stream_chunk_size,
        )
[docs]
def export_to_parquet(
    self,
    *,
    path: str,
    space_id: str,
    project_name: str,
    start_time: datetime,
    end_time: datetime,
    where: str = "",
    columns: builtins.list | None = None,
    stream_chunk_size: int | None = None,
) -> None:
    """Export span data from Arize to a Parquet file.

    Opens a Flight client from the SDK configuration and delegates to
    :class:`ArizeExportClient`, which retrieves trace/span data for the
    project's tracing environment within the time range and writes it to
    ``path``. Supports filtering with SQL-like WHERE clauses. Ideal for
    large datasets and long-term storage.

    Args:
        path: The file path where the Parquet file will be written.
        space_id: The space ID where the project resides.
        project_name: The name of the project to export span data from
            (passed to the exporter as the model ID).
        start_time: Start of the time range (inclusive) as a datetime object.
        end_time: End of the time range (inclusive) as a datetime object.
        where: Optional SQL-like WHERE clause to filter rows (e.g., "span.status_code = 'ERROR'").
        columns: Optional list of column names to include. If None, all columns are returned.
        stream_chunk_size: Optional chunk size for streaming large result sets.

    Raises:
        RuntimeError: If the Flight client request fails or returns no response.

    Notes:
        - Uses Apache Arrow Flight for efficient data transfer
        - Data is written directly to the specified path as a Parquet file
        - Large exports may benefit from specifying stream_chunk_size
    """
    config = self._sdk_config
    flight_options = {
        "api_key": config.api_key,
        "host": config.flight_host,
        "port": config.flight_port,
        "scheme": config.flight_scheme,
        "request_verify": config.request_verify,
        "max_chunksize": config.pyarrow_max_chunksize,
    }
    # The Flight client is only needed for the duration of the export.
    with ArizeFlightClient(**flight_options) as client:
        ArizeExportClient(flight_client=client).export_to_parquet(
            path=path,
            space_id=space_id,
            model_id=project_name,
            environment=Environments.TRACING,
            start_time=start_time,
            end_time=end_time,
            where=where,
            columns=columns,
            stream_chunk_size=stream_chunk_size,
        )
def _build_patch_document(row: pd.Series) -> dict[str, object]:
    """Build a patch document from the metadata columns of one dataframe row.

    Every index label that starts with ``attributes.metadata.`` contributes
    one entry: the key is the label with that leading prefix removed and the
    value is the row's value for that label, unchanged. Scalars, arrays and
    null values are all kept as-is, so a field can be set to null explicitly.

    Args:
        row: A pandas Series representing a row of data with potential
            metadata columns. Labels that are not strings are ignored.

    Returns:
        dict[str, object]: Mapping of metadata field names (without the
        ``attributes.metadata.`` prefix) to their values. Empty when the
        row has no metadata columns.
    """
    prefix = "attributes.metadata."
    patch: dict[str, object] = {}
    for key in row.index:
        # Non-string labels (e.g. integer column names) cannot be metadata
        # columns; skipping them avoids an AttributeError on ``startswith``.
        if not isinstance(key, str) or not key.startswith(prefix):
            continue
        # Strip only the leading prefix: ``str.replace`` would also delete
        # any later occurrence of the prefix text inside the field name.
        patch[key[len(prefix):]] = row[key]
    return patch
def _process_patch_document(
    metadata_df: pd.DataFrame,
    patch_document_column_name: str,
    field_patches: pd.Series[Any],
    row_idx: int,
) -> dict[str, object]:
    """Combine a row's field-derived patch with its explicit patch document.

    Args:
        metadata_df: DataFrame containing the metadata with patch documents.
        patch_document_column_name: Name of the column containing explicit patch documents.
        field_patches: Series of patches derived from individual metadata
            fields; read by position (``iloc``).
        row_idx: The row to process. Used as a label for ``metadata_df``
            and as a position for ``field_patches``.

    Returns:
        dict[str, object]: Merged patch document where explicit patches take precedence over
        field patches. A missing, unparsable, non-dictionary or unsupported
        explicit document contributes nothing (a warning is logged for the
        last three).
    """
    derived_patch = field_patches.iloc[row_idx]
    raw_doc = metadata_df.loc[row_idx, patch_document_column_name]

    # Start from an empty explicit patch; only a dict, or a JSON string
    # that decodes to a dict, replaces it.
    explicit_patch = {}
    if isinstance(raw_doc, dict):
        explicit_patch = raw_doc
    elif isinstance(raw_doc, str):
        try:
            decoded = json.loads(raw_doc)
        except json.JSONDecodeError as e:
            logger.warning(
                f"Row {row_idx}: Failed to parse patch document: {e}. "
                f"Using empty dictionary instead."
            )
        else:
            if isinstance(decoded, dict):
                explicit_patch = decoded
            else:
                logger.warning(
                    f"Row {row_idx}: Parsed patch document is not a dictionary. "
                    f"Using empty dictionary instead."
                )
    elif raw_doc is not None and not (
        isinstance(raw_doc, float) and np.isnan(raw_doc)
    ):
        # None and NaN are silently treated as "no explicit patch"; any
        # other type is reported.
        logger.warning(
            f"Row {row_idx}: Unsupported patch document type: {type(raw_doc)}. "
            f"Using empty dictionary instead."
        )

    # Later keys win, so the explicit patch overrides field-derived values.
    return {**derived_patch, **explicit_patch}
def _ensure_dict_patch(
    metadata_df: pd.DataFrame,
    final_patch_column: str,
    row_idx: int,
) -> tuple[dict[str, object], list[ValidationError]]:
    """Coerce one row's patch value to a dictionary, decoding JSON strings.

    Args:
        metadata_df: DataFrame containing the patch data.
        final_patch_column: Name of the column containing the final patch document.
        row_idx: The row label to process.

    Returns:
        tuple[dict[str, object], list[ValidationError]]: A tuple containing:
            - The patch as a dictionary (empty dict if invalid or missing)
            - List of validation errors (empty if no errors)
    """
    value = metadata_df.loc[row_idx, final_patch_column]
    problems: list[ValidationError] = []

    # None and float NaN both mean "no patch" and are not errors.
    if value is None or (isinstance(value, float) and np.isnan(value)):
        return {}, problems

    # A dictionary is returned untouched.
    if isinstance(value, dict):
        return value, problems

    # Everything past this point either yields a decoded dictionary or
    # falls through to the single error-reporting tail below.
    if isinstance(value, str):
        try:
            decoded = json.loads(value)
        except json.JSONDecodeError as e:
            error_msg = f"Invalid JSON in patch document: {e}"
        else:
            if isinstance(decoded, dict):
                return decoded, problems
            error_msg = f"JSON must be an object/dictionary, got {type(decoded).__name__}"
    else:
        error_msg = f"Unsupported patch type: {type(value).__name__}"

    logger.warning(f"Row {row_idx}: {error_msg}")
    problems.append(InvalidPatchDocumentFormat(row_idx, error_msg))
    return {}, problems
def _format_note_for_storage(
    note_text: str,
    current_time_ms: int,
) -> list[str] | None:
    """Wrap a note in the single-element JSON list layout used for storage.

    Args:
        note_text: The note content. Converted with ``str()`` before being
            serialized.
        current_time_ms: Timestamp written to the note's ``updated_at`` field.

    Returns:
        list[str] | None: A one-element list holding the JSON-encoded note
            (keys ``text``, ``updated_by``, ``updated_at``), or None when
            ``pd.isna(note_text)`` is true.
    """
    if not pd.isna(note_text):
        serialized = json.dumps(
            {
                "text": str(note_text),
                # Notes written through this helper are always attributed to "SDK".
                "updated_by": "SDK",
                "updated_at": current_time_ms,
            }
        )
        return [serialized]
    return None
def _log_flight_update_summary(
    project_name: str,
    total_spans: int,
    request_type: FlightRequestType,
    response: FlightPostArrowFileResponse,
) -> None:
    """Report the outcome of a Flight update through the module logger.

    Args:
        project_name: Name of the project being updated.
        total_spans: Number of spans that were sent in the update request.
        request_type: The Flight request type; its ``name`` attribute is used
            when present and non-empty, otherwise ``str(request_type)``.
        response: The Flight response. Read via ``getattr`` for
            ``spans_updated`` (falling back to ``records_updated``),
            ``spans_processed`` and ``errors``.

    Notes:
        Emits one summary record carrying the metrics in ``extra``: a warning
        if either count is missing, otherwise an info record saying whether
        every span was processed. Afterwards one error record is emitted per
        entry in the response's ``errors``.
    """
    # Prefer the span-specific counter; fall back to records_updated when absent.
    updated = getattr(response, "spans_updated", None)
    if updated is None:
        updated = getattr(response, "records_updated", None)
    processed = getattr(response, "spans_processed", None)

    # Flatten the reported errors into plain dicts usable as logging extras.
    failures = []
    for item in getattr(response, "errors", None) or ():
        failures.append(
            {"span_id": item.span_id, "error_message": item.error_message}
        )

    # Readable label for the request type.
    request_label = getattr(request_type, "name", None)
    if not request_label:
        request_label = str(request_type)

    # Derived metrics are only computed for a numeric, non-zero processed count.
    rate = None
    failed = None
    if isinstance(processed, (int, float)) and processed:
        updated_count = int(updated or 0)
        processed_count = int(processed)
        rate = round(100.0 * updated_count / processed_count, 2)
        failed = max(processed_count - updated_count, 0)

    summary = {
        "project": project_name,
        "request_type": request_label,
        "total_spans": int(total_spans),
        "spans_processed": processed,
        "spans_updated": updated,
        "spans_failed": failed,
        "success_rate": rate,
        "error_count": len(failures),
    }

    # Exactly one summary record per call.
    if processed is None or updated is None:
        logger.warning("Flight update response missing counts", extra=summary)
    elif int(processed) == int(total_spans):
        logger.info("All spans processed", extra=summary)
    else:
        logger.info("Partial processing", extra=summary)

    # One structured record per failed span.
    shared_fields = {"project": project_name, "request_type": request_label}
    for failure in failures:
        logger.error("Span update error", extra={**shared_fields, **failure})
def _message_to_dict(
    msg: message.Message,
    preserve_names: bool = True,
    use_int_enums: bool = False,
) -> dict[str, object]:
    """Render a protobuf Message as a plain dictionary.

    Thin wrapper around ``json_format.MessageToDict``.

    Args:
        msg: The protobuf Message to convert.
        preserve_names: Forwarded as ``preserving_proto_field_name``. When True
            the original proto field names are kept; when False lowerCamelCase
            names are used. Defaults to True.
        use_int_enums: Forwarded as ``use_integers_for_enums``. When True enum
            values are emitted as integers; when False as enum string names.
            Defaults to False.

    Returns:
        dict[str, object]: Dictionary representation of the protobuf message.
    """
    conversion_options = {
        "preserving_proto_field_name": preserve_names,
        "use_integers_for_enums": use_int_enums,
    }
    return json_format.MessageToDict(msg, **conversion_options)