Pipelines and Higher-Order Functions

Updated 2026-05-16

CLEAR's pipeline system lets you transform, filter, aggregate, and iterate collections using the smooth operator (|>).

Pipeline operators work on arrays, @list, @pool, sharded collections, and @pool:soa with the same syntax regardless of the underlying storage. Streams (~T[], ~T[N], ~T[INF]) accept a subset of the operators, listed under Stream and Future Compatibility.

_ is the placeholder for the element being iterated over.

scores
  |> WHERE _ <= 50 
  |> SUM _;

users 
  |> SELECT _.name 
  |> DISTINCT _;
  
entities 
  |> EACH { _.health = _.health - 1.0; };

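This document describes both the collection pipeline model and the stream/future pipeline surface. Finite streams (~T[], ~T[N]) support the fusible stages, the fold terminals, DISTINCT, INDEX, and the batching WINDOW(size:, time:) directly; ORDER_BY, UNNEST, sliding WINDOW(N), and JOIN require materializing the stream first. Infinite streams (~T[INF]) additionally require LIMIT in the chain before fold terminals, DISTINCT, and INDEX.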

The Smooth Operator (|>)

|> pipes a value into a function or operator. It's CLEAR's equivalent of |> (Elixir) or . method chaining (Ruby), but it also works with collection operators.

# Pipe to a function: x |> f  →  f(x)
result = data 
  |> process 
  |> validate
  |> format;

# Pipe to an operator: list |> WHERE predicate
alive = entities 
  |> WHERE _.health > 0;

Pipelines chain left to right. Each stage passes its result to the next.

Named Pipeline Bindings (AS $v)

The AS $v syntax binds the current pipeline element to a named reference that persists across subsequent stages.

bill = users AS $u
  |> UNNEST $u.orders
  |> SUM _.price * $u.discount;

Why bindings exist

Without a binding, _ always refers to the current element - the item being iterated at the innermost level. After UNNEST, _ becomes each inner element (an Order), and the outer element (the User) is no longer reachable.

# Without AS $u: $u is not available inside the fold.
# `_` after UNNEST is the Order, not the User.
bill = users
  |> UNNEST _.orders
  |> SUM _.price;         # can access order.price, but NOT user.discount

AS $u captures the outer element before the UNNEST replaces _, keeping it accessible:

bill = users AS $u
  |> UNNEST $u.orders     # $u = the User; _ = each Order
  |> SUM _.price * $u.discount;  # cross-reference: order.price * user.discount

The compiler fuses this into a single nested loop with no intermediate allocations.
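
As a rough model of what that fused nested loop computes, here is a sketch in Python rather than CLEAR. The User and Order classes and the bill_total helper are hypothetical stand-ins for illustration; this is not the compiler's actual output.

```python
from dataclasses import dataclass, field


@dataclass
class Order:
    price: float
    qty: int = 1


@dataclass
class User:
    discount: float
    orders: list = field(default_factory=list)


def bill_total(users):
    """Models: users AS $u |> UNNEST $u.orders |> SUM _.price * $u.discount"""
    total = 0.0
    for u in users:          # $u: the outer element stays reachable
        for o in u.orders:   # _: each inner element produced by UNNEST
            total += o.price * u.discount
    return total


users = [
    User(discount=0.5, orders=[Order(10.0), Order(20.0)]),
    User(discount=1.0, orders=[Order(5.0)]),
]
print(bill_total(users))  # 20.0
```

No flattened list of orders is ever built: the inner loop reads each order while the outer loop variable is still live, which is the effect the binding provides.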

Scope of a binding

A binding created with AS $v is visible from the point of declaration to the end of the pipeline expression. It cannot be used after the pipeline terminates:

bill = users AS $u
  |> UNNEST $u.orders   # $u is in scope
  |> WHERE _.qty > 1    # $u still in scope
  |> SUM _.price * $u.discount;  # $u still in scope

# $u is not accessible here (out of scope after the pipeline ends)

Multiple pipelines in the same function can each use their own $u - bindings are scoped to the pipeline expression, not the function.

Naming the inner element (AS $o)

When you UNNEST and need a name for the inner element (instead of _), use a second binding after the UNNEST expression:

bill = users AS $u
  |> UNNEST $u.orders AS $o    # $u = User, $o = Order
  |> SUM $o.price * $u.discount;

AS $o after the UNNEST expression binds the inner element. Both $u and $o are available in the fold.

Supported combinations

UNNEST binding chains - fold operators that work after AS $u |> UNNEST:

Fold     Example
SUM      |> SUM _.price * $u.discount
COUNT    |> COUNT TRUE
AVERAGE  |> AVERAGE _.price
MIN      |> MIN _.price
MAX      |> MAX _.price
ANY      |> ANY _.price > 50.0
ALL      |> ALL $u.discount > 0.0
FIND     |> FIND _.price > 10.0 (returns ?ElemType)

Intermediate WHERE stages filter the inner elements before the fold:

total = users AS $u
  |> UNNEST $u.orders
  |> WHERE _.qty > 1        # filter inner elements
  |> SUM _.price * $u.discount;

SELECT is not supported in UNNEST binding chains (the inner projection would lose the $u context). Use field access in the fold expression instead.

CONCURRENT binding - $u is also accessible inside CONCURRENT stages that operate directly on the bound list (without an intermediate UNNEST):

Operator            Example
CONCURRENT SELECT   AS $u |> CONCURRENT SELECT $u.val * 2.0
CONCURRENT SUM      AS $u |> CONCURRENT SUM $u.score
CONCURRENT COUNT    AS $u |> CONCURRENT COUNT $u.active
CONCURRENT MIN      AS $u |> CONCURRENT MIN $u.score
CONCURRENT MAX      AS $u |> CONCURRENT MAX $u.score
CONCURRENT AVERAGE  AS $u |> CONCURRENT AVERAGE $u.score
CONCURRENT WHERE    AS $u |> CONCURRENT WHERE $u.score > 50.0

In all concurrent cases, $u resolves to the item being processed by the current worker.

The _ Variable

Inside pipeline expressions, _ refers to the current element. For struct elements, access fields with _.fieldname:

# _ is the element itself (for scalar collections)
nums: Float64[] = [1.0, 3.0, 7.0, 9.0];

big = nums 
  |> WHERE _ > 5.0;

ASSERT length(big) == 2, "WHERE filters by element value";

# _.field for struct collections
users = [User{name: "alice"}, User{name: "bob"}];

names = users
  |> SELECT _.name;

scores = [Score{value: 10.0}, Score{value: 20.0}];

total = scores 
  |> SUM _.value;

ASSERT total == 30.0, "SUM aggregates field values";

In EACH blocks, _ is mutable — you can assign to fields:

pool 
  |> EACH { _.health = _.health - damage; };

Operators

Transform

Operator  Syntax                Returns              Description
SELECT    list |> SELECT expr   ExprType[]           Project each element through an expression
WHERE     list |> WHERE pred    ElemType[]           Keep elements matching a boolean predicate
ORDER_BY  list |> ORDER_BY key  ElemType[]           Sort by key expression
LIMIT     list |> LIMIT n       ElemType[]           First N elements
SKIP      list |> SKIP n        ElemType[]           Drop first N elements, return rest
DISTINCT  list |> DISTINCT key  ElemType[]           Unique by key (first occurrence wins)
UNNEST    list |> UNNEST expr   InnerType[]          Flatten nested arrays (flatmap)
INDEX     list |> INDEX key     HashMap<ElemType[]>  Group into a hashmap by key
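
To make the DISTINCT ("first occurrence wins") and INDEX (group by key) rows concrete, here is a small model in Python rather than CLEAR. The helper names distinct_by and index_by are hypothetical and only illustrate the semantics described in the table.

```python
def distinct_by(items, key):
    """Keep the first element seen for each key, preserving input order."""
    seen = set()
    out = []
    for item in items:
        k = key(item)
        if k not in seen:
            seen.add(k)
            out.append(item)
    return out


def index_by(items, key):
    """Group elements into a dict of lists, one list per key."""
    groups = {}
    for item in items:
        groups.setdefault(key(item), []).append(item)
    return groups


words = ["apple", "avocado", "banana", "blueberry", "cherry"]
first_per_letter = distinct_by(words, key=lambda w: w[0])
by_letter = index_by(words, key=lambda w: w[0])

print(first_per_letter)  # ['apple', 'banana', 'cherry']
print(by_letter["b"])    # ['banana', 'blueberry']
```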

SELECT accepts any expression, including struct literals. This lets you project into a different struct type in one step:

STRUCT Raw     { id: Int64, score: Float64 }
STRUCT Summary { key: Int64, normalized: Float64 }

raws: Raw[] = [Raw{ id: 1, score: 100.0 }, Raw{ id: 2, score: 200.0 }];

summaries = raws |> SELECT Summary{ key: _.id, normalized: _.score / 100.0 };

ASSERT summaries[0].key == 1, "id preserved";
ASSERT summaries[1].normalized == 2.0, "score normalized";

The result type is inferred from the expression - Summary[] above, not Raw[].

Pipeline fusion with SELECT T{}: SELECT composes with WHERE and aggregates in a single fused loop - no intermediate list allocation:

# WHERE before SELECT: filter on the raw element (efficient)
high = raws
  |> WHERE _.score > 75.0
  |> SELECT Summary{ key: _.id, normalized: _.score / 100.0 };

# SELECT before aggregate: project then sum/min/max a field
total = raws
  |> SELECT Summary{ key: _.id, normalized: _.score / 100.0 }
  |> SUM _.normalized;

# SELECT before WHERE: build struct first, then filter on a struct field (less efficient)
norm_high = raws
  |> SELECT Summary{ key: _.id, normalized: _.score / 100.0 }
  |> WHERE _.normalized > 0.75;

When SELECT precedes a fold or WHERE, the compiler binds the projected value to a temp variable per iteration. This avoids the Zig restriction on struct literals in arithmetic/boolean expression positions, so the generated code is always valid without changing semantics.

Aggregate

Operator  Syntax                     Returns                             Empty list
SUM       list |> SUM expr           Int64 / UInt64 / Float32 / Float64  0
AVERAGE   list |> AVERAGE expr       Float64                             0
MIN       list |> MIN expr           matches expr type                   panics
MAX       list |> MAX expr           matches expr type                   panics
REDUCE    list |> REDUCE(init) expr  type of init                        init

The return type is driven by the expression type. SUM widens small integers to Int64/UInt64; floats stay at their original width (Float32 stays Float32). AVERAGE always returns Float64. MIN and MAX preserve the exact expression type. REDUCE is the general fold - acc is the mutable accumulator, _ is the current element:

nums: Float64[] = [2.0, 3.0, 4.0];

product = nums 
  |> REDUCE(1.0) acc * _;

ASSERT product == 24.0, "REDUCE multiplies 2*3*4";
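
The fold that REDUCE performs can be modeled in a few lines of Python (not CLEAR). The pipeline_reduce helper is a hypothetical name; the sketch only assumes what the table states, namely that the accumulator starts at init and that an empty list returns init.

```python
def pipeline_reduce(items, init, step):
    """Models: items |> REDUCE(init) <expr over acc and _>"""
    acc = init                  # accumulator starts at init
    for item in items:
        acc = step(acc, item)   # body is evaluated once per element
    return acc


product = pipeline_reduce([2.0, 3.0, 4.0], 1.0, lambda acc, x: acc * x)
empty = pipeline_reduce([], 1.0, lambda acc, x: acc * x)

print(product)  # 24.0
print(empty)    # 1.0
```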

Query

Operator  Syntax              Returns    Description
COUNT     list |> COUNT pred  Int64      Number of matches
ANY       list |> ANY pred    Bool       True if any match (short-circuits)
ALL       list |> ALL pred    Bool       True if all match (short-circuits)
FIND      list |> FIND pred   ?ElemType  First match or NIL
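
Short-circuiting means the predicate stops being evaluated as soon as the answer is known. The following Python model (not CLEAR) records which elements were inspected; the helper names are hypothetical, and Python's None stands in for NIL.

```python
def any_match(items, pred):
    for item in items:
        if pred(item):
            return True    # stop at the first match
    return False


def all_match(items, pred):
    for item in items:
        if not pred(item):
            return False   # stop at the first failure
    return True


def find_first(items, pred):
    for item in items:
        if pred(item):
            return item    # first match
    return None            # no match


checked = []


def over_ten(x):
    checked.append(x)      # record every element the predicate sees
    return x > 10


prices = [5, 12, 40, 7]
found = find_first(prices, over_ten)

print(found)    # 12
print(checked)  # [5, 12]
```

Only the first two elements are inspected; 40 and 7 are never passed to the predicate.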

Side Effects

Operator  Syntax                 Returns     Description
EACH      list |> EACH { body }  Void        Iterate with mutable _; side-effect only
TAP       list |> TAP { body }   ElemType[]  Observe each element (read-only _), pass collection through

EACH is the only operator where _ is mutable. Use it for in-place updates:

entities 
  |> EACH { _.x = _.x + _.vx; _.y = _.y + _.vy; };

TAP (Debugging / Observation)

TAP runs a body for each element but passes the collection through unchanged. Unlike EACH, _ is read-only and TAP returns the original collection:

result = scores
    |> WHERE _.points > 100
    |> TAP { print("score: ${_.points.toString()}"); }
    |> SUM _.points;

WINDOW (Sliding Window)

WINDOW produces a sliding window of size N over the collection. _ inside the body is the sub-slice (a Float64[] or ElemType[] of length N). The result is an array of the body expression evaluated per window.

data: Float64[] = [1.0, 2.0, 3.0, 4.0, 5.0];

# Each window is a 3-element slice; body projects to window length
lengths = data |> WINDOW(3) _.length();
ASSERT lengths.length() == 3, "5 elements, window 3 -> 3 windows";

# Access individual elements in the window
firsts = data |> WINDOW(2) _[0];
ASSERT firsts[0] == 1.0, "first element of first window";

Number of result windows = max(0, len - size + 1). A window larger than the list produces an empty result.
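
The window-count formula can be checked with a short Python model (not CLEAR). The sliding_windows helper is a hypothetical name and assumes a positive window size.

```python
def sliding_windows(items, size):
    """Return every contiguous sub-list of length `size` (size > 0 assumed)."""
    count = max(0, len(items) - size + 1)
    return [items[i:i + size] for i in range(count)]


data = [1.0, 2.0, 3.0, 4.0, 5.0]

windows = sliding_windows(data, 3)
print(len(windows))                   # 3
print(windows[0])                     # [1.0, 2.0, 3.0]
print(sliding_windows(data, 2)[0][0]) # 1.0
print(sliding_windows(data, 6))       # []
```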

WINDOW (Batch / Tumbling Window)

WINDOW(size: N), WINDOW(time: 'Xms'), or WINDOW(size: N, time: 'Xms') produces non-overlapping batches. _ inside the body is a T[] batch. The result is a heap-allocated list of the body expression evaluated per batch.

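Flush conditions (checked after every item): the batch reaches N items (size:), the time limit has elapsed (time:), or, when both are given, whichever happens first.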

A partial batch at the end is always included. Works on all source types: arrays, ~T[], ~T[N], and ~T[INF].

FN sumBatch(batch: Int64[]) RETURNS Int64 ->
    MUTABLE s: Int64 = 0;
    batch |> EACH { s = s + _; };
    RETURN s;
END

# Array source, size-only: [1..7] -> [1,2,3], [4,5,6], [7]
arr: Int64[] = [1, 2, 3, 4, 5, 6, 7];
sums = arr |> WINDOW(size: 3) sumBatch(_);
ASSERT sums.length() == 3;   # 3 batches
ASSERT sums[2] == 7;         # partial final batch

# Open stream, size + time (first-of-either)
gen: ~?Int64[] = BG STREAM { MUTABLE i: Int64 = 1; WHILE i <= 10 DO YIELD i; i = i + 1; END };
sums2 = gen |> WINDOW(size: 4, time: "500ms") sumBatch(_);
ASSERT sums2.length() == 3;  # [1-4], [5-8], [9-10]

# Time-only: entire array arrives fast, all items in one final batch
arr2: Int64[] = [100, 200, 300];
lens = arr2 |> WINDOW(time: "1s") _.length();
ASSERT lens[0] == 3;  # one batch of 3

Note: The sliding-window form WINDOW(N) (positional integer, no named params) is the existing collection-only operator above. The named-param form WINDOW(size:, time:) is the new batching operator and works on both collections and streams.
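
The batching behaviour can be modeled in Python (not CLEAR) by feeding items with explicit arrival times. This is a sketch under stated assumptions: the batch_windows helper is hypothetical, and measuring the time limit from the first item of each batch is an assumption, since the text above does not say when the timer starts.

```python
def batch_windows(timed_items, size=None, max_age=None):
    """timed_items: iterable of (arrival_time_seconds, value) pairs."""
    batches, current, started = [], [], None
    for t, value in timed_items:
        if not current:
            started = t                  # assumption: timer starts with the batch
        current.append(value)
        size_hit = size is not None and len(current) >= size
        time_hit = max_age is not None and (t - started) >= max_age
        if size_hit or time_hit:         # checked after every item
            batches.append(current)
            current = []
    if current:                          # partial final batch is kept
        batches.append(current)
    return batches


# Size-only: seven items that all arrive at t = 0.
by_size = batch_windows([(0.0, v) for v in range(1, 8)], size=3)
print(by_size)                    # [[1, 2, 3], [4, 5, 6], [7]]
print([sum(b) for b in by_size])  # [6, 15, 7]

# Time-only: everything arrives before the limit, so one final batch.
by_time = batch_windows([(0.0, v) for v in (100, 200, 300)], max_age=1.0)
print(by_time)                    # [[100, 200, 300]]
```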

JOIN (Left Outer Join)

JOIN performs a left outer join between two collections using a two-parameter lambda predicate. Every element of the left collection appears in the result exactly once; the right field is NIL when no right element matches.

STRUCT User { id: Int64, name: String }
STRUCT Order { userId: Int64, amount: Float64 }

results = users |> JOIN(orders) %(u, o) -> u.id == o.userId;
# results: JoinResult_User_Order[]
# results[i].left  -- the User
# results[i].right -- ?Order (NIL if no match)

The result element type is an anonymous struct { left: L, right: ?R }. The lambda must take exactly two parameters (left element, right element) and return Bool.
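
A minimal Python model (not CLEAR) of the left outer join described above. The left_join helper is hypothetical, and picking the first matching right element when several match is an assumption made so that each left element appears exactly once; Python's None stands in for NIL.

```python
def left_join(left, right, pred):
    """Return one (left_item, right_item_or_None) pair per left element."""
    out = []
    for left_item in left:
        # Assumption: the first right element satisfying pred is used.
        match = next((r for r in right if pred(left_item, r)), None)
        out.append((left_item, match))
    return out


users = [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]
orders = [{"userId": 1, "amount": 9.5}]

results = left_join(users, orders, lambda u, o: u["id"] == o["userId"])

print(len(results))             # 2
print(results[0][1]["amount"])  # 9.5
print(results[1][1])            # None
```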

Stream and Future Compatibility

Pipelines over futures/streams are currently supported in a narrower subset than pipelines over collections.

Stream kinds

Type     Meaning                NEXT result  Pipeline support
~T       Single future value    T            Not a pipeline source
~?T[]    Open stream            ?T           Not a pipeline source yet
~T[]     Finite dynamic stream  ?T           Full non-concurrent operator set (see table)
~T[N]    Finite bounded stream  ?T           Full non-concurrent operator set plus CONCURRENT EACH/SELECT/WHERE
~T[INF]  Infinite stream        T            Fusible stages + all terminals via LIMIT

Operator support matrix

The columns are the three pipeline-capable stream types. "Stage" operators filter or transform in a fused while loop without materializing; "terminal" operators consume the stream and produce a scalar or collection result.

Stages (fusible, zero intermediate allocations)

Operator    ~T[]  ~T[N]  ~T[INF]
WHERE       yes   yes    yes (LIMIT must appear in chain)
SELECT      yes   yes    yes (LIMIT must appear in chain)
SKIP        yes   yes    yes (LIMIT must appear in chain)
TAKE_WHILE  yes   yes    yes (LIMIT must appear in chain)
LIMIT       yes   yes    yes - converts stream to T[]
TAP         yes   yes    not yet

LIMIT can appear anywhere in the chain relative to other fusible stages. The compiler fuses the entire chain into a single while loop - counter |> WHERE _ > 0 |> LIMIT 5 |> EACH and counter |> LIMIT 5 |> WHERE _ > 0 |> EACH both work; they differ only in whether LIMIT counts pre- or post-filter items.
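
The difference between the two orderings can be modeled with Python generators (not CLEAR). The function names are hypothetical, and itertools.count stands in for an infinite counter stream.

```python
from itertools import count, islice


def limit_after_filter(stream, n):
    """Models: stream |> WHERE _ > 0 |> LIMIT n (LIMIT counts kept items)."""
    return list(islice((x for x in stream if x > 0), n))


def limit_before_filter(stream, n):
    """Models: stream |> LIMIT n |> WHERE _ > 0 (LIMIT counts raw items)."""
    return [x for x in islice(stream, n) if x > 0]


# count(-2) yields -2, -1, 0, 1, 2, 3, ...
print(limit_after_filter(count(-2), 5))   # [1, 2, 3, 4, 5]
print(limit_before_filter(count(-2), 5))  # [1, 2]
```

Both versions terminate on an infinite source because the bound is applied lazily; only the number of surviving items differs.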

Fold terminals (produce a scalar or optional value)

Operator  ~T[]  ~T[N]  ~T[INF]
EACH      yes   yes    via LIMIT
SUM       yes   yes    via LIMIT
COUNT     yes   yes    via LIMIT
AVERAGE   yes   yes    via LIMIT
MIN       yes   yes    via LIMIT
MAX       yes   yes    via LIMIT
ANY       yes   yes    via LIMIT
ALL       yes   yes    via LIMIT
FIND      yes   yes    via LIMIT
REDUCE    yes   yes    via LIMIT

"via LIMIT" means LIMIT must appear earlier in the same pipeline chain. LIMIT converts ~T[INF] to T[] (a regular list); the fold terminal then operates on that list. Example: counter |> WHERE _ > 0 |> LIMIT 5 |> SUM _.

Materialization terminals (produce a new collection)

These operators consume the stream and produce a new heap-allocated collection. DISTINCT, INDEX, and the batching WINDOW(size:, time:) are supported for all stream types; the others require materializing first with .toList().

Operator                      ~T[]                   ~T[N]                  ~T[INF]
DISTINCT                      yes - returns T[]@set  yes - returns T[]@set  via LIMIT - returns T[]@set
INDEX                         yes                    yes                    via LIMIT
ORDER_BY                      not yet                not yet                not yet
UNNEST                        not yet                not yet                not yet
WINDOW(N) (sliding)           not yet                not yet                not yet
WINDOW(size:, time:) (batch)  yes                    yes                    yes
JOIN                          not yet                not yet                not yet

"via LIMIT" means LIMIT must appear earlier in the chain (same rule as fold terminals). DISTINCT returns T[]@set (a Set), supporting .count() and .contains?().

If you need ORDER_BY, UNNEST, sliding WINDOW(N), or JOIN on a stream, materialize first. The batching WINDOW(size:, time:) operator works directly on streams without materialization.

s: ~Int64[] = 0 ..< 10;
vals = s.toList();
total = vals 
  |> WHERE _ > 3 
  |> SUM _;

Concurrent operators

Operator           ~T[]     ~T[N]  ~T[INF]
CONCURRENT EACH    not yet  yes    not yet
CONCURRENT SELECT  not yet  yes    not yet
CONCURRENT WHERE   not yet  yes    not yet

~T[N] concurrent pipelines are native: they consume promise slots directly without materializing through .toList(), and lower through MIR-visible builtin helpers. Example:

nums: ~Float64[4] = [BG { 1.0; }, BG { 2.0; }, BG { 3.0; }, BG { 4.0; }];

doubled = nums 
  |> CONCURRENT(workers: 2) SELECT _ * 2.0;

Direct range expressions still use the non-concurrent path unless first bound as ~T[N].

Open streams

Open streams (~?T[]) are still NEXT-driven only:

gen: ~?Int64[] = BG STREAM {
    YIELD 1;
    YIELD 2;
};

v1 = NEXT gen;
v2 = NEXT gen;
v3 = NEXT gen;     # NIL

SKIP and LIMIT (Pagination)

SKIP and LIMIT are complementary: SKIP drops the first N elements, LIMIT takes the first N.

# Pagination: page 3, 10 items per page
page = items 
  |> SKIP 20
  |> LIMIT 10;

# Skip header row, process the rest
data = rows 
  |> SKIP 1 
  |> SELECT parseRow(_);
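
The pagination arithmetic behind SKIP 20 |> LIMIT 10 can be sketched in Python (not CLEAR). The page_of helper is a hypothetical name, pages are assumed to be numbered from 1, and the short final page reflects Python slicing rather than a documented CLEAR guarantee.

```python
def page_of(items, page, per_page):
    """Return the 1-indexed `page` of `items`, `per_page` elements at a time."""
    skip = (page - 1) * per_page            # page 3, 10 per page -> skip 20
    return items[skip:skip + per_page]      # then take at most per_page


items = list(range(1, 36))  # 35 items: 1..35

print(page_of(items, 3, 10))  # [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
print(page_of(items, 4, 10))  # [31, 32, 33, 34, 35]
```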

Chaining

Operators compose naturally:

# Filter, sort, take top 3
leaderboard = scores
    |> WHERE _.points > 100
    |> ORDER_BY _.points
    |> LIMIT 3;

# Count active users with high scores
n = users
    |> WHERE _.active == TRUE
    |> COUNT _.score > 1000;

Collection Compatibility

Every operator works on every collection type:

# Array
nums: Float64[] = [1.0, 2.0, 3.0];
total = nums 
  |> SUM _;

# List
MUTABLE data = List[];
avg = data 
  |> AVERAGE _.value;

# Pool
MUTABLE pool: Entity[1000]@pool = [];
alive = pool 
  |> WHERE _.health > 0;

# Pool with SOA (field-slice iteration — cache-optimal)
MUTABLE soa_pool: Entity[1000]@pool:soa = [];
total_hp = soa_pool 
  |> SUM _.health;  # iterates only the health array

# List with SOA
MUTABLE soa_list: Entity[]@list:soa = [];
avg = soa_list 
  |> AVERAGE _.health;   # contiguous f64 slice

# Sharded (parallel EACH via DO blocks)
MUTABLE sharded: Entity[10000]@pool:sharded(4) = [];
sharded 
  |> EACH { _.processed = TRUE; };

Loop Fusion

The compiler automatically fuses chains of WHERE and SELECT stages ending in a fold (SUM, REDUCE, AVERAGE, MIN, MAX, COUNT, ANY, ALL, FIND) into a single loop with zero intermediate allocations.

# Written as 3 stages:
result = data 
  |> WHERE _ > 500.0 
  |> SELECT _ * _ 
  |> SUM _;

# Compiled as a single loop (no intermediate arrays):
# for (data) |it| { if (it > 500) { sum += it * it; } }

This eliminates the allocation and iteration overhead of intermediate lists. Stages that require materialization (ORDER_BY, DISTINCT, INDEX) break the fusion chain - operations before them are fused separately.
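
The effect of fusion can be illustrated in Python (not CLEAR) by comparing a staged version, which builds two intermediate lists, with a single-loop version that produces the same result. Both function names are hypothetical, and the sketch models the idea rather than the compiler's generated code.

```python
def staged(data):
    """Unfused: each stage materializes a new list."""
    kept = [x for x in data if x > 500.0]   # WHERE _ > 500.0
    squared = [x * x for x in kept]         # SELECT _ * _
    return sum(squared)                     # SUM _


def fused(data):
    """Fused: one pass, no intermediate lists."""
    total = 0.0
    for x in data:
        if x > 500.0:
            total += x * x
    return total


data = [100.0, 600.0, 700.0]
print(staged(data))  # 850000.0
print(fused(data))   # 850000.0
```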

SOA Optimization

When a @pool:soa is used in a pipeline, the compiler rewrites field accesses to iterate directly over contiguous field arrays instead of striding over whole structs. This happens automatically for all operators — no syntax change needed.

STRUCT Entity { x: Float64, y: Float64, vx: Float64, vy: Float64, health: Float64 }
MUTABLE pool: Entity[10000]@pool:soa = [];

# SUM _.health iterates only the health array (contiguous f64[]).
# Without :soa, it would load all 5 fields per element.
total = pool 
  |> SUM _.health;

For WHERE and FIND, the predicate uses field-slice access (fast), and the struct is reassembled only for matching elements.
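
The layout difference can be modeled in Python (not CLEAR) with a dictionary of per-field lists standing in for the struct-of-arrays storage. The helper names and sample values are hypothetical; the sketch shows which data each operation touches, not real memory behaviour.

```python
# Array-of-structs: every element carries all five fields.
aos = [
    {"x": 0.0, "y": 0.0, "vx": 1.0, "vy": 1.0, "health": 10.0},
    {"x": 1.0, "y": 2.0, "vx": 0.0, "vy": 0.0, "health": 0.0},
    {"x": 3.0, "y": 4.0, "vx": 0.5, "vy": 0.5, "health": 5.0},
]

# Struct-of-arrays: one list per field.
soa = {name: [e[name] for e in aos] for name in aos[0]}


def sum_health(soa):
    """Models SUM _.health: reads only the health column."""
    return sum(soa["health"])


def where_alive(soa):
    """Models WHERE _.health > 0: test the health column, then
    reassemble a full struct only for the rows that match."""
    return [
        {name: column[i] for name, column in soa.items()}
        for i, hp in enumerate(soa["health"])
        if hp > 0
    ]


print(sum_health(soa))        # 15.0
print(len(where_alive(soa)))  # 2
```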

The compiler warns when SOA would help:

NOTE: Pipeline accesses 1 of 5 fields (health). Consider @soa
      for better cache performance on 'Entity'.

Concurrency

The CONCURRENT modifier currently parallelizes collection pipelines for SELECT, WHERE, and EACH:

MUTABLE data: Score[10000]@pool:sharded(4) = [];

# Parallel WHERE: one fiber per shard
results = data 
  |> CONCURRENT WHERE dbFetch(_.id).val > threshold;

# Process in parallel
results = data 
  |> CONCURRENT(parallel: TRUE) SELECT dbFetch(_.id).name;

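Options used in this document's examples: workers (as in CONCURRENT(workers: 2)) and parallel (as in CONCURRENT(parallel: TRUE)).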

CONCURRENT compatibility

Source kind                         CONCURRENT SELECT  CONCURRENT WHERE  CONCURRENT EACH
Arrays / @list / @pool / @pool:soa  Yes                Yes               Yes
@sharded(...) collections           Yes                Yes               Yes
Finite streams ~T[]                 Not yet            Not yet           Not yet
Bounded streams ~T[N]               Yes                Yes               Yes
Open streams ~?T[]                  Not yet            Not yet           Not yet
Infinite streams ~T[INF]            Not yet            Not yet           Not yet

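So today, CONCURRENT SELECT, WHERE, and EACH run on arrays, @list, @pool, @pool:soa, @sharded(...) collections, and bounded streams ~T[N]. Finite ~T[], open ~?T[], and infinite ~T[INF] streams are not yet supported as CONCURRENT sources.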

Source: docs/pipelines.md