Pipeline 2: Attach a FIFO to H2D
Contents
Pipeline 2: Attach a FIFO to H2D¶
The previous example stalls if the parameter size
exceeds the capacity of
the internal queues. The size of the queue is architecture-dependent. From the
software development point of view, a program should be independent of any
architecture. One solution is to add a FIFO between H2D and @add16
. The FIFO
receives data from H2D and then forwards the data to @add16
. The WSE
provides an efficient design for FIFO. The user just binds two microthreads to
the FIFO: one pushes data into the FIFO, and the other pops the data out. As
long as the parameter size
does not exceed the capacity of the FIFO, H2D can
always push all data into the FIFO even if @add16
cannot process any data.
Once H2D is done, D2H can continue to drain the data out such that @add16
can progress.
To create a FIFO, we use a builtin @allocate_fifo
to bind a normal tensor.
We create two fabric DSDs: one pushes data from MEMCPYH2D_DATA_1
to the
FIFO and the other pops data from the FIFO to the color C1
. The two DSDs must
use different microthreads.
The routing configuration of color C1
is RAMP to RAMP because
1) the FIFO pops data to the router via C1
and
2) @add16
receives data from the router via C1.
We also block the color C1
because @add16
receives data via microthread
3.
The disadvantage of this approach is the resource consumption. The FIFO requires two microthreads and a scratch buffer.
The next example will fix this issue.
layout.csl¶
// resources to route the data between the host and the device.
//
// color/ task ID map
//
// ID var ID var ID var ID var
// 0 H2D 9 C1 18 27 reserved (memcpy)
// 1 D2H 10 19 28 reserved (memcpy)
// 2 11 20 29 reserved
// 3 12 21 reserved (memcpy) 30 reserved (memcpy)
// 4 13 22 reserved (memcpy) 31 reserved
// 5 14 23 reserved (memcpy) 32
// 6 15 24 33
// 7 16 25 34
// 8 main 17 26 35
//
// Number of 16-bit elements streamed through the pipeline (compile-time param).
param size: i16;
// Color IDs used by memcpy for host-to-device (H2D) and device-to-host (D2H)
// streaming; supplied on the cslc command line (see commands.sh).
param MEMCPYH2D_DATA_1_ID: i16;
param MEMCPYD2H_DATA_1_ID: i16;
const MEMCPYH2D_DATA_1: color = @get_color(MEMCPYH2D_DATA_1_ID);
const MEMCPYD2H_DATA_1: color = @get_color(MEMCPYD2H_DATA_1_ID);
// Task ID 8 (see the ID map above) is bound to the startup task in pe_program.csl.
const main: u16 = 8;
// Color C1 carries data popped from the FIFO to the @add16 consumer.
const C1: color = @get_color(9);
// memcpy infrastructure parameters for a 1x1 program rectangle, wired to the
// two streaming colors above.
const memcpy = @import_module( "<memcpy/get_params>", .{
.width = 1,
.height = 1,
.MEMCPYH2D_1 = MEMCPYH2D_DATA_1,
.MEMCPYD2H_1 = MEMCPYD2H_DATA_1
});
layout {
// Single-PE program.
@set_rectangle(1, 1);
// Pass the compile-time parameters and per-PE memcpy params to the one PE.
@set_tile_code(0, 0, "pe_program.csl", .{
.size = size,
.main = main,
.C1 = C1,
.memcpy_params = memcpy.get_params(0)
});
}
pe_program.csl¶
// Not a complete program; the top-level source file is code.csl.
//
// Introduce a fifo to buffer the data from H2D, so the H2D can
// finish as long as size does not exceed the capacity of the fifo
//
// H2D --> fifo --> C1 --> addh() --> D2H
//
// Number of elements flowing through H2D -> fifo -> C1 -> @add16 -> D2H.
param size: i16;
// Task ID for the startup task (value 8, chosen in layout.csl).
param main: u16;
// Color connecting the FIFO's output to @add16's input.
param C1: color;
// Per-PE memcpy parameters produced by layout.csl.
param memcpy_params: comptime_struct;
const main_task_id: local_task_id = @get_local_task_id(main);
const sys_mod = @import_module( "<memcpy/memcpy>", memcpy_params);
// Scratch storage backing the FIFO; H2D can complete as long as `size`
// does not exceed this capacity (1024 i16 elements).
var fifo_buffer = @zeros([1024]i16);
const fifo = @allocate_fifo(fifo_buffer);
// Maximum extent for a fabric DSD: the push/pop DSDs run "forever" rather
// than for a fixed count, so the FIFO keeps moving data as it arrives.
const infinite_length: u16 = 0x7fff;
// Pushes incoming H2D wavelets into the FIFO (microthread / input queue 2).
const in_dsd = @get_dsd(fabin_dsd, .{
.extent = infinite_length,
.fabric_color = sys_mod.MEMCPYH2D_1,
.input_queue = @get_input_queue(2)});
// Pops data out of the FIFO onto color C1 (microthread / output queue 3).
// Note: push and pop intentionally use different queues, as required.
const out_dsd = @get_dsd(fabout_dsd, .{
.extent = infinite_length,
.fabric_color = C1,
.output_queue = @get_output_queue(3)});
// @add16's input: exactly `size` wavelets received on C1 (input queue 1).
const inDsd = @get_dsd(fabin_dsd, .{
.extent = size,
.fabric_color = C1,
.input_queue = @get_input_queue(1),
});
// @add16's output: `size` results streamed to D2H (output queue 1).
const outDsd = @get_dsd(fabout_dsd, .{
.extent = size,
.fabric_color = sys_mod.MEMCPYD2H_1,
.output_queue = @get_output_queue(1)
});
// One-element buffer broadcast `size` times as the second @add16 operand.
var buf = @zeros([1]i16);
const one_dsd = @get_dsd(mem1d_dsd, .{ .tensor_access = |i|{size} -> buf[0] });
// Startup task: launches all three async operations; the hardware
// microthreads then drive the pipeline to completion.
task mainTask() void {
// Move from the fabric to the FIFO
@mov16(fifo, in_dsd, .{.async = true});
// Move from the FIFO to C1
@mov16(out_dsd, fifo, .{.async = true});
// Constant operand: every output element is input + 1.
buf[0] = @as(i16, 1);
@add16(outDsd, inDsd, one_dsd, .{.async=true});
}
comptime {
// activate local task mainTask at startup
@bind_local_task(mainTask, main_task_id);
@activate(main_task_id);
// fifo sends out the data via C1 --> tx = RAMP
// add16 receives data via C1 --> rx = RAMP
// NOTE(review): the accompanying text says color C1 is blocked because
// @add16 receives via a microthread, but no @block(C1) appears here —
// confirm whether the route config below implies it or a call is missing.
@set_local_color_config(C1, .{.routes = .{.rx = .{RAMP}, .tx = .{RAMP}}});
}
run.py¶
#!/usr/bin/env cs_python
"""Host driver: streams a random i16 tensor to the device, which adds 1 to
each element through a FIFO-buffered pipeline, and checks the result.

The call order matters: the non-blocking H2D transfer is issued first, then
the blocking D2H drains the results before the runtime is stopped.
"""
import argparse
import json
import numpy as np
from cerebras.sdk.sdk_utils import memcpy_view, input_array_to_u32
from cerebras.sdk.runtime.sdkruntimepybind import SdkRuntime, MemcpyDataType # pylint: disable=no-name-in-module
from cerebras.sdk.runtime.sdkruntimepybind import MemcpyOrder # pylint: disable=no-name-in-module
parser = argparse.ArgumentParser()
parser.add_argument('--name', help='the test name')
parser.add_argument("--cmaddr", help="IP:port for CS system")
args = parser.parse_args()
dirname = args.name
# Parse the compile metadata
with open(f"{dirname}/out.json", encoding="utf-8") as json_file:
compile_data = json.load(json_file)
params = compile_data["params"]
# Color IDs chosen at compile time (see commands.sh).
MEMCPYH2D_DATA_1 = int(params["MEMCPYH2D_DATA_1_ID"])
MEMCPYD2H_DATA_1 = int(params["MEMCPYD2H_DATA_1_ID"])
# Size of the input and output tensors; use this value when compiling the
# program, e.g. `cslc --params=size:12 --fabric-dims=8,3 --fabric-offsets=4,1`
size = int(params["size"])
print(f"MEMCPYH2D_DATA_1 = {MEMCPYH2D_DATA_1}")
print(f"MEMCPYD2H_DATA_1 = {MEMCPYD2H_DATA_1}")
print(f"size = {size}")
# The device kernel operates on 16-bit values.
memcpy_dtype = MemcpyDataType.MEMCPY_16BIT
runner = SdkRuntime(dirname, cmaddr=args.cmaddr)
runner.load()
runner.run()
# Generate a random input tensor of the desired size
input_tensor = np.random.randint(256, size=size, dtype=np.int16)
print("step 1: streaming H2D")
# "input_tensor" is a 1d array
# The type of input_tensor is int16, we need to extend it to uint32
# There are two kinds of extension when using the utility function input_array_to_u32
# input_array_to_u32(np_arr: np.ndarray, sentinel: Optional[int], fast_dim_sz: int)
# 1) zero extension:
# sentinel = None
# 2) upper 16-bit is the index of the array:
# sentinel is Not None
#
# In this example, the upper 16-bit is don't care because pe_program.csl only uses
# @add16 to read the lower 16-bit
tensors_u32 = input_array_to_u32(input_tensor, 1, size)
# Non-blocking: the device-side FIFO absorbs the stream even before the
# consumer (@add16) makes progress.
runner.memcpy_h2d(MEMCPYH2D_DATA_1, tensors_u32, 0, 0, 1, 1, size, \
streaming=True, data_type=memcpy_dtype, order=MemcpyOrder.COL_MAJOR, nonblock=True)
print("step 2: streaming D2H")
# The D2H buffer must be of type u32
out_tensors_u32 = np.zeros(size, np.uint32)
# Blocking: waits until all `size` results have arrived.
runner.memcpy_d2h(out_tensors_u32, MEMCPYD2H_DATA_1, 0, 0, 1, 1, size, \
streaming=True, data_type=memcpy_dtype, order=MemcpyOrder.COL_MAJOR, nonblock=False)
# remove upper 16-bit of each u32
result_tensor = memcpy_view(out_tensors_u32, np.dtype(np.int16))
runner.stop()
# The kernel adds the constant 1 to every element.
np.testing.assert_equal(result_tensor, input_tensor + 1)
print("SUCCESS!")
commands.sh¶
#!/usr/bin/env bash
set -e

# Compile the 1x1 program (layout.csl + pe_program.csl) for an 8x3 fabric.
# Colors 0/1 are assigned to the H2D/D2H streaming channels, matching the
# ID map documented in layout.csl.
cslc ./layout.csl \
  --fabric-dims=8,3 \
  --fabric-offsets=4,1 \
  --params=size:32 \
  -o out \
  --params=MEMCPYH2D_DATA_1_ID:0 \
  --params=MEMCPYD2H_DATA_1_ID:1 \
  --memcpy \
  --channels=1 \
  --width-west-buf=0 \
  --width-east-buf=0

# Run the host driver against the compiled artifact.
cs_python run.py --name out