Pipeline Control Operations

In this section, we’ll explore how to use the control operations of a Pipeline. The examples use the following items model.

var schema = {
id: {
type: NIMBUSDB_DATA_TYPE.INTEGER,
const: NIMBUSDB_CONSTRAINT.PRIMARY_KEY
},
name: NIMBUSDB_DATA_TYPE.STRING,
price: {
type: NIMBUSDB_DATA_TYPE.NUMBER,
validator: function(data, value) {
return value >= 0;
},
default_value: 0
},
is_locked: {
type: NIMBUSDB_DATA_TYPE.BOOLEAN,
const: NIMBUSDB_CONSTRAINT.OPTIONAL,
default_value: false
}
};
items = new NimbusDBModel("global", "items", schema, [
{ id: 1, name: "Apple", price: 5 },
{ id: 2, name: "Banana", price: 7.2 },
{ id: 3, name: "Cherry", price: 15 },
{ id: 4, name: "Date", price: 12.5 },
{ id: 5, name: "Elderberry", price: 8 },
{ id: 6, name: "Fig", price: 10 },
{ id: 7, name: "Grape", price: 6 },
{ id: 8, name: "Honeydew", price: 9 },
{ id: 9, name: "Kiwi", price: 4 },
{ id: 10, name: "Lemon", price: 3 }
]);

The tap operation prints the current cached pipeline result or taps into it with a callback.

// (1) print the current pipeline result
var items_pl = items.pipe() // create an `items` pipeline
// let's say the `filter` operation is the first operation in the pipeline
.filter(function(data, index) {
return (data.price % 5 == 0); // include only items with a price that is divisible by 5
})
.tap();
// in the console/output:
// +----+--------+-------+-----------+
// | id | name | price | is_locked |
// +----+--------+-------+-----------+
// | 1 | Apple | 5 | false |
// | 3 | Cherry | 15 | false |
// | 6 | Fig | 10 | false |
// +----+--------+-------+-----------+
// (2) print the current pipeline result with configuration
var items_pl = items.pipe() // create an `items` pipeline
// let's say the `filter` operation is the first operation in the pipeline
.filter(function(data, index) {
return (data.price % 5 == 0); // include only items with a price that is divisible by 5
})
.tap({
pick: ["name", "price"], // only print the `name` and `price` columns
// ... other options
});
// in the console/output:
// +--------+-------+
// | name | price |
// +--------+-------+
// | Apple | 5 |
// | Cherry | 15 |
// | Fig | 10 |
// +--------+-------+
// (3) tap into the current pipeline result with a callback
var items_pl = items.pipe().isolate() // create an `items` pipeline and isolate the data
// let's say the `filter` operation is the first operation in the pipeline
.filter(function(data, index) {
return (data.price % 5 == 0); // include only items with a price that is divisible by 5
})
.tap(function(data, index) {
show_debug_message($"Row {index + 1}: {data}");
});
// in the console/output:
// Row 1: { id: 1, name: "Apple", price: 5, is_locked: false }
// Row 2: { id: 3, name: "Cherry", price: 15, is_locked: false }
// Row 3: { id: 6, name: "Fig", price: 10, is_locked: false }

The recover operation registers an error recovery handler in the pipeline. The handler is called when the previous operation throws an error, and it can return replacement data and a stop flag.

var items_pl = items.pipe() // create an `items` pipeline
// let's say you have this in your pipeline:
// { id: 1, name: "Apple", price: 5 }
.find(function(data, index) { // `find` only works with arrays, so it throws an error here
return (data.price % 5 == 0);
})
.recover(function(err, data) { // register an error recovery handler
show_debug_message($"Error: {err.message}"); // print the error message
return {
data: [], // return replacement data
stop: false // continue the pipeline. if `stop` = true, the pipeline will stop here
};
});
// from here, the pipeline will continue to execute the next operation with the replacement data (empty array)
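
To make the handler contract concrete, here is a minimal TypeScript sketch of how a pipeline runner could apply a recovery handler. It is a toy model, not NimbusDB's implementation: `runWithRecover`, `Step`, `Handler`, and `RecoverResult` are hypothetical names, and the sketch assumes only what the example above shows (the handler receives the error and the current data, and returns replacement data plus a stop flag).

```typescript
// Toy model of a recover step. All names here are hypothetical.
type RecoverResult = { data: unknown; stop: boolean };
type Op = (data: unknown) => unknown;
type Handler = (err: Error, data: unknown) => RecoverResult;
type Step = { op: Op; recover?: Handler };

function runWithRecover(source: unknown, steps: Step[]): unknown {
  let data = source;
  for (const step of steps) {
    try {
      data = step.op(data);
    } catch (e) {
      if (!step.recover) throw e; // no handler registered: propagate the error
      const err = e instanceof Error ? e : new Error(String(e));
      const res = step.recover(err, data);
      data = res.data;           // continue with the replacement data
      if (res.stop) return data; // or stop the pipeline here
    }
  }
  return data;
}

// This `find` only accepts arrays, mirroring the example above.
const find: Op = (data) => {
  if (!Array.isArray(data)) throw new Error("find expects an array");
  return data.filter((row: { price: number }) => row.price % 5 === 0);
};
const count: Op = (data) => (data as unknown[]).length;

const result = runWithRecover({ id: 1, name: "Apple", price: 5 }, [
  { op: find, recover: () => ({ data: [], stop: false }) },
  { op: count },
]);
console.log(result); // 0
```

Because the handler returns `stop: false`, the `count` step still runs, this time on the empty replacement array.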

The fork operation creates a new pipeline branching from the current state. The new pipeline's starting data is the result of the current pipeline up to this step.

var items_pl = items.pipe() // create an `items` pipeline
// let's say the `filter` operation is the first operation in the pipeline
.filter(function(data, index) {
return (data.price % 5 == 0);
})
.map(function(data, index) {
return { id: data.id, name: data.name };
});
var filtered_pl = items_pl.fork(); // create a new pipeline branching from the current state
// `items_pl` has `.filter()` and `.map()` applied to it, but `filtered_pl` has no operations yet
// the `items_pl` pipeline has starting data from the `items` model,
// but the `filtered_pl` pipeline has starting data from the `items_pl.filter().map()` result
// if you call `.exec()` on both pipelines, you'll get the same result
// if you add more operations to the `items_pl` pipeline, it won't affect `filtered_pl` (and vice versa)
items_pl.rollback_by(2); // delete the `map` and `filter` operations
items_pl = items_pl.pluck("name"); // and add a new `pluck` operation (reassigned, because operations return a new pipeline unless mutable mode is on)
// now each pipeline has a different result
items_pl.exec(); // [ { name: "Apple" }, ..., { name: "Lemon" } ]
filtered_pl.exec(); // [ { id: 1, name: "Apple" }, { id: 3, name: "Cherry" }, { id: 6, name: "Fig" } ]
items_pl:
(the starting data is from the `items` model)
┌───────────┐   ┌────────┐ ┳
│ .filter() │──>│ .map() │ ┃
└───────────┘   └────────┘ ┻
                        .fork()
┌──────────────────────────┘
filtered_pl:
(the starting data is from the `items_pl.filter().map()` result)
┳ ┌─────────┐
┃ │ (empty) │
┻ └─────────┘

The clone operation creates a clone of the current pipeline state and operations, optionally sharing or copying the cache.

var items_pl = items.pipe() // create an `items` pipeline
// let's say the `filter` operation is the first operation in the pipeline
.filter(function(data, index) {
return (data.price % 5 == 0);
})
.map(function(data, index) {
return { id: data.id, name: data.name };
});
var items_pl_clone = items_pl.clone(); // create a clone of the current pipeline state
// both pipelines have the same starting data and operations
// if you call `.exec()` on both pipelines, you'll get the same result
// if you add more operations to the `items_pl` pipeline, it won't affect `items_pl_clone` (and vice versa)
items_pl.rollback_by(2); // delete the `map` and `filter` operations
items_pl = items_pl.pluck("name"); // and add a new `pluck` operation (reassigned, because operations return a new pipeline unless mutable mode is on)
items_pl.exec(); // [ { name: "Apple" }, ..., { name: "Lemon" } ]
items_pl_clone.exec(); // [ { id: 1, name: "Apple" }, { id: 3, name: "Cherry" }, { id: 6, name: "Fig" } ]
items_pl:
(the starting data is from the `items` model)
┌───────────┐   ┌────────┐ ┳
│ .filter() │──>│ .map() │ ┃
└───────────┘   └────────┘ ┻
                        .clone()
                           └────────────────────┐
items_pl_clone:                                 │
(the starting data is from the `items` model)   │
┌───────────┐   ┌────────┐ ┳                    │
│ .filter() │──>│ .map() │ ┃ <──────────────────┘
└───────────┘   └────────┘ ┻
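
The difference between fork and clone can be easier to see in a small model. The TypeScript sketch below is only an illustration under stated assumptions: `ToyPipeline`, `add`, and `opCount` are hypothetical names, caching is ignored, and the class is not how NimbusDB is implemented. It models fork as "the current result becomes the new source, with no operations" and clone as "same source, copied operations".

```typescript
// Toy model contrasting fork and clone. `ToyPipeline` is hypothetical.
type Row = { id: number; name: string; price: number };
type Op = (rows: Row[]) => Row[];

class ToyPipeline {
  private readonly source: Row[];
  private readonly ops: Op[];

  constructor(source: Row[], ops: Op[] = []) {
    this.source = source;
    this.ops = ops;
  }
  // Each call returns a new pipeline with one more operation.
  add(op: Op): ToyPipeline {
    return new ToyPipeline(this.source, [...this.ops, op]);
  }
  // Run every operation in order, starting from the source data.
  exec(): Row[] {
    return this.ops.reduce((rows, op) => op(rows), this.source);
  }
  // fork: the current result becomes the new source; no operations yet.
  fork(): ToyPipeline {
    return new ToyPipeline(this.exec(), []);
  }
  // clone: same source and a copy of the operation list.
  clone(): ToyPipeline {
    return new ToyPipeline(this.source, [...this.ops]);
  }
  opCount(): number {
    return this.ops.length;
  }
}

const items: Row[] = [
  { id: 1, name: "Apple", price: 5 },
  { id: 2, name: "Banana", price: 7.2 },
  { id: 3, name: "Cherry", price: 15 },
];
const base = new ToyPipeline(items)
  .add((rows) => rows.filter((r) => r.price % 5 === 0))
  .add((rows) => rows.map((r) => ({ ...r, price: r.price * 2 })));

const forked = base.fork();
const cloned = base.clone();

console.log(forked.opCount(), cloned.opCount()); // 0 2
console.log(JSON.stringify(forked.exec()) === JSON.stringify(cloned.exec())); // true
```

Both branches produce the same rows at this point, but the fork can no longer roll back past the branching step because those operations are baked into its source, while the clone still carries them.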

Isolate (Array of Objects, Object)

The isolate function changes the current pipeline data into an isolated copy.

var items_pl = items.pipe().isolate(); // create an `items` pipeline and isolate the data
// ... pipeline operations ...

Bind (Array of Objects, Object)

The bind function changes the current pipeline data into linked-access mode, re-sourcing the data from the model at this step.

var items_pl = items.pipe()
// let's say you have this in your pipeline:
// { id: 2, ...other field-values }
.bind(); // change the data into linked-access mode

The materialize operation forces data-caching at the current step.

var items_pl = items.pipe() // create an `items` pipeline
// let's say the `filter` operation is the first operation in the pipeline
.filter(function(data, index) {
return (data.price % 5 == 0);
})
.materialize() // force-cache the result up to this step
.map(function(data, index) {
return { id: data.id, name: data.name, price: data.price * 2 };
});

The snapshot operation saves the current pipeline result under a named snapshot for later retrieval.

var items_pl = items.pipe() // create an `items` pipeline
// let's say the `filter` operation is the first operation in the pipeline
.filter(function(data, index) {
return (data.price % 5 == 0);
})
.snapshot("filtered_price_5") // save the result under a named snapshot
// ... more operations ...
// and later
var items_of_price_5 = items_pl.get_snapshot("filtered_price_5");

The checkpoint operation saves the current operations with the cursor position under a named checkpoint for later rollback.

// (1) creating checkpoints
var items_pl = items.pipe() // create an `items` pipeline
// let's say the `filter` operation is the first operation in the pipeline
.filter(function(data, index) {
return (data.price % 5 == 0);
})
.checkpoint("filtered_price_5")
.map(function(data, index) {
return { id: data.id, name: data.name, price: data.price * 2 };
})
.checkpoint("double_price");
// ... more operations ...
// (2) rollback by step(s)
items_pl.rollback_by(1);
// now, the `map` operation is the last operation in the pipeline
// (3) rollback to a checkpoint
items_pl.rollback_to("filtered_price_5");
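
As a rough mental model of checkpoints and rollbacks, the TypeScript sketch below tracks an operation list together with named cursor positions. It is a hedged illustration, not the library's code: `ToyOps`, `rollbackBy`, and `rollbackTo` are hypothetical names, operations are represented by plain strings, and the sketch assumes that a checkpoint records how many operations exist when it is created and does not itself count as a step.

```typescript
// Toy model of checkpoints and rollbacks. All names are hypothetical.
class ToyOps {
  private ops: string[] = [];
  private checkpoints = new Map<string, number>();

  add(name: string): this {
    this.ops.push(name);
    return this;
  }
  // Assumption: a checkpoint stores the cursor (number of operations so far).
  checkpoint(name: string): this {
    this.checkpoints.set(name, this.ops.length);
    return this;
  }
  // Remove the last `steps` operations (positive integers only).
  rollbackBy(steps: number): void {
    if (!Number.isInteger(steps) || steps <= 0) {
      throw new RangeError("steps must be a positive integer");
    }
    this.ops.length = Math.max(0, this.ops.length - steps);
  }
  // Truncate the operation list back to the saved cursor position.
  rollbackTo(name: string): void {
    const pos = this.checkpoints.get(name);
    if (pos === undefined) throw new Error(`unknown checkpoint: ${name}`);
    this.ops.length = Math.min(pos, this.ops.length);
  }
  list(): string[] {
    return [...this.ops];
  }
}

const pl = new ToyOps()
  .add("filter").checkpoint("filtered_price_5")
  .add("map").checkpoint("double_price")
  .add("sort"); // stands in for "more operations"

pl.rollbackBy(1);
console.log(pl.list()); // [ 'filter', 'map' ]
pl.rollbackTo("filtered_price_5");
console.log(pl.list()); // [ 'filter' ]
```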

Changes the current pipeline data into linked-access mode, re-sourcing data from the model at this step.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static bind(): NimbusDBPipeline;
}
  • Type: NimbusDBPipeline
  • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Saves the current operations with the cursor position under a named checkpoint for later rollback.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static checkpoint(
_name: string
): NimbusDBPipeline;
}
  • Type: string
  • The checkpoint name.
  • Type: NimbusDBPipeline
  • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Creates a clone of the current pipeline, optionally sharing or copying the cache.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static clone(
_options?: NimbusDBPipelineCloneOptions
): NimbusDBPipeline;
}
  • Type: NimbusDBPipelineCloneOptions
  • Optional clone options (with_cache, shared_cache).
  • Type: NimbusDBPipeline
  • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Creates a new pipeline branching from the current state.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static fork(): NimbusDBPipeline;
}
  • Type: NimbusDBPipeline
  • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Retrieves a previously saved snapshot by name.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static get_snapshot(
_name: string
): any;
}
  • Type: string
  • The snapshot name.
  • Type: any
  • The snapshot data.

Changes the current pipeline data into an isolated copy.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static isolate(): NimbusDBPipeline;
}
  • Type: NimbusDBPipeline
  • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Forces data-caching at the current step.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static materialize(): NimbusDBPipeline;
}
  • Type: NimbusDBPipeline
  • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Enables mutable mode, where operations modify the current pipeline in-place instead of creating new ones.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static mutate(): void;
}
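
The return notes in this reference ("a new instance when mutable = false, the current instance when mutable = true") can be pictured with a small model. The TypeScript sketch below is an assumption-laden illustration rather than NimbusDB's implementation: `ToyChain` and `add` are hypothetical names, and it assumes that immutable mode is the default until `mutate()` is called.

```typescript
// Toy model of immutable vs. mutable chaining. `ToyChain` is hypothetical.
class ToyChain {
  private ops: string[];
  private mutable = false; // assumption: immutable by default

  constructor(ops: string[] = []) {
    this.ops = ops;
  }
  // Switch to mutable mode; like the declaration above, it returns nothing.
  mutate(): void {
    this.mutable = true;
  }
  add(op: string): ToyChain {
    if (this.mutable) {
      this.ops.push(op); // modify this pipeline in place
      return this;
    }
    return new ToyChain([...this.ops, op]); // leave this pipeline untouched
  }
  list(): string[] {
    return [...this.ops];
  }
}

const a = new ToyChain();
const b = a.add("filter"); // immutable: `a` is unchanged, `b` is a new pipeline
console.log(a.list().length, b.list().length, a === b); // 0 1 false

a.mutate();
const c = a.add("map"); // mutable: `a` is modified in place and returned
console.log(a.list(), a === c); // [ 'map' ] true
```

In immutable mode the result of each operation has to be kept (by chaining or reassigning), otherwise the added operation is lost with the discarded instance.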

Registers an error recovery handler for the preceding operation. Called when that operation throws.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static recover(
_func: (err: NimbusDBPipelineOpsError, data: any) => {
data: any;
stop: boolean;
} | any | void
): NimbusDBPipeline;
}
  • Type: (err: NimbusDBPipelineOpsError, data: any) => { data: any; stop: boolean; } | any | void
  • Error handler function. Can return replacement data and a stop flag.
  • Type: NimbusDBPipeline
  • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Rolls back the pipeline operation(s) by a given number of steps.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static rollback_by(
_step: int
): void;
}
  • Type: int
  • The number of steps to roll back (positive integer only).

Rolls back the pipeline operation(s) to a named checkpoint.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static rollback_to(
_checkpoint_name: string
): void;
}
  • Type: string
  • The name of the checkpoint to restore.

Saves the current pipeline result under a named snapshot for later retrieval.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static snapshot(
_name: string
): NimbusDBPipeline;
}
  • Type: string
  • The snapshot name.
  • Type: NimbusDBPipeline
  • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Prints the current cached pipeline result or taps into it with a callback.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static tap(
_options?: NimbusDBPrintOptions
): NimbusDBPipeline;
}
  • Type: NimbusDBPrintOptions
  • Default: undefined
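  • An optional object that customizes what the tap operation prints (for example, pick to select columns). As the tap examples show, a callback function(data, index) can be passed instead to inspect each row.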
  • Type: NimbusDBPipeline
  • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Optional configurations for .clone() method.

pipeline.d.ts
type NimbusDBPipelineCloneOptions = Partial<{
with_cache: boolean; // clone the cache (default = true)
shared_cache: boolean; // use the same cache before the clone operation (default = true, only works if `with_cache` = true)
}>;

An object that contains information about an error that occurred during a pipeline operation.

pipeline.d.ts
type NimbusDBPipelineOpsError = {
data: any; // current pipeline result
message: string;
pos: int;
type: NIMBUSDB_PIPELINE_OPS;
}