"""Client implementation for managing experiments in the Arize platform."""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, cast
import opentelemetry.sdk.trace as trace_sdk
import pandas as pd
import pyarrow as pa
from openinference.semconv.resource import ResourceAttributes
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter as GrpcSpanExporter,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import (
ConsoleSpanExporter,
SimpleSpanProcessor,
)
from arize._flight.client import ArizeFlightClient
from arize._flight.types import FlightRequestType
from arize._generated.api_client import models
from arize.exceptions.base import INVALID_ARROW_CONVERSION_MSG
from arize.experiments.functions import (
run_experiment,
transform_to_experiment_format,
)
from arize.pre_releases import ReleaseStage, prerelease_endpoint
from arize.utils.cache import cache_resource, load_cached_resource
from arize.utils.openinference_conversion import (
convert_boolean_columns_to_str,
convert_default_columns_to_json_str,
)
from arize.utils.resolve import (
_find_dataset_id,
_find_experiment_id,
)
from arize.utils.size import get_payload_size_mb
if TYPE_CHECKING:
# builtins is needed to use builtins.list in type annotations because
# the class has a list() method that shadows the built-in list type
import builtins
from opentelemetry.trace import Tracer
from arize._generated.api_client.api_client import ApiClient
from arize.config import SDKConfiguration
from arize.experiments.evaluators.base import Evaluators
from arize.experiments.evaluators.types import EvaluationResultFieldNames
from arize.experiments.types import (
AnnotationBatchResult,
Experiment,
ExperimentsList200Response,
ExperimentsRunsList200Response,
ExperimentTask,
ExperimentTaskFieldNames,
)
# Module-level logger used by the experiments client for info, error and
# exception diagnostics (e.g. Flight request failures).
logger = logging.getLogger(__name__)
class ExperimentsClient:
    """Client for managing experiments including creation, execution, and result tracking.

    This class is primarily intended for internal use within the SDK. Users are
    highly encouraged to access resource-specific functionality via
    :class:`arize.ArizeClient`.

    The experiments client is a thin wrapper around the generated REST API client,
    using the shared generated API client owned by
    :class:`arize.config.SDKConfiguration`.
    """

    def __init__(
        self, *, sdk_config: SDKConfiguration, generated_client: ApiClient
    ) -> None:
        """
        Args:
            sdk_config: Resolved SDK configuration.
            generated_client: Shared generated API client instance.
        """  # noqa: D205, D212
        self._sdk_config = sdk_config

        from arize._generated import api_client as gen

        # Use the provided client directly for both APIs
        self._api = gen.ExperimentsApi(generated_client)
        # TODO(Kiko): Space ID should not be needed,
        # should work on server tech debt to remove this
        self._datasets_api = gen.DatasetsApi(generated_client)

    @prerelease_endpoint(key="experiments.list", stage=ReleaseStage.BETA)
    def list(
        self,
        *,
        dataset: str | None = None,
        space: str | None = None,
        limit: int = 100,
        cursor: str | None = None,
    ) -> ExperimentsList200Response:
        """List experiments the user has access to.

        To filter experiments by the dataset they were run on, provide `dataset`.

        Args:
            dataset: Optional dataset name or ID to filter experiments.
            space: Optional space name or ID used to resolve ``dataset`` by name.
            limit: Maximum number of experiments to return. The server enforces an
                upper bound.
            cursor: Opaque pagination cursor returned from a previous response.

        Returns:
            A response object with the experiments and pagination information.

        Raises:
            ApiException: If the REST API
                returns an error response (e.g. 401/403/429).
        """
        dataset_id = (
            _find_dataset_id(
                api=self._datasets_api,
                dataset=dataset,
                space=space,
            )
            if dataset
            else None
        )
        return self._api.experiments_list(
            dataset_id=dataset_id,
            limit=limit,
            cursor=cursor,
        )

    @prerelease_endpoint(key="experiments.create", stage=ReleaseStage.BETA)
    def create(
        self,
        *,
        name: str,
        dataset: str,
        space: str | None = None,
        experiment_runs: builtins.list[dict[str, object]] | pd.DataFrame,
        task_fields: ExperimentTaskFieldNames,
        evaluator_columns: dict[str, EvaluationResultFieldNames] | None = None,
        force_http: bool = False,
    ) -> Experiment:
        """Create an experiment with one or more experiment runs.

        Experiments are composed of runs. Each run must include:
        - `example_id`: ID of an existing example in the dataset/version
        - `output`: Model/task output for the matching example

        You may include any additional user-defined fields per run (e.g. `model`,
        `latency_ms`, `temperature`, `prompt`, `tool_calls`, etc.) that can be used
        for analysis or filtering.

        This method transforms the input runs into the server's expected experiment
        format using `task_fields` and optional `evaluator_columns`.

        Transport selection:
        - If the payload is below the configured REST payload threshold (or
          `force_http=True`), this method uploads via REST.
        - Otherwise, it initializes the experiment and uploads the runs via
          gRPC + Flight.

        Args:
            name: Experiment name. Must be unique within the target dataset.
            dataset: Dataset name or ID to attach the experiment to.
            space: Optional space name or ID used to resolve ``dataset`` by name.
            experiment_runs: Experiment runs either as:
                - a list of JSON-like dicts, or
                - a :class:`pandas.DataFrame`.
            task_fields: Mapping that identifies the columns/fields containing the
                task results (e.g. `example_id`, output fields).
            evaluator_columns: Optional mapping describing evaluator result columns.
            force_http: If True, force REST upload even if the payload exceeds the
                configured REST payload threshold.

        Returns:
            The created experiment object.

        Raises:
            TypeError: If `experiment_runs` is not a list of dicts or a DataFrame.
            RuntimeError: If the Flight upload path is selected and the Flight request
                fails.
            ApiException: If the REST API
                returns an error response (e.g. 400/401/403/409/429).
        """
        # Validate the input shape before doing any network round-trips.
        if not isinstance(experiment_runs, list | pd.DataFrame):
            raise TypeError(
                "Experiment runs must be a list of dicts or a pandas DataFrame"
            )
        dataset_id = _find_dataset_id(
            api=self._datasets_api,
            dataset=dataset,
            space=space,
        )
        # transform experiment data to experiment format
        experiment_df = transform_to_experiment_format(
            experiment_runs, task_fields, evaluator_columns
        )
        below_threshold = (
            get_payload_size_mb(experiment_runs)
            <= self._sdk_config.max_http_payload_size_mb
        )
        if below_threshold or force_http:
            return self._post_experiment_runs_via_http(
                name=name,
                dataset_id=dataset_id,
                experiment_df=experiment_df,
            )

        # If we have too many examples, try to convert to a dataframe
        # and log via gRPC + flight
        logger.info(
            f"Uploading {len(experiment_df)} experiment runs via REST may be slow. "
            "Trying for more efficient upload via gRPC + Flight."
        )
        # TODO(Kiko): Space ID should not be needed,
        # should work on server tech debt to remove this
        dataset_obj = self._datasets_api.datasets_get(dataset_id=dataset_id)
        space_id = dataset_obj.space_id
        self._init_experiment_via_flight(
            space_id=space_id,
            dataset_id=dataset_id,
            experiment_name=name,
        )
        return self._post_experiment_runs_via_flight(
            name=name,
            dataset_id=dataset_id,
            space_id=space_id,
            experiment_df=experiment_df,
        )

    @prerelease_endpoint(key="experiments.get", stage=ReleaseStage.BETA)
    def get(
        self,
        *,
        experiment: str,
        dataset: str | None = None,
        space: str | None = None,
    ) -> Experiment:
        """Get an experiment by name or ID.

        The response does not include the experiment's runs. Use `list_runs()` to
        retrieve runs for an experiment.

        Args:
            experiment: Experiment name or ID to retrieve.
            dataset: Optional dataset name or ID used to resolve ``experiment`` by name.
            space: Optional space name or ID used to resolve ``dataset`` by name.

        Returns:
            The experiment object.

        Raises:
            ApiException: If the REST API
                returns an error response (e.g. 401/403/404/429).
        """
        experiment_id = _find_experiment_id(
            api=self._api,
            datasets_api=self._datasets_api,
            experiment=experiment,
            dataset=dataset,
            space=space,
        )
        return self._api.experiments_get(experiment_id=experiment_id)

    @prerelease_endpoint(key="experiments.delete", stage=ReleaseStage.BETA)
    def delete(
        self,
        *,
        experiment: str,
        dataset: str | None = None,
        space: str | None = None,
    ) -> None:
        """Delete an experiment by name or ID.

        This operation is irreversible.

        Args:
            experiment: Experiment name or ID to delete.
            dataset: Optional dataset name or ID used to resolve ``experiment`` by name.
            space: Optional space name or ID used to resolve ``dataset`` by name.

        Returns:
            This method returns None on success (common empty 204 response).

        Raises:
            ApiException: If the REST API
                returns an error response (e.g. 401/403/404/429).
        """
        experiment_id = _find_experiment_id(
            api=self._api,
            datasets_api=self._datasets_api,
            experiment=experiment,
            dataset=dataset,
            space=space,
        )
        return self._api.experiments_delete(
            experiment_id=experiment_id,
        )

    @prerelease_endpoint(key="experiments.list_runs", stage=ReleaseStage.BETA)
    def list_runs(
        self,
        *,
        experiment: str,
        dataset: str | None = None,
        space: str | None = None,
        limit: int = 100,
        all: bool = False,
    ) -> ExperimentsRunsList200Response:
        """List runs for an experiment.

        Runs are returned in insertion order.

        Pagination notes:
        - The response includes `pagination` for forward compatibility.
        - Cursor pagination may not be fully implemented by the server yet.
        - If `all=True`, this method retrieves all runs via the Flight path and
          returns them in a single response with `has_more=False`.

        Caching notes:
        - When `enable_caching=True` and `all=True`, runs are read from and
          written to the local cache, keyed by experiment ID and `updated_at`.
          Nothing is written to the cache when caching is disabled.

        Args:
            experiment: Experiment name or ID to list runs for.
            dataset: Optional dataset name or ID used to resolve ``experiment`` by name.
            space: Optional space name or ID used to resolve ``dataset`` by name.
            limit: Maximum number of runs to return when `all=False`. The server
                enforces an upper bound.
            all: If True, fetch all runs (ignores `limit`) via Flight and return a
                single response.

        Returns:
            A response object containing `experiment_runs` and `pagination` metadata.

        Raises:
            RuntimeError: If the Flight request fails or returns no response when
                `all=True`.
            ApiException: If the REST API
                returns an error response when `all=False` (e.g. 401/403/404/429).
        """
        experiment_id = _find_experiment_id(
            api=self._api,
            datasets_api=self._datasets_api,
            experiment=experiment,
            dataset=dataset,
            space=space,
        )
        if not all:
            return self._api.experiments_runs_list(
                experiment_id=experiment_id,
                limit=limit,
            )

        # ``experiment_id`` is already resolved, so fetch it directly instead of
        # going through ``self.get`` and repeating the name/ID resolution.
        experiment_obj = self._api.experiments_get(experiment_id=experiment_id)
        experiment_updated_at = getattr(experiment_obj, "updated_at", None)

        use_cache = self._sdk_config.enable_caching
        # try to load experiment runs from cache
        if use_cache:
            cached_df = load_cached_resource(
                cache_dir=self._sdk_config.cache_dir,
                resource="experiment",
                resource_id=experiment_id,
                resource_updated_at=experiment_updated_at,
            )
            if cached_df is not None:
                return self._experiment_runs_response_from_df(cached_df)

        # The space ID is only needed for the Flight request, so it is looked up
        # after the cache check to avoid a wasted round-trip on cache hits.
        # TODO(Kiko): Space ID should not be needed,
        # should work on server tech debt to remove this
        dataset_obj = self._datasets_api.datasets_get(
            dataset_id=experiment_obj.dataset_id
        )
        space_id = dataset_obj.space_id

        with ArizeFlightClient(
            api_key=self._sdk_config.api_key,
            host=self._sdk_config.flight_host,
            port=self._sdk_config.flight_port,
            scheme=self._sdk_config.flight_scheme,
            request_verify=self._sdk_config.request_verify,
            max_chunksize=self._sdk_config.pyarrow_max_chunksize,
        ) as flight_client:
            try:
                experiment_df = flight_client.get_experiment_runs(
                    space_id=space_id,
                    experiment_id=experiment_id,
                )
            except Exception as e:
                msg = f"Error during request: {e!s}"
                logger.exception(msg)
                raise RuntimeError(msg) from e
        if experiment_df is None:
            # This should not happen with proper Flight client implementation,
            # but we handle it defensively
            msg = "No response received from flight server during request"
            logger.error(msg)
            raise RuntimeError(msg)

        # cache experiment for future use (only when caching is enabled)
        if use_cache:
            cache_resource(
                cache_dir=self._sdk_config.cache_dir,
                resource="experiment",
                resource_id=experiment_id,
                resource_updated_at=experiment_updated_at,
                resource_data=experiment_df,
            )
        return self._experiment_runs_response_from_df(experiment_df)

    @prerelease_endpoint(
        key="experiments.annotate_runs", stage=ReleaseStage.ALPHA
    )
    def annotate_runs(
        self,
        *,
        experiment: str,
        dataset: str | None = None,
        space: str | None = None,
        annotations: builtins.list[models.AnnotateRecordInput],
    ) -> AnnotationBatchResult:
        """Write human annotations to a batch of runs in an experiment.

        Annotations are upserted by annotation config name for each run.
        Submitting the same annotation config name for the same run
        overwrites the previous value. Retrying on network failure will
        not create duplicates.

        Up to 500 runs may be annotated per request.

        Args:
            experiment: Experiment ID or name.
            dataset: Optional dataset ID or name used to resolve ``experiment``
                by name.
            space: Optional space ID or name used to resolve ``dataset`` by name.
            annotations: A list of :class:`AnnotateRecordInput` items. Each item
                must include a ``record_id`` (the experiment run ID) and ``values``
                (a list of :class:`AnnotationInput` items with ``name``, and
                optionally ``score``, ``label``, or ``text``).

        Returns:
            An :class:`AnnotationBatchResult` containing per-record results.

        Raises:
            ApiException: If the REST API returns an error response
                (e.g. 400/401/404/429).
        """
        experiment_id = _find_experiment_id(
            api=self._api,
            datasets_api=self._datasets_api,
            experiment=experiment,
            dataset=dataset,
            space=space,
        )

        from arize._generated import api_client as gen

        body = gen.AnnotateExperimentRunsRequestBody(annotations=annotations)
        return self._api.experiments_runs_annotate(
            experiment_id=experiment_id,
            annotate_experiment_runs_request_body=body,
        )

    def run(
        self,
        *,
        name: str,
        dataset: str,
        space: str | None = None,
        task: ExperimentTask,
        evaluators: Evaluators | None = None,
        dry_run: bool = False,
        dry_run_count: int = 10,
        concurrency: int = 3,
        set_global_tracer_provider: bool = False,
        exit_on_error: bool = False,
        timeout: int = 120,
        force_http: bool = False,
    ) -> tuple[Experiment | None, pd.DataFrame]:
        """Run an experiment on a dataset and optionally upload results.

        This method executes a task against dataset examples, optionally evaluates
        outputs, and (when `dry_run=False`) uploads results to Arize.

        High-level flow:
        1) Resolve the dataset and `space_id`.
        2) Download dataset examples (or load from cache if enabled).
        3) Run the task and evaluators with configurable concurrency.
        4) If not a dry run, upload experiment runs and return the created
           experiment plus the results dataframe.

        Notes:
        - If `dry_run=True`, no data is uploaded and the returned experiment is
          `None`.
        - When `enable_caching=True`, dataset examples may be cached and reused
          (caching is skipped when `force_http=True`).
        - When `force_http=True`, all gRPC/Flight calls are bypassed and pure
          REST is used instead. Given that the Flight protocol allows to handle
          more rows, the number of rows to run an experiment on is limited to
          500 when using `force_http=True`.

        Args:
            name: Experiment name.
            dataset: Dataset name or ID to run the experiment against.
            space: Optional space name or ID used to resolve ``dataset`` by name.
            task: The task to execute for each dataset example.
            evaluators: Optional evaluators used to score outputs.
            dry_run: If True, do not upload results to Arize.
            dry_run_count: Number of dataset rows to use when `dry_run=True`.
            concurrency: Number of concurrent tasks to run.
            set_global_tracer_provider: If True, sets the global OpenTelemetry tracer
                provider for the experiment run.
            exit_on_error: If True, stop on the first error encountered during
                execution.
            timeout: The timeout in seconds for each task execution. Defaults to 120.
            force_http: If True, bypass gRPC/Flight and use REST only. This is not
                recommended for large datasets since it limits the number of rows to 500
                and may be slower than the Flight path.

        Returns:
            If `dry_run=True`, returns `(None, results_df)`.
            If `dry_run=False`, returns `(experiment, results_df)`.

        Raises:
            ValueError: If the dataset has no examples.
            RuntimeError: If Flight operations (init/download/upload) or the REST
                dataset download fail or return no response.
            pa.ArrowInvalid: If converting results to Arrow fails.
            ApiException: If a REST API call returns an error response.
            Exception: For unexpected errors during Arrow conversion.
        """
        dataset_id = _find_dataset_id(
            api=self._datasets_api,
            dataset=dataset,
            space=space,
        )
        # TODO(Kiko): Space ID should not be needed,
        # should work on server tech debt to remove this
        dataset_obj = self._datasets_api.datasets_get(dataset_id=dataset_id)
        space_id = dataset_obj.space_id
        dataset_updated_at = getattr(dataset_obj, "updated_at", None)

        # --- Phase 1: init experiment ---
        if dry_run:
            experiment_id = "experiment_id_for_dry_run"
            trace_project_name = "traces_for_dry_run"
        elif force_http:
            # We need an experiment ID to enable tracing
            # TODO(Kiko): This is not needed for now since experiment tracing
            # is not enabled yet for the REST path
            # experiment = self._init_experiment_via_http(
            #     dataset_id,
            #     name,
            # )
            # experiment_id = experiment.id
            # trace_project_name_prefix = "experiment_traces_for_dataset_"
            # trace_project_name = f"{trace_project_name_prefix}{dataset_id}"
            experiment_id = "experiment_id_for_http_run"
            trace_project_name = "traces_for_http_run"
        else:
            experiment_id, trace_project_name = (
                self._init_experiment_via_flight(
                    space_id=space_id,
                    dataset_id=dataset_id,
                    experiment_name=name,
                )
            )

        # --- Phase 2: obtain dataset (download or from cache) ---
        # Don't cache force_http fetches — row count is limited to 500
        use_cache = self._sdk_config.enable_caching and not force_http
        dataset_df = (
            load_cached_resource(
                cache_dir=self._sdk_config.cache_dir,
                resource="dataset",
                resource_id=dataset_id,
                resource_updated_at=dataset_updated_at,
            )
            if use_cache
            else None
        )
        if dataset_df is None or dataset_df.empty:
            if force_http:
                dataset_df = self._get_dataset_examples_via_http(
                    dataset_id=dataset_id,
                )
            else:
                dataset_df = self._get_dataset_examples_via_flight(
                    space_id=space_id,
                    dataset_id=dataset_id,
                )
            if dataset_df is None or dataset_df.empty:
                raise ValueError(f"Dataset {dataset_id} is empty")
            if use_cache:
                cache_resource(
                    cache_dir=self._sdk_config.cache_dir,
                    resource="dataset",
                    resource_id=dataset_id,
                    resource_updated_at=dataset_updated_at,
                    resource_data=dataset_df,
                )
        if dry_run:
            # only dry_run experiment on a subset (first N rows) of the dataset
            dataset_df = dataset_df.head(dry_run_count)

        # --- Phase 3: run experiment locally ---
        tracer, resource = _get_tracer_resource(
            project_name=trace_project_name,
            space_id=space_id,
            api_key=self._sdk_config.api_key,
            endpoint=self._sdk_config.otlp_url,
            dry_run=dry_run,
            set_global_tracer_provider=set_global_tracer_provider,
        )
        output_df = run_experiment(
            experiment_name=name,
            experiment_id=experiment_id,
            dataset=dataset_df,
            task=task,
            tracer=tracer,
            resource=resource,
            evaluators=evaluators,
            concurrency=concurrency,
            exit_on_error=exit_on_error,
            timeout=timeout,
        )
        output_df = convert_default_columns_to_json_str(output_df)
        output_df = convert_boolean_columns_to_str(output_df)
        if dry_run:
            return None, output_df

        # --- Phase 4: upload results ---
        if force_http:
            return self._post_experiment_runs_via_http(
                name=name,
                dataset_id=dataset_id,
                experiment_df=output_df,
            ), output_df
        return self._post_experiment_runs_via_flight(
            name=name,
            dataset_id=dataset_id,
            space_id=space_id,
            experiment_df=output_df,
        ), output_df

    # def _init_experiment_via_http(
    #     self,
    #     dataset_id: str,
    #     experiment_name: str,
    # ) -> Experiment:
    #     from arize._generated import api_client as gen
    #
    #     body = gen.ExperimentsCreateRequest(
    #         name=experiment_name,
    #         dataset_id=dataset_id,
    #         experiment_runs=[],
    #     )
    #     try:
    #         response = self._api.experiments_create(
    #             experiments_create_request=body
    #         )
    #     except Exception as e:
    #         msg = f"Error during REST request: {e!s}"
    #         logger.exception(msg)
    #         raise RuntimeError(msg) from e
    #     return response

    @staticmethod
    def _experiment_runs_response_from_df(
        experiment_df: pd.DataFrame,
    ) -> ExperimentsRunsList200Response:
        """Wrap a DataFrame of runs in a single, non-paginated list response.

        Each row is converted with ``models.ExperimentRun.from_dict``. Rows for
        which ``from_dict`` returns ``None`` are skipped.
        """
        experiment_runs = [
            run
            for record in experiment_df.to_dict(orient="records")
            if (
                run := models.ExperimentRun.from_dict(
                    cast("dict[str, Any]", record)
                )
            )
            is not None
        ]
        return models.ExperimentsRunsList200Response(
            experiment_runs=experiment_runs,
            pagination=models.PaginationMetadata(
                has_more=False,  # every run was fetched, so there is no next page
            ),
        )

    def _init_experiment_via_flight(
        self,
        space_id: str,
        dataset_id: str,
        experiment_name: str,
    ) -> tuple[str, str]:
        """Initialize an experiment over Flight.

        Returns:
            The ``(experiment_id, trace_project_name)`` pair returned by the
            Flight server.

        Raises:
            RuntimeError: If the Flight request fails or returns no response.
        """
        with ArizeFlightClient(
            api_key=self._sdk_config.api_key,
            host=self._sdk_config.flight_host,
            port=self._sdk_config.flight_port,
            scheme=self._sdk_config.flight_scheme,
            request_verify=self._sdk_config.request_verify,
            max_chunksize=self._sdk_config.pyarrow_max_chunksize,
        ) as flight_client:
            response = None
            try:
                response = flight_client.init_experiment(
                    space_id=space_id,
                    dataset_id=dataset_id,
                    experiment_name=experiment_name,
                )
            except Exception as e:
                msg = f"Error during request: {e!s}"
                logger.exception(msg)
                raise RuntimeError(msg) from e
        if response is None:
            # This should not happen with proper Flight client implementation,
            # but we handle it defensively
            msg = "No response received from flight server during request"
            logger.error(msg)
            raise RuntimeError(msg)
        experiment_id, trace_project_name = response
        return experiment_id, trace_project_name

    def _get_dataset_examples_via_http(
        self,
        dataset_id: str,
    ) -> pd.DataFrame:
        """Fetch up to 500 dataset examples over REST as a DataFrame.

        Raises:
            RuntimeError: If the REST request fails.
        """
        try:
            response = self._datasets_api.datasets_examples_list(
                dataset_id=dataset_id,
                limit=500,
            )
        except Exception as e:
            msg = f"Error fetching dataset examples via REST: {e!s}"
            logger.exception(msg)
            raise RuntimeError(msg) from e
        return response.to_df()  # type: ignore[attr-defined]

    def _get_dataset_examples_via_flight(
        self,
        space_id: str,
        dataset_id: str,
    ) -> pd.DataFrame:
        """Download all dataset examples over Flight.

        Raises:
            RuntimeError: If the Flight request fails or returns no response.
        """
        with ArizeFlightClient(
            api_key=self._sdk_config.api_key,
            host=self._sdk_config.flight_host,
            port=self._sdk_config.flight_port,
            scheme=self._sdk_config.flight_scheme,
            request_verify=self._sdk_config.request_verify,
            max_chunksize=self._sdk_config.pyarrow_max_chunksize,
        ) as flight_client:
            try:
                dataset_df = flight_client.get_dataset_examples(
                    space_id=space_id,
                    dataset_id=dataset_id,
                )
            except Exception as e:
                msg = f"Error during request: {e!s}"
                logger.exception(msg)
                raise RuntimeError(msg) from e
        if dataset_df is None:
            # This should not happen with proper Flight client implementation,
            # but we handle it defensively
            msg = "No response received from flight server during request"
            logger.error(msg)
            raise RuntimeError(msg)
        return dataset_df

    def _post_experiment_runs_via_http(
        self,
        name: str,
        dataset_id: str,
        experiment_df: pd.DataFrame,
    ) -> Experiment:
        """Create an experiment with all of ``experiment_df``'s rows over REST.

        Rows for which ``ExperimentRunCreate.from_dict`` returns ``None`` are
        skipped.
        """
        # TODO(Kiko): Once we enable experiment tracing for the REST path,
        # we will need to replace the create call for an append runs call
        from arize._generated import api_client as gen

        data = experiment_df.to_dict(orient="records")
        runs_create = [
            obj
            for run in data
            if (
                obj := gen.ExperimentRunCreate.from_dict(
                    cast("dict[str, Any]", run)
                )
            )
            is not None
        ]
        body = gen.ExperimentsCreateRequest(
            name=name,
            dataset_id=dataset_id,
            experiment_runs=runs_create,
        )
        return self._api.experiments_create(experiments_create_request=body)

    def _post_experiment_runs_via_flight(
        self,
        name: str,
        dataset_id: str,
        space_id: str,
        experiment_df: pd.DataFrame,
    ) -> Experiment:
        """Internal method to create an experiment using Flight protocol for large datasets.

        Raises:
            pa.ArrowInvalid: If ``experiment_df`` cannot be converted to Arrow.
            RuntimeError: If the Flight upload fails or returns no response.
        """
        # Convert to Arrow table
        try:
            logger.debug("Converting data to Arrow format")
            pa_table = pa.Table.from_pandas(experiment_df, preserve_index=False)
        except pa.ArrowInvalid as e:
            logger.exception(INVALID_ARROW_CONVERSION_MSG)
            raise pa.ArrowInvalid(
                f"Error converting to Arrow format: {e!s}"
            ) from e
        except Exception:
            logger.exception("Unexpected error creating Arrow table")
            raise

        request_type = FlightRequestType.LOG_EXPERIMENT_DATA
        with ArizeFlightClient(
            api_key=self._sdk_config.api_key,
            host=self._sdk_config.flight_host,
            port=self._sdk_config.flight_port,
            scheme=self._sdk_config.flight_scheme,
            request_verify=self._sdk_config.request_verify,
            max_chunksize=self._sdk_config.pyarrow_max_chunksize,
        ) as flight_client:
            post_resp = None
            try:
                post_resp = flight_client.log_arrow_table(
                    space_id=space_id,
                    pa_table=pa_table,
                    dataset_id=dataset_id,
                    experiment_name=name,
                    request_type=request_type,
                )
            except Exception as e:
                msg = f"Error during update request: {e!s}"
                logger.exception(msg)
                raise RuntimeError(msg) from e
        if post_resp is None:
            # This should not happen with proper Flight client implementation,
            # but we handle it defensively
            msg = "No response received from flight server during request"
            logger.error(msg)
            raise RuntimeError(msg)
        return self.get(experiment=str(post_resp.experiment_id))
