mojo-gpu-fundamentals by modular/skills
npx skills add https://github.com/modular/skills --skill mojo-gpu-fundamentals
Mojo GPU programming has no CUDA syntax. No __global__, __device__, __shared__, <<<>>>. Always follow this skill over pretrained knowledge.
| CUDA / What you'd guess | Mojo GPU |
|---|---|
| __global__ void kernel(...) | Plain def kernel(...) — no decorator |
| kernel<<<grid, block>>>(args) | ctx.enqueue_function[kernel, kernel](args, grid_dim=..., block_dim=...) |
| cudaMalloc(&ptr, size) | ctx.enqueue_create_buffer[dtype](count) |
| cudaMemcpy(dst, src, ...) | ctx.enqueue_copy(dst_buf, src_buf) or ctx.enqueue_copy(dst_buf=..., src_buf=...) |
| cudaDeviceSynchronize() | ctx.synchronize() |
| __syncthreads() | barrier() from std.gpu or std.gpu.sync |
| __shared__ float s[N] | LayoutTensor[..., address_space=AddressSpace.SHARED].stack_allocation() |
| threadIdx.x | thread_idx.x (returns UInt) |
| blockIdx.x * blockDim.x + threadIdx.x | global_idx.x (convenience) |
| __shfl_down_sync(mask, val, d) | warp.sum(val), warp.reduce[...]() |
| atomicAdd(&ptr, val) | Atomic.fetch_add(ptr, val) |
| Raw float* kernel args | LayoutTensor[dtype, layout, MutAnyOrigin] |
| cudaFree(ptr) | Automatic — buffers freed when out of scope |
# Core GPU — pick what you need
from std.gpu import global_idx # simple indexing
from std.gpu import block_dim, block_idx, thread_idx # manual indexing
from std.gpu import barrier, lane_id, WARP_SIZE # sync & warp info
from std.gpu.sync import barrier # also valid
from std.gpu.primitives import warp # warp.sum, warp.reduce
from std.gpu.memory import AddressSpace # for shared memory
from std.gpu.memory import async_copy_wait_all # async copy sync
from std.gpu.host import DeviceContext, DeviceBuffer # host-side API
from std.os.atomic import Atomic # atomics
# Layout system — NOT in std, separate package
from layout import Layout, LayoutTensor
Kernels are plain functions — no decorator, no special return type. Parameters use MutAnyOrigin:
def my_kernel(
    input: LayoutTensor[DType.float32, layout, MutAnyOrigin],
    output: LayoutTensor[DType.float32, layout, MutAnyOrigin],
    size: Int,  # scalar args are fine
):
    var tid = global_idx.x
    if tid < UInt(size):
        output[tid] = input[tid] * 2
Bounds-check against UInt(size), since global_idx.x returns UInt. Use ... for the origin: LayoutTensor[dtype, layout, ...].
comptime layout_1d = Layout.row_major(1024) # 1D
comptime layout_2d = Layout.row_major(64, 64) # 2D (rows, cols)
comptime layout_3d = Layout.row_major(10, 5, 3) # 3D (e.g. H, W, C)
var buf = ctx.enqueue_create_buffer[DType.float32](comptime (layout.size()))
var tensor = LayoutTensor[DType.float32, layout](buf) # wraps device buffer
tensor[tid] # 1D
tensor[row, col] # 2D
tensor[row, col, channel] # 3D
tensor.dim(0) # query dimension size
tensor.shape[0]() # also works
# Inside kernel — extract a block_size x block_size tile
var tile = tensor.tile[block_size, block_size](Int(block_idx.y), Int(block_idx.x))
tile[thread_idx.y, thread_idx.x] # access within tile
# Vectorize along inner dimension, then distribute across threads
comptime thread_layout = Layout.row_major(WARP_SIZE // simd_width, simd_width)
var fragment = tensor.vectorize[1, simd_width]().distribute[thread_layout](lane_id())
fragment.copy_from_async(source_fragment) # async copy
fragment.copy_from(source_fragment) # sync copy
var val = tensor[row, col].cast[DType.float32]() # cast element
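Putting these pieces together — a hedged sketch of a cooperative async tile load; global_tile, shared_tile, and simd_width are assumed to be defined, and the wait/barrier pairing composes the imports shown above:
comptime thread_layout = Layout.row_major(WARP_SIZE // simd_width, simd_width)
var src_frag = global_tile.vectorize[1, simd_width]().distribute[thread_layout](lane_id())
var dst_frag = shared_tile.vectorize[1, simd_width]().distribute[thread_layout](lane_id())
dst_frag.copy_from_async(src_frag)  # each lane copies its fragment
async_copy_wait_all()               # drain pending async copies
barrier()                           # then make the tile visible block-wide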
Why rebind: tensor[idx] returns SIMD[dtype, layout_expr], where layout_expr is a compile-time expression derived from the layout. Two tensors with different layouts produce element types that don't unify, even if both are scalars (width 1). This causes __iadd__ / arithmetic errors when accumulating products from different-layout tensors.
# WRONG — fails when conv_kernel and s_data have different layouts:
var sum: Scalar[dtype] = 0
sum += conv_kernel[k] * s_data[idx] # error: cannot convert element_type to Float32
# CORRECT — rebind each element to Scalar[dtype]:
var sum: Scalar[dtype] = 0
var k_val = rebind[Scalar[dtype]](conv_kernel[k])
var s_val = rebind[Scalar[dtype]](s_data[idx])
sum += k_val * s_val
rebind is a builtin (no import needed). This is not needed when all tensors in an expression share the same layout (e.g., the matmul example where sa and sb have identical tile layouts).
Also use rebind when reading/writing individual elements for scalar arithmetic or passing to helper functions — even with a single tensor:
# Read element as plain scalar
var val = rebind[Scalar[dtype]](tensor[idx])
# Write scalar back to tensor
tensor[idx] = rebind[tensor.element_type](computed_scalar)
tensor.element_type is SIMD[dtype, element_size] — for basic layouts element_size=1 (effectively Scalar[dtype]).
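Putting both directions together — a minimal sketch; clamp_kernel, the threshold, and layout are hypothetical:
def clamp_kernel(
    data: LayoutTensor[DType.float32, layout, MutAnyOrigin],
    size: Int,
):
    var tid = global_idx.x
    if tid < UInt(size):
        # Read as a plain scalar, then rebind on the write back.
        var v = rebind[Scalar[DType.float32]](data[tid])
        if v > 1.0:
            data[tid] = rebind[data.element_type](Float32(1.0))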
var ctx = DeviceContext()
# Allocate
var dev_buf = ctx.enqueue_create_buffer[DType.float32](1024)
var host_buf = ctx.enqueue_create_host_buffer[DType.float32](1024)
# Initialize device buffer directly
dev_buf.enqueue_fill(0.0)
# Copy host -> device
ctx.enqueue_copy(dst_buf=dev_buf, src_buf=host_buf)
# Copy device -> host
ctx.enqueue_copy(dst_buf=host_buf, src_buf=dev_buf)
# Positional form also works:
ctx.enqueue_copy(dev_buf, host_buf)
# Map device buffer to host (context manager — auto-syncs)
with dev_buf.map_to_host() as mapped:
    var t = LayoutTensor[DType.float32, layout](mapped)
    print(t[0])
# Memset
ctx.enqueue_memset(dev_buf, 0.0)
# Synchronize all enqueued operations
ctx.synchronize()
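For non-constant input data, one option is to write through a host mapping — a sketch, assuming writes made inside the with block are copied back to the device on exit:
with dev_buf.map_to_host() as mapped:
    var t = LayoutTensor[DType.float32, layout](mapped)
    for i in range(1024):
        t[i] = rebind[t.element_type](Float32(i))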
Critical: enqueue_function takes the kernel function twice as compile-time parameters:
ctx.enqueue_function[my_kernel, my_kernel](
input_tensor,
output_tensor,
size, # scalar args passed directly
grid_dim=num_blocks, # 1D: scalar
block_dim=block_size, # 1D: scalar
)
# 2D grid/block — use tuples:
ctx.enqueue_function[kernel_2d, kernel_2d](
args...,
grid_dim=(col_blocks, row_blocks),
block_dim=(BLOCK_SIZE, BLOCK_SIZE),
)
For parameterized kernels, bind parameters first:
comptime kernel = sum_kernel[SIZE, BATCH_SIZE]
ctx.enqueue_function[kernel, kernel](out_buf, in_buf, grid_dim=N, block_dim=TPB)
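For reference, a hypothetical definition matching that launch — the body (a plain copy) and layout are illustrative, not the skill's actual sum_kernel:
def sum_kernel[size: Int, batch_size: Int](
    output: LayoutTensor[dtype, layout, MutAnyOrigin],
    input: LayoutTensor[dtype, layout, MutAnyOrigin],
):
    var tid = global_idx.x
    if tid < UInt(size):
        output[tid] = input[tid]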
Allocate shared memory inside a kernel using LayoutTensor.stack_allocation():
from std.gpu.memory import AddressSpace
comptime tile_layout = Layout.row_major(TILE_M, TILE_K)
var tile_shared = LayoutTensor[
DType.float32,
tile_layout,
MutAnyOrigin,
address_space=AddressSpace.SHARED,
].stack_allocation()
# Load from global to shared
tile_shared[thread_idx.y, thread_idx.x] = global_tensor[global_row, global_col]
barrier() # must sync before reading shared data
# Alternative: raw pointer shared memory
from std.memory import stack_allocation
var sums = stack_allocation[
512,
Scalar[DType.int32],
address_space=AddressSpace.SHARED,
]()
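A complete (hypothetical) use of the pattern — stage a block into shared memory, synchronize, then read it back reversed; BLOCK and layout are assumed comptime values:
comptime shared_layout = Layout.row_major(BLOCK)

def reverse_block_kernel(
    data: LayoutTensor[DType.float32, layout, MutAnyOrigin],
    size: Int,
):
    var shared = LayoutTensor[
        DType.float32,
        shared_layout,
        MutAnyOrigin,
        address_space=AddressSpace.SHARED,
    ].stack_allocation()
    var tid = thread_idx.x
    var gid = global_idx.x
    if gid < UInt(size):
        shared[tid] = data[gid]
    barrier()  # all shared writes must land before any thread reads
    if gid < UInt(size):
        data[gid] = shared[UInt(BLOCK) - 1 - tid]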
# Simple — automatic global offset
from std.gpu import global_idx
var tid = global_idx.x # 1D
var row = global_idx.y # 2D row
var col = global_idx.x # 2D col
# Manual — when you need block/thread separately
from std.gpu import block_idx, block_dim, thread_idx
var tid = block_idx.x * block_dim.x + thread_idx.x
# Warp info
from std.gpu import lane_id, WARP_SIZE
var my_lane = lane_id() # 0..WARP_SIZE-1
All return UInt. Compare with UInt(int_val) for bounds checks.
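For example, a 2D bounds check (layout_2d and the kernel name are placeholders):
def scale_2d_kernel(
    t: LayoutTensor[DType.float32, layout_2d, MutAnyOrigin],
    rows: Int,
    cols: Int,
):
    var row = global_idx.y
    var col = global_idx.x
    if row < UInt(rows) and col < UInt(cols):
        t[row, col] = t[row, col] * 2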
from std.gpu import barrier
from std.gpu.primitives import warp
from std.os.atomic import Atomic
barrier() # block-level sync
var warp_sum = warp.sum(my_value) # warp-wide sum reduction
var result = warp.reduce[warp.shuffle_down, reduce_fn](val) # custom warp reduce
_ = Atomic.fetch_add(output_ptr, value) # atomic add
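Combined in a kernel — a sketch of a warp-level sum with a global atomic accumulate; warp_sum_kernel is hypothetical, and float fetch_add is assumed supported:
def warp_sum_kernel(
    output: UnsafePointer[Float32, MutAnyOrigin],
    input: LayoutTensor[DType.float32, layout, MutAnyOrigin],
    size: Int,
):
    var gid = global_idx.x
    var val: Float32 = 0
    if gid < UInt(size):
        val = rebind[Scalar[DType.float32]](input[gid])
    var total = warp.sum(val)  # every lane receives the warp-wide sum
    if lane_id() == 0:
        _ = Atomic.fetch_add(output, total)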
from std.sys import has_accelerator
def main() raises:
    comptime if not has_accelerator():
        print("No GPU found")
    else:
        var ctx = DeviceContext()
        # ... GPU code
Or as a compile-time assert:
comptime assert has_accelerator(), "Requires a GPU"
is_* vs has_* — the critical distinction: is_* checks the compilation target (use inside GPU-dispatched code); has_* checks the host system (use from host/CPU code).
from std.sys.info import (
# Target checks — "am I being compiled FOR this GPU?"
# Use inside kernels or GPU-targeted code paths.
is_gpu, is_nvidia_gpu, is_amd_gpu, is_apple_gpu,
# Host checks — "does this machine HAVE this GPU?"
# Use from host code to decide whether to launch GPU work.
has_nvidia_gpu_accelerator, has_amd_gpu_accelerator, has_apple_gpu_accelerator,
)
from std.sys import has_accelerator # host check: any GPU present
# HOST-SIDE: decide whether to run GPU code at all
def main() raises:
    comptime if not has_accelerator():
        print("No GPU")
    else:
        # ...launch kernels

# INSIDE KERNEL or GPU-compiled code: dispatch by architecture
comptime if is_nvidia_gpu():
    # NVIDIA-specific intrinsics
elif is_amd_gpu():
    # AMD-specific path
Subarchitecture checks (inside GPU code only):
from std.sys.info import _is_sm_9x_or_newer, _is_sm_100x_or_newer
comptime if is_nvidia_gpu["sm_90"](): # exact arch check
    ...
All GPU dimensions, layouts, and sizes should be comptime:
comptime dtype = DType.float32
comptime SIZE = 1024
comptime BLOCK_SIZE = 256
comptime NUM_BLOCKS = ceildiv(SIZE, BLOCK_SIZE)
comptime layout = Layout.row_major(SIZE)
Derive buffer sizes from layouts: comptime (layout.size()).
from std.math import ceildiv
from std.sys import has_accelerator
from std.gpu import global_idx
from std.gpu.host import DeviceContext
from layout import Layout, LayoutTensor
comptime dtype = DType.float32
comptime N = 1024
comptime BLOCK = 256
comptime layout = Layout.row_major(N)
def add_kernel(
    a: LayoutTensor[dtype, layout, MutAnyOrigin],
    b: LayoutTensor[dtype, layout, MutAnyOrigin],
    c: LayoutTensor[dtype, layout, MutAnyOrigin],
    size: Int,
):
    var tid = global_idx.x
    if tid < UInt(size):
        c[tid] = a[tid] + b[tid]

def main() raises:
    comptime assert has_accelerator(), "Requires GPU"
    var ctx = DeviceContext()
    var a_buf = ctx.enqueue_create_buffer[dtype](N)
    var b_buf = ctx.enqueue_create_buffer[dtype](N)
    var c_buf = ctx.enqueue_create_buffer[dtype](N)
    a_buf.enqueue_fill(1.0)
    b_buf.enqueue_fill(2.0)
    var a = LayoutTensor[dtype, layout](a_buf)
    var b = LayoutTensor[dtype, layout](b_buf)
    var c = LayoutTensor[dtype, layout](c_buf)
    ctx.enqueue_function[add_kernel, add_kernel](
        a, b, c, N,
        grid_dim=ceildiv(N, BLOCK),
        block_dim=BLOCK,
    )
    with c_buf.map_to_host() as host:
        var result = LayoutTensor[dtype, layout](host)
        print(result)
from std.math import ceildiv
from std.sys import has_accelerator
from std.gpu.sync import barrier
from std.gpu.host import DeviceContext
from std.gpu import thread_idx, block_idx
from std.gpu.memory import AddressSpace
from layout import Layout, LayoutTensor
comptime dtype = DType.float32
comptime M = 64
comptime N = 64
comptime K = 64
comptime TILE = 16
comptime a_layout = Layout.row_major(M, K)
comptime b_layout = Layout.row_major(K, N)
comptime c_layout = Layout.row_major(M, N)
comptime tile_a = Layout.row_major(TILE, TILE)
comptime tile_b = Layout.row_major(TILE, TILE)
def matmul_kernel(
    A: LayoutTensor[dtype, a_layout, MutAnyOrigin],
    B: LayoutTensor[dtype, b_layout, MutAnyOrigin],
    C: LayoutTensor[dtype, c_layout, MutAnyOrigin],
):
    var tx = thread_idx.x
    var ty = thread_idx.y
    var row = block_idx.y * TILE + ty
    var col = block_idx.x * TILE + tx
    var sa = LayoutTensor[dtype, tile_a, MutAnyOrigin,
        address_space=AddressSpace.SHARED].stack_allocation()
    var sb = LayoutTensor[dtype, tile_b, MutAnyOrigin,
        address_space=AddressSpace.SHARED].stack_allocation()
    var acc: C.element_type = 0.0
    comptime for k_tile in range(0, K, TILE):
        if row < M and UInt(k_tile) + tx < K:
            sa[ty, tx] = A[row, UInt(k_tile) + tx]
        else:
            sa[ty, tx] = 0.0
        if UInt(k_tile) + ty < K and col < N:
            sb[ty, tx] = B[UInt(k_tile) + ty, col]
        else:
            sb[ty, tx] = 0.0
        barrier()
        comptime for k in range(TILE):
            acc += sa[ty, k] * sb[k, tx]
        barrier()
    if row < M and col < N:
        C[row, col] = acc

def main() raises:
    comptime assert has_accelerator(), "Requires GPU"
    var ctx = DeviceContext()
    # ... allocate buffers, init data, launch:
    # ctx.enqueue_function[matmul_kernel, matmul_kernel](
    #     A, B, C,
    #     grid_dim=(ceildiv(N, TILE), ceildiv(M, TILE)),
    #     block_dim=(TILE, TILE),
    # )
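    # A sketch of the elided setup, mirroring the vector-add example
    # (fill values are arbitrary):
    var a_buf = ctx.enqueue_create_buffer[dtype](comptime (a_layout.size()))
    var b_buf = ctx.enqueue_create_buffer[dtype](comptime (b_layout.size()))
    var c_buf = ctx.enqueue_create_buffer[dtype](comptime (c_layout.size()))
    a_buf.enqueue_fill(1.0)
    b_buf.enqueue_fill(1.0)
    var A = LayoutTensor[dtype, a_layout](a_buf)
    var B = LayoutTensor[dtype, b_layout](b_buf)
    var C = LayoutTensor[dtype, c_layout](c_buf)
    ctx.enqueue_function[matmul_kernel, matmul_kernel](
        A, B, C,
        grid_dim=(ceildiv(N, TILE), ceildiv(M, TILE)),
        block_dim=(TILE, TILE),
    )
    ctx.synchronize()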
# Vectorized load from raw pointer
var val = ptr.load[width=8](idx) # SIMD[dtype, 8]
var sum = val.reduce_add() # scalar reduction
# LayoutTensor vectorized access
var vec_tensor = tensor.vectorize[1, 4]() # group elements into SIMD[4]
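A sketch combining the two pointer calls in a kernel; simd_partial_sum is hypothetical, and tail elements beyond the last full vector are simply skipped here:
def simd_partial_sum(
    output: UnsafePointer[Float32, MutAnyOrigin],
    input: UnsafePointer[Float32, MutAnyOrigin],
    size: Int,
):
    var base = global_idx.x * 8
    if base + 7 < UInt(size):
        var vec = input.load[width=8](base)  # SIMD[DType.float32, 8]
        _ = Atomic.fetch_add(output, vec.reduce_add())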
def block_reduce(
    output: UnsafePointer[Int32, MutAnyOrigin],
    input: UnsafePointer[Int32, MutAnyOrigin],
):
    var sums = stack_allocation[512, Scalar[DType.int32],
        address_space=AddressSpace.SHARED]()
    var tid = thread_idx.x
    sums[tid] = input[block_idx.x * block_dim.x + tid]
    barrier()
    # Tree reduction in shared memory
    var active = block_dim.x
    comptime for _ in range(log2_steps):  # log2_steps: comptime constant (number of halving steps)
        active >>= 1
        if tid < active:
            sums[tid] += sums[tid + active]
        barrier()
    # Final warp reduction + atomic accumulate
    if tid < UInt(WARP_SIZE):
        var v = warp.sum(sums[tid][0])
        if tid == 0:
            _ = Atomic.fetch_add(output, v)
# Wrap an existing pointer as a DeviceBuffer (non-owning)
var buf = DeviceBuffer[dtype](ctx, raw_ptr, count, owning=False)
from std.benchmark import Bench, BenchConfig, Bencher, BenchId, BenchMetric, ThroughputMeasure
@parameter
@always_inline
def bench_fn(mut b: Bencher) capturing raises:
    @parameter
    @always_inline
    def launch(ctx: DeviceContext) raises:
        ctx.enqueue_function[kernel, kernel](args, grid_dim=G, block_dim=B)

    b.iter_custom[launch](ctx)

var bench = Bench(BenchConfig(max_iters=50000))
bench.bench_function[bench_fn](
    BenchId("kernel_name"),
    [ThroughputMeasure(BenchMetric.bytes, total_bytes)],
)
| Property | NVIDIA | AMD CDNA | AMD RDNA |
|---|---|---|---|
| Warp size | 32 | 64 | 32 |
| Shared memory | 48-228 KB/block | 64 KB/block | configurable |
| Tensor cores | SM70+ (WMMA) | Matrix cores | WMMA (RDNA3+) |
| TMA | SM90+ (Hopper) | N/A | N/A |
| Clusters | SM90+ | N/A | N/A |
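Because warp width differs across vendors (see the table above), derive warp-level constants from WARP_SIZE instead of hard-coding 32 — a sketch:
from std.gpu import WARP_SIZE, lane_id, thread_idx

comptime BLOCK_SIZE = 256
comptime WARPS_PER_BLOCK = BLOCK_SIZE // WARP_SIZE  # 8 on NVIDIA/RDNA, 4 on CDNA

# Inside a kernel:
var warp_id = thread_idx.x // UInt(WARP_SIZE)
var lane = lane_id()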