Pipeline Control Operations
In this section, we’ll explore how to use control operations in Pipeline. We’ll use items model for this example.
var schema = { id: { type: NIMBUSDB_DATA_TYPE.INTEGER, const: NIMBUSDB_CONSTRAINT.PRIMARY_KEY }, name: NIMBUSDB_DATA_TYPE.STRING, price: { type: NIMBUSDB_DATA_TYPE.NUMBER, validator: function(data, value) { return value >= 0; }, default_value: 0 }, is_locked: { type: NIMBUSDB_DATA_TYPE.BOOLEAN, const: NIMBUSDB_CONSTRAINT.OPTIONAL, default_value: false }};
items = new NimbusDBModel("global", "items", schema, [ { id: 1, name: "Apple", price: 5 }, { id: 2, name: "Banana", price: 7.2 }, { id: 3, name: "Cherry", price: 15 }, { id: 4, name: "Date", price: 12.5 }, { id: 5, name: "Elderberry", price: 8 }, { id: 6, name: "Fig", price: 10 }, { id: 7, name: "Grape", price: 6 }, { id: 8, name: "Honeydew", price: 9 }, { id: 9, name: "Kiwi", price: 4 }, { id: 10, name: "Lemon", price: 3 }]);Debugging Data
Section titled “Debugging Data”Tap Any
Section titled “Tap ”The tap operation prints the current cached pipeline result or taps into it with a callback.
// (1) print the current pipeline resultvar items_pl = items.pipe() // create an `items` pipeline // let's say the `filter` operation is the first operation in the pipeline .filter(function(data, index) { return (data.price % 5 == 0); // include only items with a price that is divisible by 5 }) .tap();
// in the console/output:// +----+--------+-------+-----------+// | id | name | price | is_locked |// +----+--------+-------+-----------+// | 1 | Apple | 5 | false |// | 3 | Cherry | 15 | false |// | 6 | Fig | 10 | false |// +----+--------+-------+-----------+
// (2) print the current pipeline with configurationvar items_pl = items.pipe() // create an `items` pipeline // let's say the `filter` operation is the first operation in the pipeline .filter(function(data, index) { return (data.price % 5 == 0); // include only items with a price that is divisible by 5 }) .tap({ pick: ["name", "price"], // only print the `name` and `price` columns // ... other options });
// in the console/output:// +--------+-------+// | name | price |// +--------+-------+// | Apple | 5 |// | Cherry | 15 |// | Fig | 10 |// +--------+-------+
// (3) tap into the current pipeline result with a callbackvar items_pl = items.pipe().isolate() // create an `items` pipeline and isolate the data // let's say the `filter` operation is the first operation in the pipeline .filter(function(data, index) { return (data.price % 5 == 0); // include only items with a price that is divisible by 5 }) .tap(function(data, index) { show_debug_message($"Row {index + 1}: {data}"); });
// in the console/output:// Row 1: { id: 1, name: "Apple", price: 5, is_locked: false }// Row 2: { id: 3, name: "Cherry", price: 15, is_locked: false }// Row 3: { id: 6, name: "Fig", price: 10, is_locked: false }Error Handling
Section titled “Error Handling”Recover Any
Section titled “Recover ”The recover operation is used to register an error recovery handler for an operation in the pipeline. The handler is called when the previous operation throws an error.
var items_pl = items.pipe() // create an `items` pipeline // let's say you have this in your pipeline: // { id: 1, name: "Apple", price: 5 } .find(function(data, index) { // `find` only works with arrays, so it return (data.price % 5 == 0); }) .recover(function(err, data) { // register an error recovery handler show_debug_message($"Error: {err.message}"); // print the error message return { data: [], // return a replacement data stop: false // continue the pipeline. if `stop` = true, the pipeline will stop here } }); // from here, the pipeline will continue to execute the next operation with the replacement data (empty array)Branching Pipelines
Section titled “Branching Pipelines”Fork Any
Section titled “Fork ”The fork operation is used to create a new pipeline branching from the current state. The starting data is the pipeline result from start to the current step.
var items_pl = items.pipe() // create an `items` pipeline // let's say the `filter` operation is the first operation in the pipeline .filter(function(data, index) { return (data.price % 5 == 0); }) .map(function(data, index) { return { id: data.id, name: data.name }; });
var filtered_pl = items_pl.fork(); // create a new pipeline branching from the current state
// the `items_pl` has `.filter()` and `.map()` applied to it, but `filtered_pl` is empty// the `items_pl` pipeline has starting data from the `items` model,// but the `filtered_pl` pipeline has starting data from the `items_pl.filter().map()` result
// if you call `.exec()` on both pipelines, you'll get the same result// if you add more operations to the `items_pl` pipeline, it won't affect `filtered_pl` (also vice versa)items_pl.rollback_by(2); // delete the `map` and `filter` operationsitems_pl.pluck("name"); // and create a new `pluck` operation
// now each pipeline has different resultitems_pl.exec(); // [ { name: "Apple" }, ..., { name: "Lemon" } ]filtered_pl.exec(); // [ { id: 1, name: "Apple" }, { id: 3, name: "Cherry" }, { id: 6, name: "Fig" } ]items_pl:(the starting data is from the `items` model)┌───────────┐ ┌────────┐ ┳│ .filter() │──>│ .map() │ ┃└───────────┘ └────────┘ ┻ │ .fork()┌──────────────────────────┘↓filtered_pl:(the starting data is from the `items_pl.filter().map()` result)┳ ┌─────────┐┃ │ (empty) │┻ └─────────┘Clone Any
Section titled “Clone ”The clone operation creates a clone of the current pipeline state and operations, optionally sharing or copying the cache.
var items_pl = items.pipe() // create an `items` pipeline // let's say the `filter` operation is the first operation in the pipeline .filter(function(data, index) { return (data.price % 5 == 0); }) .map(function(data, index) { return { id: data.id, name: data.name }; });
var items_pl_clone = items_pl.clone(); // create a clone of the current pipeline state
// both pipelines have the same starting data and operations// if you call `.exec()` on both pipelines, you'll get the same result// if you add more operations to the `items_pl` pipeline, it won't affect `items_pl_clone` (also vice versa)items_pl.rollback_by(2); // delete the `map` and `filter` operationsitems_pl.pluck("name"); // and create a new `pluck` operation
items_pl.exec(); // [ { name: "Apple" }, ..., { name: "Lemon" } ]items_pl_clone.exec(); // [ { id: 1, name: "Apple" }, { id: 3, name: "Cherry" }, { id: 6, name: "Fig" } ]items_pl:(the starting data is from the `items` model)┌───────────┐ ┌────────┐ ┳│ .filter() │──>│ .map() │ ┃└───────────┘ └────────┘ ┻ │ .clone() └────────────────────┐items_pl_clone: │(the starting data is from the `items` model) │┌───────────┐ ┌────────┐ ┳ ││ .filter() │──>│ .map() │ ┃ <──────────────────┘└───────────┘ └────────┘ ┻Mutating Data
Section titled “Mutating Data”Isolate Array of Objects Object
Section titled “Isolate ”The isolate function is used to change the current pipeline data into isolated copy.
var items_pl = items.pipe().isolate(); // create an `items` pipeline and isolate the data // ... pipeline operations ...Bind Array of Objects Object
Section titled “Bind ”The bind function is used to change the current pipeline data into linked-access mode.
var items_pl = items.pipe() // let's say you have this in your pipeline: // { id: 2, ...other field-values } .bind(); // change the data into linked-access modeState Management
Section titled “State Management”Materialize Any
Section titled “Materialize ”The materialize operation forces data-caching at the current step.
var items_pl = items.pipe() // create an `items` pipeline // let's say the `filter` operation is the first operation in the pipeline .filter(function(data, index) { return (data.price % 5 == 0); }) .materialize() // force cache the result before this step .map(function(data, index) { return { id: data.id, name: data.name, price: data.price * 2 }; });Snapshot Any
Section titled “Snapshot ”The snapshot operation saves the current pipeline result under a named snapshot for later retrieval.
var items_pl = items.pipe() // create an `items` pipeline // let's say the `filter` operation is the first operation in the pipeline .filter(function(data, index) { return (data.price % 5 == 0); }) .snapshot("filtered_price_5") // save the result under a named snapshot // ... more operations ...
// and latervar items_of_price_5 = items_pl.get_snapshot("filtered_price_5");Checkpoint & Rollback Any
Section titled “Checkpoint & Rollback ”The checkpoint operation saves the current operations with the cursor position under a named checkpoint for later rollback.
// (1) creating checkpointsvar items_pl = items.pipe() // create an `items` pipeline // let's say the `filter` operation is the first operation in the pipeline .filter(function(data, index) { return (data.price % 5 == 0); }) .checkpoint("filtered_price_5") .map(function(data, index) { return { id: data.id, name: data.name, price: data.price * 2 }; }) .checkpoint("double_price"); // ... more operations ...
// (2) rollback by step(s)items_pl.rollback_by(1);// now, the `map` operation is the last operation in the pipeline
// (3) rollback to a checkpointitems_pl.rollback_to("filtered_price_5");References
Section titled “References”Pipeline.bind()
Section titled “Pipeline.bind()”Changes the current pipeline data into linked-access mode, re-sourcing data from the model at this step.
Signature
Section titled “Signature”class NimbusDBPipeline { // ... other methods and properties ... static bind(): NimbusDBPipeline;}Returns
Section titled “Returns”- Type:
NimbusDBPipeline - A new
NimbusDBPipelineinstance (mutable = false) or the current pipeline instance (mutable = true).
Pipeline.checkpoint()
Section titled “Pipeline.checkpoint()”Saves the current pipeline result under a named checkpoint for later retrieval.
Signature
Section titled “Signature”class NimbusDBPipeline { // ... other methods and properties ... static checkpoint( _name: string ): NimbusDBPipeline;}Parameters
Section titled “Parameters”- Type:
string - The checkpoint name.
Returns
Section titled “Returns”- Type:
NimbusDBPipeline - A new
NimbusDBPipelineinstance (mutable = false) or the current pipeline instance (mutable = true).
Pipeline.clone()
Section titled “Pipeline.clone()”Creates a clone of the current pipeline, optionally sharing or copying the cache.
Signature
Section titled “Signature”class NimbusDBPipeline { // ... other methods and properties ... static clone( _options?: NimbusDBPipelineCloneOptions ): NimbusDBPipeline;}Parameters
Section titled “Parameters”_options
Section titled “_options”- Type:
NimbusDBPipelineCloneOptions - Optional clone options (
with_cache,shared_cache).
Returns
Section titled “Returns”- Type:
NimbusDBPipeline - A new
NimbusDBPipelineinstance (mutable = false) or the current pipeline instance (mutable = true).
Pipeline.fork()
Section titled “Pipeline.fork()”Creates a new branching pipeline branching from the current state.
Signature
Section titled “Signature”class NimbusDBPipeline { // ... other methods and properties ... static fork(): NimbusDBPipeline;}Returns
Section titled “Returns”- Type:
NimbusDBPipeline - A new
NimbusDBPipelineinstance (mutable = false) or the current pipeline instance (mutable = true).
Pipeline.get_snapshot()
Section titled “Pipeline.get_snapshot()”Retrieves a previously saved snapshot by name.
Signature
Section titled “Signature”class NimbusDBPipeline { // ... other methods and properties ... static get_snapshot( _name: string ): any;}Parameters
Section titled “Parameters”- Type:
string - The snapshot name.
Returns
Section titled “Returns”- Type:
any - The snapshot data.
Pipeline.isolate()
Section titled “Pipeline.isolate()”Changes the current pipeline data into isolated copy.
Signature
Section titled “Signature”class NimbusDBPipeline { // ... other methods and properties ... static isolate(): NimbusDBPipeline;}Returns
Section titled “Returns”- Type:
NimbusDBPipeline - A new
NimbusDBPipelineinstance (mutable = false) or the current pipeline instance (mutable = true).
Pipeline.materialize()
Section titled “Pipeline.materialize()”Forces data-caching at the current step.
Signature
Section titled “Signature”class NimbusDBPipeline { // ... other methods and properties ... static materialize(): NimbusDBPipeline;}Returns
Section titled “Returns”- Type:
NimbusDBPipeline - A new
NimbusDBPipelineinstance (mutable = false) or the current pipeline instance (mutable = true).
Pipeline.mutate()
Section titled “Pipeline.mutate()”Enables mutable mode, where operations modify the current pipeline in-place instead of creating new ones.
Signature
Section titled “Signature”class NimbusDBPipeline { // ... other methods and properties ... static mutate(): void;}Pipeline.recover()
Section titled “Pipeline.recover()”Registers an error recovery handler for the next operation. Called when an operation throws.
Signature
Section titled “Signature”class NimbusDBPipeline { // ... other methods and properties ... static recover( _func: (err: NimbusDBPipelineOpsError, data: any) => { data: any; stop: boolean; } | any | void ): NimbusDBPipeline;}Parameters
Section titled “Parameters”- Type:
(err: NimbusDBPipelineOpsError, data: any) => { data: any; stop: boolean; } | any | void - Error handler function. Can return replacement data and a
stopflag.
Returns
Section titled “Returns”- Type:
NimbusDBPipeline - A new
NimbusDBPipelineinstance (mutable = false) or the current pipeline instance (mutable = true).
Pipeline.rollback_by()
Section titled “Pipeline.rollback_by()”Rolls back the pipeline operation(s) by a given number of steps.
Signature
Section titled “Signature”class NimbusDBPipeline { // ... other methods and properties ... static rollback_by( _step: int ): void;}Parameters
Section titled “Parameters”- Type:
int - The number of steps to roll back (positive integer only).
Pipeline.rollback_to()
Section titled “Pipeline.rollback_to()”Rolls back the pipeline operation(s) to a named checkpoint.
Signature
Section titled “Signature”class NimbusDBPipeline { // ... other methods and properties ... static rollback_to( _checkpoint_name: string ): void;}Parameters
Section titled “Parameters”_checkpoint_name
Section titled “_checkpoint_name”- Type:
string - The name of the checkpoint to restore.
Pipeline.snapshot()
Section titled “Pipeline.snapshot()”Saves the current pipeline result under a named snapshot for later retrieval.
Signature
Section titled “Signature”class NimbusDBPipeline { // ... other methods and properties ... static snapshot( _name: string ): NimbusDBPipeline;}Parameters
Section titled “Parameters”- Type:
string - The snapshot name.
Returns
Section titled “Returns”- Type:
NimbusDBPipeline - A new
NimbusDBPipelineinstance (mutable = false) or the current pipeline instance (mutable = true).
Pipeline.tap()
Section titled “Pipeline.tap()”Prints the current cached pipeline result or taps into it with a callback.
Signature
Section titled “Signature”class NimbusDBPipeline { // ... other methods and properties ... static tap( _options?: NimbusDBPrintOptions ): NimbusDBPipeline;}Parameters
Section titled “Parameters”_options
Section titled “_options”- Type:
NimbusDBPrintOptions - Default:
undefined - An optional object that allows you to customize the behavior of the tap operation.
Returns
Section titled “Returns”- Type:
NimbusDBPipeline - A new
NimbusDBPipelineinstance (mutable = false) or the current pipeline instance (mutable = true).
Signature
Section titled “Signature”class NimbusDBPipeline { // ... other methods and properties ... static tap( _func: (data: any, index: int) => void ): NimbusDBPipeline;}Parameters
Section titled “Parameters”- Type:
(data: any, index: int) => void- Parameters:
data: The current data object.index: The current data index.
- Parameters:
- The callback function to use for the tap operation.
Returns
Section titled “Returns”- Type:
NimbusDBPipeline - A new
NimbusDBPipelineinstance (mutable = false) or the current pipeline instance (mutable = true).
NimbusDBPipelineCloneOptions
Section titled “NimbusDBPipelineCloneOptions”Optional configurations for .clone() method.
type NimbusDBPipelineCloneOptions = Partial<{ with_cache: boolean; // clone the cache (default = true) shared_cache: boolean; // use the same cache before the clone operation (default = true, only works if `with_cache` = true)}>;NimbusDBPipelineOpsError
Section titled “NimbusDBPipelineOpsError”An object that contains information about an error that occurred during a pipeline operation.
type NimbusDBPipelineOpsError = { data: any; // current pipeline result message: string; pos: int; type: NIMBUSDB_PIPELINE_OPS;}