databricks-python-sdk by databricks-solutions/ai-dev-kit
npx skills add https://github.com/databricks-solutions/ai-dev-kit --skill databricks-python-sdk
This skill provides guidance for Databricks SDK, Databricks Connect, CLI, and REST API.
SDK Documentation: https://databricks-sdk-py.readthedocs.io/en/latest/ GitHub Repository: https://github.com/databricks/databricks-sdk-py
Setup: use an existing virtual environment at .venv or create one with uv, then install the packages: uv pip install databricks-connect and uv pip install databricks-sdk. Authentication uses the DEFAULT profile in ~/.databrickscfg, or the DATABRICKS_HOST and DATABRICKS_TOKEN environment variables.
Use databricks-connect for running Spark code locally against a Databricks cluster.
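The DEFAULT profile resolution can be sketched without the SDK: ~/.databrickscfg is plain INI, so a few lines of configparser show how a named profile maps to host and token. The sample contents and the helper below are illustrative, not the SDK's actual loader; host and token values are placeholders.

```python
import configparser

# Hypothetical ~/.databrickscfg contents; host/token values are placeholders.
SAMPLE_CFG = """\
[DEFAULT]
host = https://your-workspace.cloud.databricks.com
token = dapi-placeholder

[MY_PROFILE]
host = https://adb-1234.5.azuredatabricks.net
token = dapi-placeholder-2
"""

def read_profile(cfg_text: str, profile: str = "DEFAULT") -> dict:
    """Return host/token for a named profile, mimicking how a profile is resolved."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    section = parser[profile]
    return {"host": section["host"], "token": section["token"]}

print(read_profile(SAMPLE_CFG, "MY_PROFILE")["host"])
```

WorkspaceClient(profile="MY_PROFILE") performs the same lookup (plus many more credential strategies) against the real file.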
from databricks.connect import DatabricksSession
# Auto-detects 'DEFAULT' profile from ~/.databrickscfg
spark = DatabricksSession.builder.getOrCreate()
# With explicit profile
spark = DatabricksSession.builder.profile("MY_PROFILE").getOrCreate()
# Use spark as normal
df = spark.sql("SELECT * FROM catalog.schema.table")
df.show()
IMPORTANT: Do NOT set .master("local[*]") - this will cause issues with Databricks Connect.
For operations that are not yet in the SDK, or that are overly complex through it, use the REST API directly:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Direct API call using authenticated client
response = w.api_client.do(
method="GET",
path="/api/2.0/clusters/list"
)
# POST with body
response = w.api_client.do(
method="POST",
path="/api/2.0/jobs/run-now",
body={"job_id": 123}
)
When to use: prefer SDK methods when available, and fall back to api_client.do for endpoints the SDK does not yet cover.
# Check version (should be >= 0.278.0)
databricks --version
# Use specific profile
databricks --profile MY_PROFILE clusters list
# Common commands
databricks clusters list
databricks jobs list
databricks workspace ls /Users/me
The SDK documentation follows a predictable URL pattern:
Base: https://databricks-sdk-py.readthedocs.io/en/latest/
Workspace APIs: /workspace/{category}/{service}.html
Account APIs: /account/{category}/{service}.html
Authentication: /authentication.html
DBUtils: /dbutils.html
| Category | Services |
|---|---|
| compute | clusters, cluster_policies, command_execution, instance_pools, libraries |
| catalog | catalogs, schemas, tables, volumes, functions, storage_credentials, external_locations |
| jobs | jobs |
| sql | warehouses, statement_execution, queries, alerts, dashboards |
| serving | serving_endpoints |
| vectorsearch | vector_search_indexes, vector_search_endpoints |
| pipelines | pipelines |
| workspace | repos, secrets, workspace, git_credentials |
| files | files, dbfs |
| ml | experiments, model_registry |
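Because the doc URLs are this regular, they can be generated mechanically. A small helper, purely illustrative (the function name is not part of the SDK):

```python
BASE = "https://databricks-sdk-py.readthedocs.io/en/latest"

def sdk_doc_url(service: str, category: str, scope: str = "workspace") -> str:
    """Build a readthedocs URL for a service page, e.g. workspace/compute/clusters.html."""
    return f"{BASE}/{scope}/{category}/{service}.html"

print(sdk_doc_url("clusters", "compute"))
```

Pass scope="account" to target the Account APIs instead of the Workspace APIs.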
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/authentication.html
DATABRICKS_HOST=https://your-workspace.cloud.databricks.com
DATABRICKS_TOKEN=dapi... # Personal Access Token
# Auto-detect credentials from environment
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Explicit token auth
w = WorkspaceClient(
host="https://your-workspace.cloud.databricks.com",
token="dapi..."
)
# Azure Service Principal
w = WorkspaceClient(
host="https://adb-xxx.azuredatabricks.net",
azure_workspace_resource_id="/subscriptions/.../resourceGroups/.../providers/Microsoft.Databricks/workspaces/...",
azure_tenant_id="tenant-id",
azure_client_id="client-id",
azure_client_secret="secret"
)
# Use a named profile from ~/.databrickscfg
w = WorkspaceClient(profile="MY_PROFILE")
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/compute/clusters.html
# List all clusters
for cluster in w.clusters.list():
print(f"{cluster.cluster_name}: {cluster.state}")
# Get cluster details
cluster = w.clusters.get(cluster_id="0123-456789-abcdef")
# Create a cluster (returns Wait object)
wait = w.clusters.create(
cluster_name="my-cluster",
spark_version=w.clusters.select_spark_version(latest=True),
node_type_id=w.clusters.select_node_type(local_disk=True),
num_workers=2
)
cluster = wait.result() # Wait for cluster to be running
# Or use create_and_wait for blocking call
from datetime import timedelta

cluster = w.clusters.create_and_wait(
cluster_name="my-cluster",
spark_version="14.3.x-scala2.12",
node_type_id="i3.xlarge",
num_workers=2,
timeout=timedelta(minutes=30)
)
# Start/stop/delete
w.clusters.start(cluster_id="...").result()
w.clusters.stop(cluster_id="...")
w.clusters.delete(cluster_id="...")
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/jobs/jobs.html
from databricks.sdk.service.jobs import Task, NotebookTask
# List jobs
for job in w.jobs.list():
print(f"{job.job_id}: {job.settings.name}")
# Create a job
created = w.jobs.create(
name="my-job",
tasks=[
Task(
task_key="main",
notebook_task=NotebookTask(notebook_path="/Users/me/notebook"),
existing_cluster_id="0123-456789-abcdef"
)
]
)
# Run a job now
run = w.jobs.run_now_and_wait(job_id=created.job_id)
print(f"Run completed: {run.state.result_state}")
# Get run output
output = w.jobs.get_run_output(run_id=run.run_id)
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/statement_execution.html
# Execute SQL query
response = w.statement_execution.execute_statement(
warehouse_id="abc123",
statement="SELECT * FROM catalog.schema.table LIMIT 10",
wait_timeout="30s"
)
# Check status and get results
from databricks.sdk.service.sql import StatementState

if response.status.state == StatementState.SUCCEEDED:
for row in response.result.data_array:
print(row)
# For large results, fetch chunks
chunk = w.statement_execution.get_statement_result_chunk_n(
statement_id=response.statement_id,
chunk_index=0
)
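The chunked fetch repeats until a chunk reports no successor. A sketch of that loop against stub objects — the Chunk dataclass and fetch_chunk stand in for the SDK's result-chunk type and get_statement_result_chunk_n, on the assumption that each chunk exposes data_array and a next_chunk_index that is absent on the final chunk:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Chunk:  # stand-in for the SDK's result-chunk object
    data_array: list
    next_chunk_index: Optional[int]  # None on the final chunk

# Fake service: three chunks of rows, indexed 0..2.
CHUNKS = [
    Chunk([["a", 1], ["b", 2]], 1),
    Chunk([["c", 3]], 2),
    Chunk([["d", 4]], None),
]

def fetch_chunk(statement_id: str, chunk_index: int) -> Chunk:
    """Stub for w.statement_execution.get_statement_result_chunk_n."""
    return CHUNKS[chunk_index]

def fetch_all_rows(statement_id: str) -> list:
    """Follow next_chunk_index until the result set is exhausted."""
    rows, index = [], 0
    while index is not None:
        chunk = fetch_chunk(statement_id, index)
        rows.extend(chunk.data_array)
        index = chunk.next_chunk_index
    return rows

print(len(fetch_all_rows("stmt-1")))  # prints 4
```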
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/sql/warehouses.html
# List warehouses
for wh in w.warehouses.list():
print(f"{wh.name}: {wh.state}")
# Get warehouse
warehouse = w.warehouses.get(id="abc123")
# Create warehouse
created = w.warehouses.create_and_wait(
name="my-warehouse",
cluster_size="Small",
max_num_clusters=1,
auto_stop_mins=15
)
# Start/stop
w.warehouses.start(id="abc123").result()
w.warehouses.stop(id="abc123").result()
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/tables.html
# List tables in a schema
for table in w.tables.list(catalog_name="main", schema_name="default"):
print(f"{table.full_name}: {table.table_type}")
# Get table info
table = w.tables.get(full_name="main.default.my_table")
print(f"Columns: {[c.name for c in table.columns]}")
# Check if table exists
exists = w.tables.exists(full_name="main.default.my_table")
Doc (Catalogs): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/catalogs.html Doc (Schemas): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/schemas.html
# List catalogs
for catalog in w.catalogs.list():
print(catalog.name)
# Create catalog
w.catalogs.create(name="my_catalog", comment="Description")
# List schemas
for schema in w.schemas.list(catalog_name="main"):
print(schema.name)
# Create schema
w.schemas.create(name="my_schema", catalog_name="main")
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/catalog/volumes.html
from databricks.sdk.service.catalog import VolumeType
# List volumes
for vol in w.volumes.list(catalog_name="main", schema_name="default"):
print(f"{vol.full_name}: {vol.volume_type}")
# Create managed volume
w.volumes.create(
catalog_name="main",
schema_name="default",
name="my_volume",
volume_type=VolumeType.MANAGED
)
# Read volume info
vol = w.volumes.read(name="main.default.my_volume")
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/files/files.html
# Upload file to volume (context manager ensures the handle is closed)
with open("local_file.csv", "rb") as f:
    w.files.upload(
        file_path="/Volumes/main/default/my_volume/data.csv",
        contents=f
    )
# Download file
with w.files.download(file_path="/Volumes/main/default/my_volume/data.csv") as f:
content = f.read()
# List directory contents
for entry in w.files.list_directory_contents("/Volumes/main/default/my_volume/"):
print(f"{entry.name}: {entry.is_directory}")
# Upload/download with progress (parallel)
w.files.upload_from(
file_path="/Volumes/main/default/my_volume/large.parquet",
source_path="/local/path/large.parquet",
use_parallel=True
)
w.files.download_to(
file_path="/Volumes/main/default/my_volume/large.parquet",
destination="/local/output/",
use_parallel=True
)
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/serving/serving_endpoints.html
# List endpoints
for ep in w.serving_endpoints.list():
print(f"{ep.name}: {ep.state}")
# Get endpoint
endpoint = w.serving_endpoints.get(name="my-endpoint")
# Query endpoint
response = w.serving_endpoints.query(
name="my-endpoint",
inputs={"prompt": "Hello, world!"}
)
# For chat/completions endpoints
response = w.serving_endpoints.query(
name="my-chat-endpoint",
messages=[{"role": "user", "content": "Hello!"}]
)
# Get OpenAI-compatible client
openai_client = w.serving_endpoints.get_open_ai_client()
Doc (Indexes): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_indexes.html Doc (Endpoints): https://databricks-sdk-py.readthedocs.io/en/latest/workspace/vectorsearch/vector_search_endpoints.html
# List vector search indexes
for idx in w.vector_search_indexes.list_indexes(endpoint_name="my-vs-endpoint"):
print(idx.name)
# Query index
results = w.vector_search_indexes.query_index(
index_name="main.default.my_index",
columns=["id", "text", "embedding"],
query_text="search query",
num_results=10
)
for doc in results.result.data_array:
print(doc)
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/pipelines/pipelines.html
# List pipelines
for pipeline in w.pipelines.list_pipelines():
print(f"{pipeline.name}: {pipeline.state}")
# Get pipeline
pipeline = w.pipelines.get(pipeline_id="abc123")
# Start pipeline update
w.pipelines.start_update(pipeline_id="abc123")
# Stop pipeline
w.pipelines.stop_and_wait(pipeline_id="abc123")
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/workspace/secrets.html
# List secret scopes
for scope in w.secrets.list_scopes():
print(scope.name)
# Create scope
w.secrets.create_scope(scope="my-scope")
# Put secret
w.secrets.put_secret(scope="my-scope", key="api-key", string_value="secret123")
# Get secret (returns GetSecretResponse with value)
secret = w.secrets.get_secret(scope="my-scope", key="api-key")
# List secrets in scope (metadata only, not values)
for s in w.secrets.list_secrets(scope="my-scope"):
print(s.key)
Doc: https://databricks-sdk-py.readthedocs.io/en/latest/dbutils.html
# Access dbutils through WorkspaceClient
dbutils = w.dbutils
# File system operations
files = dbutils.fs.ls("/")
dbutils.fs.cp("dbfs:/source", "dbfs:/dest")
dbutils.fs.rm("dbfs:/path", recurse=True)
# Secrets (same as w.secrets but dbutils interface)
value = dbutils.secrets.get(scope="my-scope", key="my-key")
The Databricks SDK is fully synchronous. All calls block the thread. In async applications (FastAPI, asyncio), you MUST wrap SDK calls with asyncio.to_thread() to avoid blocking the event loop.
import asyncio
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# WRONG - blocks the event loop
async def get_clusters_bad():
return list(w.clusters.list()) # BLOCKS!
# CORRECT - runs in thread pool
async def get_clusters_good():
return await asyncio.to_thread(lambda: list(w.clusters.list()))
# CORRECT - for simple calls
async def get_cluster(cluster_id: str):
return await asyncio.to_thread(w.clusters.get, cluster_id)
# CORRECT - FastAPI endpoint
from fastapi import FastAPI
app = FastAPI()
@app.get("/clusters")
async def list_clusters():
clusters = await asyncio.to_thread(lambda: list(w.clusters.list()))
return [{"id": c.cluster_id, "name": c.cluster_name} for c in clusters]
@app.post("/query")
async def run_query(sql: str, warehouse_id: str):
# Wrap the blocking SDK call
response = await asyncio.to_thread(
w.statement_execution.execute_statement,
statement=sql,
warehouse_id=warehouse_id,
wait_timeout="30s"
)
return response.result.data_array
Note: WorkspaceClient().config.host is NOT a network call - it just reads config. No need to wrap property access.
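The to_thread pattern can be verified without a workspace; here the blocking SDK call is replaced by a stub that sleeps, standing in for something like w.clusters.list():

```python
import asyncio
import time

def list_clusters_blocking() -> list:
    """Stub for a synchronous SDK call such as list(w.clusters.list())."""
    time.sleep(0.05)  # simulated network latency; would stall the loop if called directly
    return ["cluster-a", "cluster-b"]

async def main() -> list:
    # Runs the blocking call in the default thread pool, keeping the event loop free.
    return await asyncio.to_thread(list_clusters_blocking)

print(asyncio.run(main()))  # prints ['cluster-a', 'cluster-b']
```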
from datetime import timedelta
# Pattern 1: Use *_and_wait methods
cluster = w.clusters.create_and_wait(
cluster_name="test",
spark_version="14.3.x-scala2.12",
node_type_id="i3.xlarge",
num_workers=2,
timeout=timedelta(minutes=30)
)
# Pattern 2: Use Wait object
wait = w.clusters.create(...)
cluster = wait.result() # Blocks until ready
# Pattern 3: Manual polling with callback
def progress(cluster):
print(f"State: {cluster.state}")
cluster = w.clusters.wait_get_cluster_running(
cluster_id="...",
timeout=timedelta(minutes=30),
callback=progress
)
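Pattern 2's Wait object behaves like a poll-until-terminal future. A hedged sketch of the idea — this is a local stand-in, not the SDK's actual implementation:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

class PollWait:
    """Minimal stand-in for the SDK's Wait: poll a getter until a terminal state."""
    def __init__(self, getter: Callable[[], T], done: Callable[[T], bool], interval: float = 0.01):
        self._getter, self._done, self._interval = getter, done, interval

    def result(self, timeout: float = 5.0) -> T:
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            state = self._getter()
            if self._done(state):
                return state
            time.sleep(self._interval)
        raise TimeoutError("resource did not reach a terminal state")

# Fake cluster that becomes RUNNING after a couple of polls.
states = iter(["PENDING", "PENDING", "RUNNING"])
wait = PollWait(lambda: next(states), lambda s: s == "RUNNING")
print(wait.result())  # prints RUNNING
```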
# All list methods return iterators that handle pagination automatically
for job in w.jobs.list(): # Fetches all pages
print(job.settings.name)
# For manual control over page size
response = w.jobs.list(limit=10)
for job in response:
    print(job)
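The auto-paginating iterators can be understood as a generator that requests the next page only when the previous one is exhausted. A sketch against a fake paged backend (the page-token semantics here are illustrative):

```python
from typing import Iterator, Optional, Tuple

# Fake paged backend: token is the index of the next page, None when done.
PAGES = {0: (["job-1", "job-2"], 1), 1: (["job-3"], None)}

def list_page(page_token: int = 0) -> Tuple[list, Optional[int]]:
    """Stub for one REST list call returning (items, next_page_token)."""
    return PAGES[page_token]

def list_all() -> Iterator[str]:
    """Yield items lazily, fetching the next page only on demand (SDK-style)."""
    token: Optional[int] = 0
    while token is not None:
        items, token = list_page(token)
        yield from items

print(list(list_all()))  # prints ['job-1', 'job-2', 'job-3']
```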
from databricks.sdk.errors import NotFound, PermissionDenied, ResourceAlreadyExists
try:
cluster = w.clusters.get(cluster_id="invalid-id")
except NotFound:
print("Cluster not found")
except PermissionDenied:
print("Access denied")
If I'm unsure about a method, I should:
1. Check the documentation URL pattern: https://databricks-sdk-py.readthedocs.io/en/latest/workspace/{category}/{service}.html
2. Try common categories: /workspace/compute/clusters.html, /workspace/jobs/jobs.html, /workspace/catalog/tables.html, /workspace/sql/warehouses.html, /workspace/serving/serving_endpoints.html
3. Fetch and verify before providing guidance on parameters or return types.