"""Client implementation for managing spans and traces in the Arize platform."""
from __future__ import annotations
import json
import logging
import re
from datetime import datetime, timezone
from functools import partial
from typing import TYPE_CHECKING, Any
import numpy as np
import pandas as pd
import pyarrow as pa
from google.protobuf import json_format, message
from arize._exporter.client import ArizeExportClient
from arize._flight.client import ArizeFlightClient, FlightPostArrowFileResponse
from arize._flight.types import FlightRequestType
from arize.constants.spans import DEFAULT_DATETIME_FMT
from arize.exceptions.base import (
INVALID_ARROW_CONVERSION_MSG,
ValidationError,
ValidationFailure,
)
from arize.exceptions.models import MissingProjectNameError
from arize.exceptions.spaces import MissingSpaceIDError
from arize.logging import CtxAdapter
from arize.ml.types import Environments
from arize.pre_releases import ReleaseStage, prerelease_endpoint
from arize.spans.validation.metadata.value_validation import (
InvalidPatchDocumentFormat,
)
from arize.utils.arrow import post_arrow_table
from arize.utils.dataframe import (
remove_extraneous_columns,
reset_dataframe_index,
)
from arize.utils.proto import get_pb_schema_tracing
from arize.utils.resolve import _find_project_id
if TYPE_CHECKING:
# builtins is needed to use builtins.list in type annotations because
# the class has a list() method that shadows the built-in list type
import builtins
import requests
from arize._generated.api_client.api_client import ApiClient
from arize._generated.protocol.flight import flight_pb2
from arize.config import SDKConfiguration
from arize.spans.types import (
AnnotateRecordInput,
SpanDeletePartialResponse,
SpanListResponse,
)
logger = logging.getLogger(__name__)
[docs]
class SpansClient:
"""Client for logging LLM tracing spans and evaluations to Arize.
This class is primarily intended for internal use within the SDK. Users are
highly encouraged to access resource-specific functionality via
:class:`arize.ArizeClient`.
"""
def __init__(
self, *, sdk_config: SDKConfiguration, generated_client: ApiClient
) -> None:
"""
Args:
sdk_config: Resolved SDK configuration.
generated_client: Shared generated API client instance.
""" # noqa: D205, D212
self._sdk_config = sdk_config
# Import at runtime so it's still lazy and extras-gated by the parent
from arize._generated import api_client as gen
# Use the provided client directly
self._api = gen.SpansApi(generated_client)
self._projects_api = gen.ProjectsApi(generated_client)
[docs]
@prerelease_endpoint(key="spans.delete", stage=ReleaseStage.ALPHA)
def delete(
self,
*,
project: str,
span_ids: builtins.list[str],
space: str | None = None,
) -> SpanDeletePartialResponse | None:
"""Permanently delete spans by their IDs.
This operation is irreversible. Only spans within the supported
lookback window (2 years) are considered; older spans are not affected. If one
or more span IDs are not found, they are silently ignored.
Args:
project: Project name or identifier (base64) containing the spans.
If the value is a name, ``space`` must also be provided.
span_ids: List of span IDs to delete.
space: Optional space name or ID used to disambiguate the project
lookup. Required when ``project`` is a name.
Returns:
``None`` when all spans were deleted (HTTP 204). A response
object with ``deleted_span_ids`` when the server reports a
partial deletion (HTTP 200) — retry the original request for
a complete result.
Raises:
ValueError: If ``span_ids`` is empty.
ApiException: If the REST API returns an error response
(e.g. 401/403/429).
"""
if not span_ids:
raise ValueError("span_ids must not be empty")
project_id = _find_project_id(
api=self._projects_api,
project=project,
space=space,
)
from arize._generated import api_client as gen
body = gen.DeleteSpansRequest(
project_id=project_id,
span_ids=span_ids,
)
return self._api.spans_delete(delete_spans_request=body)
[docs]
@prerelease_endpoint(key="spans.list", stage=ReleaseStage.ALPHA)
def list(
self,
*,
project: str,
space: str | None = None,
start_time: datetime | None = None,
end_time: datetime | None = None,
filter: str | None = None,
limit: int = 100,
cursor: str | None = None,
) -> SpanListResponse:
"""List spans for a project within a time range.
Spans are returned in descending start-time order (most recent first).
If ``start_time`` and ``end_time`` are not provided, the query covers the
last seven days relative to the time of the request.
Args:
project: Project name or identifier (base64) to list spans for.
If the value is a name, ``space`` must also be provided.
space: Optional space name or ID used to disambiguate the project
lookup. Required when ``project`` is a name.
start_time: Inclusive lower bound of the time window. Defaults to
seven days before the request time.
end_time: Exclusive upper bound of the time window. Defaults to the
request time.
filter: Optional filter expression to narrow results. Supports
equality, comparison, and SQL-style ``AND``/``OR`` operators.
Examples::
"status_code = 'ERROR'"
"eval.Custom_eval_correctness.label = 'correct'"
"annotation.Correctness.label = 'Correct'"
"latency_ms > 1789"
"status_code = 'ERROR' AND eval.Custom_eval_correctness.label = 'correct'"
"status_code = 'ERROR' OR eval.Custom_eval_correctness.label = 'correct'"
limit: Maximum number of spans to return. The server enforces an
upper bound. Defaults to 100.
cursor: Opaque pagination cursor returned from a previous response.
Returns:
A response object with the spans and pagination information.
Raises:
ApiException: If the REST API
returns an error response (e.g. 401/403/429).
"""
project_id = _find_project_id(
api=self._projects_api,
project=project,
space=space,
)
logger.warning(
"The spans.list endpoint is currently in active development and may "
"not be suitable to download large amounts of spans. Use `export_to_df` instead."
)
from arize._generated import api_client as gen
body = gen.ListSpansRequest(
project_id=project_id,
start_time=start_time,
end_time=end_time,
filter=filter,
)
return self._api.spans_list(
list_spans_request=body,
limit=limit,
cursor=cursor,
)
[docs]
@prerelease_endpoint(key="spans.annotate", stage=ReleaseStage.ALPHA)
def annotate_spans(
self,
*,
project: str,
space: str | None = None,
annotations: builtins.list[AnnotateRecordInput],
start_time: datetime | None = None,
end_time: datetime | None = None,
) -> None:
"""Write human annotations to a batch of spans in a project.
Annotations are upserted by annotation config name for each span.
Submitting the same annotation config name for the same span
overwrites the previous value. Retrying on network failure will
not create duplicates.
Up to 1000 spans may be annotated per request. Spans are looked up
within the specified time window (defaulting to the last 31 days).
If any span ID in the batch is not found within the window, the
entire request is rejected with a 404 error.
The write completes synchronously before the function returns. Visibility
in read queries may lag by a short interval (HTTP 202 Accepted).
Args:
project: Project ID or name.
space: Space ID or name. Required when *project* is a name.
annotations: A list of :class:`AnnotateRecordInput` items. Each item
must include a ``record_id`` (the span ID) and ``values``
(a list of :class:`AnnotationInput` items with ``name``, and
optionally ``score``, ``label``, or ``text``).
start_time: Start of the time window used to look up spans.
Defaults to 31 days before the request time.
end_time: End of the time window used to look up spans.
Defaults to the request time.
Raises:
ApiException: If the REST API returns an error response
(e.g. 400/401/403/404/429).
"""
project_id = _find_project_id(
api=self._projects_api,
project=project,
space=space,
)
from arize._generated import api_client as gen
body = gen.AnnotateSpansRequestBody(
project_id=project_id,
annotations=annotations,
start_time=start_time,
end_time=end_time,
)
return self._api.spans_annotate(annotate_spans_request_body=body)
[docs]
def log(
self,
*,
space_id: str,
project_name: str,
dataframe: pd.DataFrame,
evals_dataframe: pd.DataFrame | None = None,
datetime_format: str = DEFAULT_DATETIME_FMT,
validate: bool = True,
timeout: float | None = None,
tmp_dir: str = "",
) -> requests.Response:
"""Logs a pandas dataframe containing LLM tracing data to Arize via a POST request.
Returns a :class:`Response` object from the Requests HTTP library to ensure
successful delivery of records.
Args:
space_id: The space ID where the project resides.
project_name: A unique name to identify your project in the Arize platform.
dataframe (:class:`pandas.DataFrame`): The dataframe containing the LLM traces.
evals_dataframe (:class:`pandas.DataFrame` | :obj:`None`): A dataframe containing
LLM evaluations data. The evaluations are joined to their corresponding spans
via a left outer join, i.e., using only `context.span_id` from the spans
dataframe. Defaults to None.
datetime_format: format for the timestamp captured in the LLM traces.
Defaults to "%Y-%m-%dT%H:%M:%S.%f+00:00".
validate: When set to True, validation is run before sending data.
Defaults to True.
timeout: You can stop waiting for a response after a given number
of seconds with the timeout parameter. Defaults to None.
tmp_dir: Temporary directory/file to store the serialized data in binary
before sending to Arize.
Returns:
Response object from the HTTP request (only returned on HTTP 2xx).
Raises:
MissingSpaceIDError: If space_id is not provided or empty.
MissingProjectNameError: If project_name is not provided or empty.
ValidationFailure: If validate=True and validation checks fail.
pa.ArrowInvalid: If the dataframe cannot be converted to Arrow format.
AuthenticationError: If the server returns HTTP 401 or 403 (invalid API key or
space ID). Raised immediately to prevent further uploads with bad credentials.
APIError: If the server returns any other non-2xx response (e.g. 400, 422, 429,
5xx). Raised immediately to prevent further uploads when the server signals
an error.
"""
from arize.spans.columns import (
EVAL_COLUMN_PATTERN,
ROOT_LEVEL_SPAN_KIND_COL,
SPAN_KIND_COL,
SPAN_OPENINFERENCE_COLUMNS,
SPAN_SPAN_ID_COL,
)
from arize.spans.conversion import (
convert_timestamps,
jsonify_dictionaries,
)
from arize.spans.validation.evals import evals_validation
from arize.spans.validation.spans import spans_validation
# This method requires a space_id and project_name
if not space_id:
raise MissingSpaceIDError()
if not project_name:
raise MissingProjectNameError()
# We need our own copy since we will manipulate the underlying data and
# do not want side effects
spans_df = dataframe.copy()
evals_df = (
evals_dataframe.copy() if evals_dataframe is not None else None
)
# Bind common context for this operation
log = CtxAdapter(
logger,
{
"resource": "spans",
"operation": "log",
"space_id": space_id,
"project": project_name,
"validate": validate,
"spans_df_rows": len(spans_df),
"evals_df_rows": len(evals_df) if evals_df is not None else 0,
},
)
# We expect the index to be 0,1,2,3..., len(df)-1. Phoenix, for example, will give us a dataframe
# with context_id as the index
reset_dataframe_index(dataframe=spans_df)
if evals_df is not None:
reset_dataframe_index(dataframe=evals_df)
log.debug("Performing direct input type validation")
errors = spans_validation.validate_argument_types(
spans_dataframe=spans_df,
project_name=project_name,
dt_fmt=datetime_format,
)
if evals_df is not None:
eval_errors = evals_validation.validate_argument_types(
evals_dataframe=evals_df,
project_name=project_name,
)
errors += eval_errors
for e in errors:
log.error(e)
if errors:
raise ValidationFailure(errors)
if validate:
log.debug("Performing dataframe form validation")
errors = spans_validation.validate_dataframe_form(
spans_dataframe=spans_df
)
if evals_df is not None:
eval_errors = evals_validation.validate_dataframe_form(
evals_dataframe=evals_df
)
errors += eval_errors
for e in errors:
log.error(e)
if errors:
raise ValidationFailure(errors)
log.debug("Removing unnecessary columns")
spans_df = remove_extraneous_columns(
df=spans_df,
column_list=[col.name for col in SPAN_OPENINFERENCE_COLUMNS],
regex=r"^attributes\.",
)
if evals_df is not None:
evals_df = remove_extraneous_columns(
df=evals_df,
column_list=[SPAN_SPAN_ID_COL.name],
regex=EVAL_COLUMN_PATTERN,
)
log.debug("Converting timestamps")
spans_df = convert_timestamps(df=spans_df, fmt=datetime_format)
if validate:
log.debug("Performing values validation")
errors = spans_validation.validate_values(
spans_dataframe=spans_df,
project_name=project_name,
)
if evals_df is not None:
eval_errors = evals_validation.validate_values(
evals_dataframe=evals_df,
project_name=project_name,
)
errors += eval_errors
for e in errors:
log.error(e)
if errors:
raise ValidationFailure(errors)
log.debug("Converting dictionaries to JSON objects")
spans_df = jsonify_dictionaries(spans_df)
if (
ROOT_LEVEL_SPAN_KIND_COL.name in spans_df.columns
and SPAN_KIND_COL.name not in spans_df.columns
):
log.debug("Moving span kind to atributes")
spans_df.rename(
columns={ROOT_LEVEL_SPAN_KIND_COL.name: SPAN_KIND_COL.name},
inplace=True,
)
df = (
pd.merge(spans_df, evals_df, on=SPAN_SPAN_ID_COL.name, how="left")
if evals_df is not None
else spans_df
)
# Convert to Arrow table
try:
log.debug("Converting data to Arrow format")
pa_table = pa.Table.from_pandas(df, preserve_index=False)
except pa.ArrowInvalid as e:
log.exception(INVALID_ARROW_CONVERSION_MSG)
raise pa.ArrowInvalid(
f"Error converting to Arrow format: {e!s}"
) from e
except Exception:
log.exception("Unexpected error creating Arrow table")
raise
proto_schema = get_pb_schema_tracing(project_name=project_name)
# Create headers copy for the spans client
# Safe to mutate, returns a deep copy
headers = self._sdk_config.headers
# Send the number of rows in the dataframe as a header
# This helps the Arize server to return appropriate feedback, specially for async logging
headers.update(
{
"arize-space-id": space_id,
"arize-interface": "batch",
"number-of-rows": str(len(spans_df)),
}
)
return post_arrow_table(
files_url=self._sdk_config.files_url,
pa_table=pa_table,
proto_schema=proto_schema,
headers=headers,
timeout=timeout,
verify=self._sdk_config.request_verify,
max_chunksize=self._sdk_config.pyarrow_max_chunksize,
tmp_dir=tmp_dir,
)
[docs]
def update_evaluations(
self,
*,
space_id: str,
project_name: str,
dataframe: pd.DataFrame,
validate: bool = True,
force_http: bool = False,
timeout: float | None = None,
tmp_dir: str = "",
) -> flight_pb2.WriteSpanEvaluationResponse:
"""Logs a pandas dataframe containing LLM evaluations data to Arize via a Flight gRPC request.
The dataframe must contain a column `context.span_id` such that Arize can assign
each evaluation to its respective span.
Args:
space_id: The space ID where the project resides.
project_name: A unique name to identify your project in the Arize platform.
dataframe (:class:`pandas.DataFrame`): A dataframe containing LLM evaluations data.
validate: When set to True, validation is run before sending data.
Defaults to True.
force_http: Force the use of HTTP for data upload. Defaults to False.
timeout: You can stop waiting for a response after a given number
of seconds with the timeout parameter. Defaults to None.
tmp_dir: Temporary directory/file to store the serialized data in binary
before sending to Arize.
Raises:
MissingSpaceIDError: If space_id is not provided or empty.
MissingProjectNameError: If project_name is not provided or empty.
ValidationFailure: If validate=True and validation checks fail.
pa.ArrowInvalid: If the dataframe cannot be converted to Arrow format.
AuthenticationError: If the server returns HTTP 401 or 403.
Raised immediately to prevent further uploads with bad credentials.
APIError: If the server returns any other non-2xx response.
Raised immediately to prevent further uploads when the server signals an error.
"""
from arize.spans.columns import EVAL_COLUMN_PATTERN, SPAN_SPAN_ID_COL
from arize.spans.validation.evals import evals_validation
# This method requires a space_id and project_name
if not space_id:
raise MissingSpaceIDError()
if not project_name:
raise MissingProjectNameError()
# Bind common context for this operation
log = CtxAdapter(
logger,
{
"resource": "spans",
"operation": "log",
"space_id": space_id,
"project": project_name,
"validate": validate,
"evals_df_rows": len(dataframe),
},
)
# We need our own copy since we will manipulate the underlying data and
# do not want side effects
evals_df = dataframe.copy()
# We expect the index to be 0,1,2,3..., len(df)-1. Phoenix, for example, will give us a dataframe
# with context_id as the index; the old index is not meaningful in our copy of the original dataframe
# so we can drop it.
reset_dataframe_index(dataframe=evals_df)
log.debug("Performing direct input type validation")
errors = evals_validation.validate_argument_types(
evals_dataframe=evals_df,
project_name=project_name,
)
for e in errors:
log.error(e)
if errors:
raise ValidationFailure(errors)
if validate:
log.debug("Performing dataframe form validation")
errors = evals_validation.validate_dataframe_form(
evals_dataframe=evals_df
)
for e in errors:
log.error(e)
if errors:
raise ValidationFailure(errors)
log.debug("Removing unnecessary columns")
evals_df = remove_extraneous_columns(
df=evals_df,
column_list=[SPAN_SPAN_ID_COL.name],
regex=EVAL_COLUMN_PATTERN,
)
if validate:
log.debug("Performing values validation")
errors = evals_validation.validate_values(
evals_dataframe=evals_df,
project_name=project_name,
)
for e in errors:
log.error(e)
if errors:
raise ValidationFailure(errors)
# Convert to Arrow table
try:
log.debug("Converting data to Arrow format")
pa_table = pa.Table.from_pandas(evals_df, preserve_index=False)
except pa.ArrowInvalid as e:
log.exception(INVALID_ARROW_CONVERSION_MSG)
raise pa.ArrowInvalid(
f"Error converting to Arrow format: {e!s}"
) from e
except Exception:
log.exception("Unexpected error creating Arrow table")
raise
if force_http:
proto_schema = get_pb_schema_tracing(project_name=project_name)
# Create headers copy for the spans client
# Safe to mutate, returns a deep copy
headers = self._sdk_config.headers
# Send the number of rows in the dataframe as a header
# This helps the Arize server to return appropriate feedback, specially for async logging
headers.update(
{
"arize-space-id": space_id,
"arize-interface": "batch",
"number-of-rows": str(len(dataframe)),
}
)
return post_arrow_table(
files_url=self._sdk_config.files_url,
pa_table=pa_table,
proto_schema=proto_schema,
headers=headers,
timeout=timeout,
verify=self._sdk_config.request_verify,
max_chunksize=self._sdk_config.pyarrow_max_chunksize,
tmp_dir=tmp_dir,
)
request_type = FlightRequestType.EVALUATION
response = None
with ArizeFlightClient(
api_key=self._sdk_config.api_key,
host=self._sdk_config.flight_host,
port=self._sdk_config.flight_port,
scheme=self._sdk_config.flight_scheme,
request_verify=self._sdk_config.request_verify,
max_chunksize=self._sdk_config.pyarrow_max_chunksize,
) as flight_client:
try:
response = flight_client.log_arrow_table(
space_id=space_id,
project_name=project_name,
pa_table=pa_table,
request_type=request_type,
)
except Exception as e:
msg = f"Error during update request: {e!s}"
log.exception(msg)
raise RuntimeError(msg) from e
if response is None:
# This should not happen with proper Flight client implementation,
# but we handle it defensively
msg = "No response received from flight server during update"
log.error(msg)
raise RuntimeError(msg)
_log_flight_update_summary(
project_name=project_name,
total_spans=len(pa_table),
request_type=request_type,
response=response,
)
# Convert Protocol Buffer SpanError objects to dictionaries for easier access
return _message_to_dict(response)
[docs]
def update_annotations(
self,
*,
space_id: str,
project_name: str,
dataframe: pd.DataFrame,
validate: bool = True,
) -> flight_pb2.WriteSpanAnnotationResponse:
"""Logs a pandas dataframe containing LLM span annotations to Arize via a Flight gRPC request.
The dataframe must contain a column `context.span_id` such that Arize can assign
each annotation to its respective span. Annotation columns should follow the pattern
`annotation.<name>.<suffix>` where suffix is `label`, `score`, or `text`. An optional
`annotation.notes` column can be included for free-form text notes.
Args:
space_id: The space ID where the project resides.
project_name: A unique name to identify your project in the Arize platform.
dataframe (:class:`pandas.DataFrame`): A dataframe containing LLM annotation data.
validate: When set to True, validation is run before sending data.
Defaults to True.
"""
from arize.spans.columns import (
ANNOTATION_COLUMN_PATTERN,
ANNOTATION_LABEL_SUFFIX,
ANNOTATION_NOTES_COLUMN_NAME,
ANNOTATION_SCORE_SUFFIX,
ANNOTATION_TEXT_SUFFIX,
ANNOTATION_UPDATED_AT_SUFFIX,
ANNOTATION_UPDATED_BY_SUFFIX,
SPAN_SPAN_ID_COL,
)
from arize.spans.validation.annotations import annotations_validation
# This method requires a space_id and project_name
if not space_id:
raise MissingSpaceIDError()
if not project_name:
raise MissingProjectNameError()
# Bind common context for this operation
log = CtxAdapter(
logger,
{
"resource": "spans",
"operation": "log",
"space_id": space_id,
"project": project_name,
"validate": validate,
"evals_df_rows": len(dataframe),
},
)
anno_df = dataframe.copy()
# We expect the index to be 0,1,2,3..., len(df)-1. Phoenix, for example, will give us a dataframe
# with context_id as the index; the old index is not meaningful in our copy of the original dataframe
# so we can drop it.
reset_dataframe_index(dataframe=anno_df)
log.debug(
"Checking for and autogenerating missing updated_by/updated_at annotation columns"
)
annotation_cols = [
col
for col in anno_df.columns
if re.match(ANNOTATION_COLUMN_PATTERN, col)
]
annotation_names = set()
# Extract unique annotation names (e.g., "quality" from "annotation.quality.label")
for col in annotation_cols:
match = re.match(r"^annotation\.([a-zA-Z0-9_\s]+?)(\..+)$", col)
if match:
annotation_names.add(match.group(1))
log.debug(f"Found annotation names: {annotation_names}")
current_time_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
for name in annotation_names:
updated_by_col = f"annotation.{name}{ANNOTATION_UPDATED_BY_SUFFIX}"
updated_at_col = f"annotation.{name}{ANNOTATION_UPDATED_AT_SUFFIX}"
label_col = f"annotation.{name}{ANNOTATION_LABEL_SUFFIX}"
score_col = f"annotation.{name}{ANNOTATION_SCORE_SUFFIX}"
text_col = f"annotation.{name}{ANNOTATION_TEXT_SUFFIX}"
# Check if *any* part of this annotation exists (label, score, or text)
# Only add metadata if the annotation itself is present
if (
label_col in anno_df.columns
or score_col in anno_df.columns
or text_col in anno_df.columns
):
if updated_by_col not in anno_df.columns:
log.debug(f"Autogenerating column: {updated_by_col}")
anno_df[updated_by_col] = "SDK"
if updated_at_col not in anno_df.columns:
log.debug(f"Autogenerating column: {updated_at_col}")
anno_df[updated_at_col] = current_time_ms
else:
log.debug(
f"Skipping metadata generation for '{name}' as no label, score, or text column found."
)
if ANNOTATION_NOTES_COLUMN_NAME in anno_df.columns:
log.debug(
f"Formatting {ANNOTATION_NOTES_COLUMN_NAME} column to JSON strings within lists."
)
anno_df[ANNOTATION_NOTES_COLUMN_NAME] = anno_df[
ANNOTATION_NOTES_COLUMN_NAME
].apply(
partial(
_format_note_for_storage,
current_time_ms=current_time_ms,
)
)
log.debug("Performing direct input type validation for annotations")
errors = annotations_validation.validate_argument_types(
annotations_dataframe=anno_df,
project_name=project_name,
)
for e in errors:
log.error(e)
if errors:
raise ValidationFailure(errors)
if validate:
log.debug("Performing dataframe form validation for annotations")
errors = annotations_validation.validate_dataframe_form(
annotations_dataframe=anno_df
)
for e in errors:
log.error(e)
if errors:
raise ValidationFailure(errors)
log.debug("Removing unnecessary annotation columns")
# Update columns to keep: span_id, annotation.notes, and annotation pattern
columns_to_keep = [SPAN_SPAN_ID_COL.name]
if ANNOTATION_NOTES_COLUMN_NAME in anno_df.columns:
columns_to_keep.append(ANNOTATION_NOTES_COLUMN_NAME)
anno_df = remove_extraneous_columns(
df=anno_df,
column_list=columns_to_keep,
regex=ANNOTATION_COLUMN_PATTERN,
)
if validate:
log.debug("Performing annotation values validation")
errors = annotations_validation.validate_values(
annotations_dataframe=anno_df,
project_name=project_name,
)
for e in errors:
log.error(e)
if errors:
raise ValidationFailure(errors)
# Convert to Arrow table
try:
log.debug("Converting data to Arrow format")
pa_table = pa.Table.from_pandas(anno_df, preserve_index=False)
except pa.ArrowInvalid as e:
log.exception(INVALID_ARROW_CONVERSION_MSG)
raise pa.ArrowInvalid(
f"Error converting to Arrow format: {e!s}"
) from e
except Exception:
log.exception("Unexpected error creating Arrow table")
raise
if ANNOTATION_NOTES_COLUMN_NAME in anno_df.columns:
notes_field = pa_table.schema.field(ANNOTATION_NOTES_COLUMN_NAME)
if not (
isinstance(notes_field.type, pa.ListType)
and notes_field.type.value_type == pa.string()
):
log.warning(
f"Warning: Inferred type for {ANNOTATION_NOTES_COLUMN_NAME} is "
f"{notes_field.type}, expected list<string>."
)
request_type = FlightRequestType.ANNOTATION
response = None
with ArizeFlightClient(
api_key=self._sdk_config.api_key,
host=self._sdk_config.flight_host,
port=self._sdk_config.flight_port,
scheme=self._sdk_config.flight_scheme,
request_verify=self._sdk_config.request_verify,
max_chunksize=self._sdk_config.pyarrow_max_chunksize,
) as flight_client:
try:
response = flight_client.log_arrow_table(
space_id=space_id,
project_name=project_name,
pa_table=pa_table,
request_type=request_type,
)
except Exception as e:
msg = f"Error during update request: {e!s}"
log.exception(msg)
raise RuntimeError(msg) from e
if response is None:
# This should not happen with proper Flight client implementation,
# but we handle it defensively
msg = "No response received from flight server during update"
log.error(msg)
raise RuntimeError(msg)
_log_flight_update_summary(
project_name=project_name,
total_spans=len(pa_table),
request_type=request_type,
response=response,
)
# Convert Protocol Buffer SpanError objects to dictionaries for easier access
return _message_to_dict(response)
[docs]
def export_to_df(
self,
*,
space_id: str,
project_name: str,
start_time: datetime,
end_time: datetime,
where: str = "",
columns: builtins.list | None = None,
stream_chunk_size: int | None = None,
) -> pd.DataFrame:
"""Export span data from Arize to a :class:`pandas.DataFrame`.
Retrieves trace/span data from the specified project within a time range
and returns it as a :class:`pandas.DataFrame`. Supports filtering with SQL-like
WHERE clauses and similarity search for semantic retrieval.
Returns:
:class:`pandas.DataFrame`: DataFrame containing the requested span data with columns
for span metadata, attributes, events, and any custom fields.
"""
with ArizeFlightClient(
api_key=self._sdk_config.api_key,
host=self._sdk_config.flight_host,
port=self._sdk_config.flight_port,
scheme=self._sdk_config.flight_scheme,
request_verify=self._sdk_config.request_verify,
max_chunksize=self._sdk_config.pyarrow_max_chunksize,
) as flight_client:
exporter = ArizeExportClient(
flight_client=flight_client,
)
return exporter.export_to_df(
space_id=space_id,
model_id=project_name,
environment=Environments.TRACING,
start_time=start_time,
end_time=end_time,
where=where,
columns=columns,
stream_chunk_size=stream_chunk_size,
)
[docs]
def export_to_parquet(
self,
*,
path: str,
space_id: str,
project_name: str,
start_time: datetime,
end_time: datetime,
where: str = "",
columns: builtins.list | None = None,
stream_chunk_size: int | None = None,
) -> None:
"""Export span data from Arize to a Parquet file.
Retrieves trace/span data from the specified project within a time range
and writes it directly to a Parquet file at the specified path. Supports
filtering with SQL-like WHERE clauses for efficient querying. Ideal for
large datasets and long-term storage.
Args:
path: The file path where the Parquet file will be written.
space_id: The space ID where the project resides.
project_name: The name of the project to export span data from.
start_time: Start of the time range (inclusive) as a datetime object.
end_time: End of the time range (inclusive) as a datetime object.
where: Optional SQL-like WHERE clause to filter rows (e.g., "span.status_code = 'ERROR'").
columns: Optional list of column names to include. If None, all columns are returned.
stream_chunk_size: Optional chunk size for streaming large result sets.
Raises:
RuntimeError: If the Flight client request fails or returns no response.
Notes:
- Uses Apache Arrow Flight for efficient data transfer
- Data is written directly to the specified path as a Parquet file
- Large exports may benefit from specifying stream_chunk_size
"""
with ArizeFlightClient(
api_key=self._sdk_config.api_key,
host=self._sdk_config.flight_host,
port=self._sdk_config.flight_port,
scheme=self._sdk_config.flight_scheme,
request_verify=self._sdk_config.request_verify,
max_chunksize=self._sdk_config.pyarrow_max_chunksize,
) as flight_client:
exporter = ArizeExportClient(
flight_client=flight_client,
)
exporter.export_to_parquet(
path=path,
space_id=space_id,
model_id=project_name,
environment=Environments.TRACING,
start_time=start_time,
end_time=end_time,
where=where,
columns=columns,
stream_chunk_size=stream_chunk_size,
)
def _build_patch_document(row: pd.Series) -> dict[str, object]:
"""Build a patch document from a pandas Series row by extracting metadata fields.
Args:
row: A pandas Series representing a row of data with potential metadata columns.
Returns:
dict[str, object]: A dictionary mapping metadata field names (without the
'attributes.metadata.' prefix) to their values, preserving arrays and scalars.
"""
# Extract and preserve metadata values with proper types
patch = {}
for key in row.index:
if key.startswith("attributes.metadata."):
field_name = key.replace("attributes.metadata.", "")
# Check if the value is an array/list or other iterable (except strings)
if isinstance(row[key], (list, np.ndarray)) or (
hasattr(row[key], "__iter__") and not isinstance(row[key], str)
):
# For arrays/iterables, just add the value (nulls will be handled later)
patch[field_name] = row[key]
else:
# For scalar values, include even if it's None/null
# This is important for explicitly setting fields to null
patch[field_name] = row[key]
return patch
def _process_patch_document(
metadata_df: pd.DataFrame,
patch_document_column_name: str,
field_patches: pd.Series[Any],
row_idx: int,
) -> dict[str, object]:
"""Process and merge patch documents from field patches and explicit patch column.
Args:
metadata_df: DataFrame containing the metadata with patch documents.
patch_document_column_name: Name of the column containing explicit patch documents.
field_patches: DataFrame containing patches derived from individual metadata fields.
row_idx: The row index to process.
Returns:
dict[str, object]: Merged patch document where explicit patches take precedence over
field patches. Returns empty dict if patch document is invalid or missing.
"""
# Get the field patch for this row
field_patch = field_patches.iloc[row_idx]
# Get and process the explicit patch document
patch_doc = metadata_df.loc[row_idx, patch_document_column_name]
# Handle different patch document formats
if patch_doc is None:
# None (as opposed to NaN) is a valid value but creates an empty patch
explicit_patch = {}
elif isinstance(patch_doc, float) and np.isnan(patch_doc):
# NaN is treated as an empty patch
explicit_patch = {}
elif isinstance(patch_doc, dict):
# Dict is used directly
explicit_patch = patch_doc
elif isinstance(patch_doc, str):
try:
explicit_patch = json.loads(patch_doc)
if not isinstance(explicit_patch, dict):
logger.warning(
f"Row {row_idx}: Parsed patch document is not a dictionary. "
f"Using empty dictionary instead."
)
explicit_patch = {}
except json.JSONDecodeError as e:
logger.warning(
f"Row {row_idx}: Failed to parse patch document: {e}. "
f"Using empty dictionary instead."
)
explicit_patch = {}
else:
logger.warning(
f"Row {row_idx}: Unsupported patch document type: {type(patch_doc)}. "
f"Using empty dictionary instead."
)
explicit_patch = {}
# Merge patches - explicit patch takes precedence
return {**field_patch, **explicit_patch}
def _ensure_dict_patch(
metadata_df: pd.DataFrame,
final_patch_column: str,
row_idx: int,
) -> tuple[dict[str, object], list[ValidationError]]:
"""Ensure a patch value is a dictionary, converting from JSON string if needed.
Args:
metadata_df: DataFrame containing the patch data.
final_patch_column: Name of the column containing the final patch document.
row_idx: The row index to process.
Returns:
tuple[dict[str, object], list[ValidationError]]: A tuple containing:
- The patch as a dictionary (empty dict if invalid or missing)
- List of validation errors (empty if no errors)
"""
patch = metadata_df.loc[row_idx, final_patch_column]
validation_errors: list[ValidationError] = []
# For None/null values, return an empty dict
if patch is None:
return {}, validation_errors
# Handle NaN differently from None
if isinstance(patch, float) and np.isnan(patch):
return {}, validation_errors
# If already a dict, return as is
if isinstance(patch, dict):
return patch, validation_errors
# If string, try to parse as JSON
if isinstance(patch, str):
try:
parsed = json.loads(patch)
if isinstance(parsed, dict):
return parsed, validation_errors
except json.JSONDecodeError as e:
error_msg = f"Invalid JSON in patch document: {e}"
logger.warning(f"Row {row_idx}: {error_msg}")
validation_errors.append(
InvalidPatchDocumentFormat(row_idx, error_msg)
)
return {}, validation_errors # if not validate else None
else:
error_msg = f"JSON must be an object/dictionary, got {type(parsed).__name__}"
logger.warning(f"Row {row_idx}: {error_msg}")
validation_errors.append(
InvalidPatchDocumentFormat(row_idx, error_msg)
)
return {}, validation_errors # if not validate else None
# For other types, log warning
error_msg = f"Unsupported patch type: {type(patch).__name__}"
logger.warning(f"Row {row_idx}: {error_msg}")
validation_errors.append(InvalidPatchDocumentFormat(row_idx, error_msg))
return {}, validation_errors # if not validate else None
def _format_note_for_storage(
note_text: str,
current_time_ms: int,
) -> list[str] | None:
"""Format a note text into a JSON-serialized list for storage.
Args:
note_text: The note text content to format.
current_time_ms: The current timestamp in milliseconds.
Returns:
list[str] | None: A list containing a single JSON string with note metadata
(text, updated_by, updated_at), or None if note_text is NaN/missing.
"""
if pd.isna(note_text):
return None
note_obj = {
"text": str(note_text),
"updated_by": "SDK",
"updated_at": current_time_ms,
}
return [json.dumps(note_obj)]
def _log_flight_update_summary(
project_name: str,
total_spans: int,
request_type: FlightRequestType,
response: FlightPostArrowFileResponse,
) -> None:
"""Log a structured summary of Flight update results including metrics and errors.
Args:
project_name: Name of the project being updated.
total_spans: Total number of spans in the update request.
request_type: The type of Flight request being performed.
response: The Flight response object containing update results and errors.
Notes:
Logs one summary line with aggregated metrics, plus individual error lines
for any failed span updates. Metrics include success rate, spans processed,
and failure counts.
"""
spans_updated = getattr(response, "spans_updated", None)
if spans_updated is None:
# Fallback for older response types
spans_updated = getattr(response, "records_updated", None)
spans_processed = getattr(response, "spans_processed", None)
raw_errors = getattr(response, "errors", None)
errors = (
[
{"span_id": e.span_id, "error_message": e.error_message}
for e in raw_errors
]
if raw_errors
else []
)
# Normalize request_type to a readable string
req_type_str = getattr(request_type, "name", None) or str(request_type)
# Compute metrics safely
success_rate = None
spans_failed = None
if isinstance(spans_processed, (int, float)) and spans_processed:
su = int(spans_updated or 0)
sp = int(spans_processed)
success_rate = round(100.0 * su / sp, 2)
spans_failed = max(sp - su, 0)
metrics = {
"project": project_name,
"request_type": req_type_str,
"total_spans": int(total_spans),
"spans_processed": spans_processed,
"spans_updated": spans_updated,
"spans_failed": spans_failed,
"success_rate": success_rate,
"error_count": len(errors),
}
# One summary log line (great for JSON pipelines, readable in pretty mode)
if spans_processed is None or spans_updated is None:
logger.warning("Flight update response missing counts", extra=metrics)
else:
all_processed = int(spans_processed) == int(total_spans)
msg = "All spans processed" if all_processed else "Partial processing"
logger.info(msg, extra=metrics)
# Emit individual error lines (structured per-error, easy to aggregate)
for err in errors:
logger.error(
"Span update error",
extra={
"project": project_name,
"request_type": req_type_str,
**err,
},
)
def _message_to_dict(
msg: message.Message,
preserve_names: bool = True,
use_int_enums: bool = False,
) -> dict[str, object]:
"""Convert a protobuf Message to a dictionary representation.
Args:
msg: The protobuf Message to convert.
preserve_names: If True, preserve original proto field names. If False, use
lowerCamelCase names. Defaults to True.
use_int_enums: If True, represent enum values as integers. If False, use
enum string names. Defaults to False.
Returns:
dict[str, object]: Dictionary representation of the protobuf message.
"""
return json_format.MessageToDict(
msg,
preserving_proto_field_name=preserve_names,
use_integers_for_enums=use_int_enums,
)