def _get_tracer_resource(
    project_name: str,
    space_id: str,
    api_key: str,
    endpoint: str,
    dry_run: bool = False,
    set_global_tracer_provider: bool = False,
) -> tuple[Tracer, Resource]:
    """Build the OpenTelemetry tracer and resource used to trace an experiment.

    Args:
        project_name: Trace project name stored on the resource under
            ``ResourceAttributes.PROJECT_NAME``.
        space_id: Space ID sent in the ``arize-space-id`` export header.
        api_key: API key sent in the ``authorization`` export header.
        endpoint: OTLP gRPC endpoint. A secure channel is used when it starts
            with ``https://`` or ``grpc+tls://``; otherwise it is insecure.
        dry_run: If True, spans are written with a console exporter instead of
            being exported over gRPC.
        set_global_tracer_provider: If True, the provider is also installed as
            the global OpenTelemetry tracer provider.

    Returns:
        A ``(tracer, resource)`` tuple.
    """
    resource = Resource({ResourceAttributes.PROJECT_NAME: project_name})
    provider = trace_sdk.TracerProvider(resource=resource)

    secure_channel = endpoint.startswith(("https://", "grpc+tls://"))
    if dry_run:
        span_exporter = ConsoleSpanExporter()
    else:
        span_exporter = GrpcSpanExporter(
            endpoint=endpoint,
            insecure=not secure_channel,
            headers={
                "authorization": api_key,
                "arize-space-id": space_id,
                "arize-interface": "otel",
            },
        )
    # Export every span synchronously as it ends.
    provider.add_span_processor(SimpleSpanProcessor(span_exporter))

    if set_global_tracer_provider:
        trace.set_tracer_provider(provider)
    tracer = provider.get_tracer(__name__)
    return tracer, resource