ZubblClient¶
The main entry point for the Zubbl SDK.
Initialize¶
from zubbl import ZubblClient
from zubbl.types import RLMode
zubbl = ZubblClient(
api_key="zubbl_YOUR_KEY",
base_url="https://api.zubbl.in", # default
timeout=30, # request timeout in seconds
rl_mode=RLMode.LOCAL, # LOCAL | COREWEAVE | B200
)
Core Methods¶
start_trajectory(task_description, metadata=None) -> str¶
Start recording a new trajectory. Returns the trajectory ID.
tid = zubbl.start_trajectory(
task_description="Summarize quarterly report",
metadata={"source": "automated"},
)
record_step(action, tool_name=None, tool_input=None, tool_output=None, reasoning=None, tokens_used=0, latency_ms=0)¶
Record a step in the current trajectory.
zubbl.record_step(
action="read_file",
tool_name="doc_reader",
tool_input={"file": "report.pdf"},
tool_output="Revenue: $2.3M",
tokens_used=150,
latency_ms=800,
)
end_trajectory(status, actual_output=None, expected_output=None, agent_confidence=None) -> Trajectory¶
End and auto-ingest the current trajectory.
query(task_description, context=None) -> Policy | None¶
Get policy recommendations before running a task.
policy = zubbl.query(task_description="code review")
if policy:
print(policy.task_type) # str
print(policy.confidence_score) # float
print(policy.recommended_actions) # list
print(policy.is_quarantined) # bool
feedback(trajectory_id, rating, comment=None) -> dict¶
Submit feedback on a trajectory. Rating is -1.0 to 1.0.
get_usage() -> UsageStats¶
Get usage statistics.
usage = zubbl.get_usage()
print(usage.trajectories_ingested) # int
print(usage.credits_remaining) # int
print(usage.task_success_rate) # float (0.0-1.0)
print(usage.queries_made) # int
print(usage.rl_cycles_run) # int
wrap(agent, auto_recover=None) -> WrappedAgent¶
Wrap any callable agent for automatic trajectory recording and recovery.
wrap_crew(crew, agent_names=None) -> WrappedCrew¶
Wrap a CrewAI crew for automatic trajectory recording.
get_failure_stats() -> dict¶
Get failure statistics and recovery rates.
validate(trajectory_id, expected_output, actual_output=None, agent_confidence=0.0) -> dict¶
Validate a trajectory's output against expected results.
result = zubbl.validate(
trajectory_id="traj_abc123",
expected_output="Summary of Q4 earnings",
actual_output="Q4 revenue grew 15%",
agent_confidence=0.9,
)
Cloud Features (Pro Plans)¶
find_similar_trajectories(query_text, top_k=5, status_filter=None) -> list¶
Find similar past trajectories using vector search.
results = zubbl.find_similar_trajectories(
query_text="fix authentication bug",
top_k=5,
status_filter="success",
)
for r in results:
print(r.trajectory_id, r.score)
get_graph_recommendations(task_type, min_confidence=0.5, limit=5) -> list¶
Get skill graph recommendations.
recs = zubbl.get_graph_recommendations(task_type="code_review")
for r in recs:
print(r.pattern_name, r.confidence, r.success_rate)
get_skill_gaps(agent_id, task_type) -> list¶
Analyze skill gaps for an agent.
gaps = zubbl.get_skill_gaps(agent_id="my-agent", task_type="security_audit")
for g in gaps:
print(g.skill_name, g.domain, g.description)
get_training_stats() -> TrainingStats | None¶
Get training pipeline status.
stats = zubbl.get_training_stats()
if stats:
print(stats.is_available)
print(stats.training_runs)
print(stats.total_trajectories_buffered)
trigger_training() -> dict | None¶
Manually trigger a training run.
get_services_health() -> ServiceHealth¶
Check which cloud services are available.
health = zubbl.get_services_health()
print(health.neo4j, health.pinecone, health.redis_streams, health.grpo)
get_streams_info() -> dict | None¶
Get streaming queue status.
Async Methods¶
All core methods have async variants: