Pipeline Overview
In this section, we will explore the basics of pipelines in NimbusDB. We’ll use this schema and the items model as an example:
var schema = {
    id: { type: NIMBUSDB_DATA_TYPE.INTEGER, const: NIMBUSDB_CONSTRAINT.PRIMARY_KEY },
    name: NIMBUSDB_DATA_TYPE.STRING,
    price: {
        type: NIMBUSDB_DATA_TYPE.NUMBER,
        validator: function(data, value) { return value >= 0; },
        default_value: 0
    },
    is_locked: {
        type: NIMBUSDB_DATA_TYPE.BOOLEAN,
        const: NIMBUSDB_CONSTRAINT.OPTIONAL,
        default_value: false
    }
};
items = new NimbusDBModel("global", "items", schema, [
    { id: 1, name: "Apple", price: 5 },
    { id: 2, name: "Banana", price: 7.2 },
    { id: 3, name: "Cherry", price: 15 },
    { id: 4, name: "Date", price: 12.5 },
    { id: 5, name: "Elderberry", price: 8 },
    { id: 6, name: "Fig", price: 10 }
]);
What is a Pipeline?
A pipeline is a series of operations that transform and process data in a NimbusDB model. It lets you express complex data transformations and calculations as a chain of simple steps.
Lazy Evaluation
Pipelines are lazily evaluated: defining operations does not run them. Nothing executes until you call the .exec() or .get() method on the pipeline instance.
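To make lazy evaluation concrete, here is a minimal sketch in TypeScript (the notation used in the References section). It is not NimbusDB code: `LazyPipeline`, `filter`, `map`, and `exec` here are hypothetical stand-ins that only mirror the documented behavior, where chaining records operations and nothing runs until `exec()`.

```typescript
// Hypothetical model of a lazily evaluated pipeline (not the NimbusDB API).
type Op<T> = (rows: T[]) => T[];

class LazyPipeline<T> {
  private ops: Op<T>[] = [];

  constructor(private readonly source: T[]) {}

  // Chaining only records the operation; no data is touched here.
  filter(pred: (row: T, index: number) => boolean): this {
    this.ops.push((rows) => rows.filter(pred));
    return this;
  }

  map(fn: (row: T, index: number) => T): this {
    this.ops.push((rows) => rows.map(fn));
    return this;
  }

  // Only exec() walks the recorded operations and produces a result.
  exec(): T[] {
    return this.ops.reduce((rows, op) => op(rows), this.source);
  }
}

let predicateCalls = 0;
const pl = new LazyPipeline([5, 7.2, 15]).filter((price) => {
  predicateCalls++;
  return price % 5 === 0;
});

const callsBeforeExec = predicateCalls; // 0: the filter is defined but has not run
const result = pl.exec();               // [5, 15]
const callsAfterExec = predicateCalls;  // 3: exec() called the predicate once per row
```

The counter is the point of the example: defining the filter costs nothing, and the work happens entirely inside `exec()`.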
Cursor System
Each pipeline instance has a cursor, which is a pointer to the current operation position in the pipeline. See the Cursor System section for more details.
Cache Strategy
Pipelines have a built-in cache strategy that controls whether the results of pipeline operations are cached. Reusing a cached result instead of recomputing it improves performance and reduces the amount of data that needs to be processed again.
NONE
No cache is used; the pipeline always executes the operations from the beginning.
FULL
Cache the result of every operation, and use the cached result when available. This strategy is not recommended for large datasets, as it can consume a lot of memory.
PARTIAL (Default)
Cache only the expensive operations, and use the cached result when available. This strategy works well for most use cases.
SMART (Experimental)
This strategy is still in development and is not recommended for use in production.
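The difference between the documented strategies can be sketched in TypeScript. This is an illustration, not NimbusDB's implementation: `Strategy`, `Step`, `runWithCache`, and the `expensive` flag are hypothetical, and since which operations NimbusDB treats as expensive under PARTIAL is not specified here, each step simply declares it. SMART is left out because its behavior is not documented.

```typescript
// Hypothetical model of the NONE / FULL / PARTIAL cache strategies.
type Strategy = "NONE" | "FULL" | "PARTIAL";

interface Step {
  name: string;
  expensive: boolean; // assumption: PARTIAL relies on a per-step flag like this
  run: (rows: number[]) => number[];
}

// Runs every step and returns the final rows together with the step results
// that the chosen strategy keeps (keyed by 1-based step index).
function runWithCache(data: number[], steps: Step[], strategy: Strategy) {
  const cache: Record<number, number[]> = {};
  let rows = data;
  steps.forEach((step, i) => {
    rows = step.run(rows);
    const keep = strategy === "FULL" || (strategy === "PARTIAL" && step.expensive);
    if (keep) cache[i + 1] = rows; // NONE never stores anything
  });
  return { result: rows, cachedSteps: Object.keys(cache).map(Number), cache };
}

const steps: Step[] = [
  { name: "filter", expensive: false, run: (r) => r.filter((p) => p % 5 === 0) },
  { name: "sort", expensive: true, run: (r) => r.slice().sort((a, b) => b - a) },
  { name: "map", expensive: false, run: (r) => r.map((p) => p * 2) },
];
const prices = [5, 7.2, 15, 12.5, 8, 10];

const strategies: Strategy[] = ["NONE", "FULL", "PARTIAL"];
for (const s of strategies) {
  // NONE keeps [], FULL keeps [1, 2, 3], PARTIAL keeps [2] (the "expensive" sort)
  console.log(s, runWithCache(prices, steps, s).cachedSteps);
}
```

FULL keeps every intermediate array alive, which is why it is the memory-hungry option for large datasets.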
Creating a Pipeline
To create a pipeline, you can use the .pipe() method on a model instance. It returns a new pipeline instance that you can use to chain operations and define the pipeline’s logic.
var items_pl = items.pipe(); // use default options
var items_pl = items.pipe({ // with options
    cache_strategy: NIMBUSDB_PIPELINE_CACHE.FULL,
    mutable: true
});
Transforming Data
Once you have a pipeline instance, you can chain operations on it to transform the model’s data: filtering, mapping, reducing, sorting, and more.
// in this example, we'll create the `items_pl` pipeline instance
// let's say we want to filter the `items` model's data to only include items whose price is a multiple of 5,
// then prefix each item's name with "Packed" and multiply its price by 2
var items_pl = items.pipe() // first, pipe the `items` model
    .filter(function(data, index) { // then, filter the data using a predicate function
        return data.price % 5 == 0; // the predicate function should return `true` if the data should be included
    })
    .map(function(data, index) { // then, map the data using a mapping function
        return { // the mapping function should return a new data object
            name: $"Packed {data.name}",
            price: data.price * 2
        };
    });
The pipeline is now defined, but because pipelines are lazy, none of these operations has run yet; they execute when you call .exec() or .get().
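For reference, the same filter-then-map logic over plain arrays is sketched below in TypeScript. It mirrors the semantics only and does not call NimbusDB. Banana (7.2), Date (12.5), and Elderberry (8) leave a non-zero remainder when divided by 5, so only Apple, Cherry, and Fig pass the filter.

```typescript
// Plain-array equivalent of the items pipeline (illustration only, not NimbusDB).
interface Item {
  id: number;
  name: string;
  price: number;
}

const items: Item[] = [
  { id: 1, name: "Apple", price: 5 },
  { id: 2, name: "Banana", price: 7.2 },
  { id: 3, name: "Cherry", price: 15 },
  { id: 4, name: "Date", price: 12.5 },
  { id: 5, name: "Elderberry", price: 8 },
  { id: 6, name: "Fig", price: 10 },
];

const packed = items
  .filter((item) => item.price % 5 === 0) // keep prices that are multiples of 5
  .map((item) => ({
    name: `Packed ${item.name}`, // the $"..." template string becomes a template literal
    price: item.price * 2,
  }));
// packed now holds:
// [{ name: "Packed Apple", price: 10 },
//  { name: "Packed Cherry", price: 30 },
//  { name: "Packed Fig", price: 20 }]
```

Note that the mapping function returns a fresh object, so `id` is not carried into the result.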
Executing a Pipeline
Once you have defined the operations, you can execute the pipeline with the .exec() or .get() method. This runs all of the operations (or only those up to a given step) and returns the result.
// let's use the previous pipeline for this example
var result = items_pl.exec(); // or `items_pl.get()`, run from start to end
var result = items_pl.exec(2); // run from step 1 to step 2
var result = items_pl.exec(-1, { // with options
    keep_cursor: true, // move the cursor to `step` instead of the end
    // ...other options
});
// this is the result of the pipeline execution:
// [
//     { name: "Packed Apple", price: 10 },
//     { name: "Packed Cherry", price: 30 },
//     { name: "Packed Fig", price: 20 }
// ]
// and later if you need to get the result again, you can use `.exec()` / `.get()` again
var result_again = items_pl.exec();
That’s it! You have now created and executed a pipeline to transform and process data in the items model.
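Partial execution can be modeled in a few lines of TypeScript. `runUpTo` is a hypothetical helper, not a NimbusDB function; it treats `step` as a 1-based count of operations to run and `-1` (or no argument) as "run to the end". That matches how the `.exec()` calls above read, but the exact indexing is an assumption.

```typescript
// Hypothetical model of exec(step): run operations 1..step, or all of them.
type Row = { name: string; price: number };
type Op = (rows: Row[]) => Row[];

function runUpTo(data: Row[], ops: Op[], step: number = -1): Row[] {
  // assumption: -1 means "run every operation"
  const last = step === -1 ? ops.length : Math.min(step, ops.length);
  let rows = data;
  for (let i = 0; i < last; i++) {
    rows = ops[i](rows);
  }
  return rows;
}

const data: Row[] = [
  { name: "Apple", price: 5 },
  { name: "Banana", price: 7.2 },
  { name: "Cherry", price: 15 },
];
const ops: Op[] = [
  (rows) => rows.filter((r) => r.price % 5 === 0), // step 1: filter
  (rows) => rows.map((r) => ({ name: `Packed ${r.name}`, price: r.price * 2 })), // step 2: map
];

const afterFilter = runUpTo(data, ops, 1); // Apple and Cherry, prices unchanged
const full = runUpTo(data, ops);           // Packed Apple (10), Packed Cherry (30)
```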
Cursor System
You can use the cursor to control the execution of the pipeline, re-run the pipeline from the current position, skip operations, or even overwrite the old operations with new ones.
Let’s say you have this pipeline:
var items_pl = items.pipe()
    .filter(function(data, index) {
        return data.price % 5 == 0;
    })
    .limit(2)
    .map(function(data, index) {
        return { name: $"Packed {data.name}", price: data.price * 2 };
    });
The cursor moves forward one step after each operation, and the result is cached at that step.
┌───────────┐   ┌──────────┐   ┌────────┐ ┳
│ .filter() │──>│ .limit() │──>│ .map() │ ┃
└───────────┘   └──────────┘   └────────┘ ┻
                                          │
                   your cursor position <─┘
Moving the Cursor
You can use the .move_to(), .move_to_first(), and .move_to_last() methods to move the cursor to a specific position in the pipeline.
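The cursor arithmetic can be sketched as a small TypeScript class. `CursorModel` and its methods are hypothetical, and its indexing is an assumption read off the diagrams in this section: step k points just before the k-th operation, so step 1 is before `.filter()` and step n + 1 is after the last operation. Relative moves add to the current position, and clamping out-of-range moves is also an assumption.

```typescript
// Hypothetical model of the cursor (not the NimbusDB implementation).
// Assumption: position k sits just before operation k (1-based), and
// position ops.length + 1 sits after the last operation.
class CursorModel {
  cursor: number;

  constructor(readonly ops: string[]) {
    this.cursor = ops.length + 1; // after building, the cursor is at the end
  }

  moveTo(step: number, relative = false): this {
    const target = relative ? this.cursor + step : step;
    // assumption: out-of-range moves are clamped to the valid range
    this.cursor = Math.max(1, Math.min(target, this.ops.length + 1));
    return this;
  }

  moveToFirst(): this {
    return this.moveTo(1);
  }

  moveToLast(): this {
    return this.moveTo(this.ops.length + 1);
  }

  // Name of the operation right after the cursor, if any.
  next(): string | undefined {
    return this.ops[this.cursor - 1];
  }
}

const pl = new CursorModel([".filter()", ".limit()", ".map()"]);
pl.moveTo(2);           // before .limit()
console.log(pl.next()); // .limit()
pl.moveToFirst();       // before .filter()
console.log(pl.next()); // .filter()
pl.moveToLast();        // after .map()
console.log(pl.next()); // undefined
```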
.move_to(step, relative?)
// let's use the previous pipeline

// absolute move
items_pl.move_to(2); // move to step 2 (.limit operation)
┌───────────┐  ┳  ┌──────────┐   ┌────────┐
│ .filter() │──╋─>│ .limit() │──>│ .map() │
└───────────┘  ┻  └──────────┘   └────────┘
               │
               └─> your cursor position

// relative move
items_pl.move_to(1, true); // move 1 step from current position
┌───────────┐   ┌──────────┐   ┌────────┐ ┳
│ .filter() │──>│ .limit() │──>│ .map() │ ┃
└───────────┘   └──────────┘   └────────┘ ┻
                                          │
                   your cursor position <─┘
.move_to_first()
items_pl.move_to_first(); // move to first step
┳ ┌───────────┐   ┌──────────┐   ┌────────┐
┃ │ .filter() │──>│ .limit() │──>│ .map() │
┻ └───────────┘   └──────────┘   └────────┘
│
└─> your cursor position
.move_to_last()
items_pl.move_to_last(); // move to last step
┌───────────┐   ┌──────────┐   ┌────────┐ ┳
│ .filter() │──>│ .limit() │──>│ .map() │ ┃
└───────────┘   └──────────┘   └────────┘ ┻
                                          │
                   your cursor position <─┘
Overriding the Operations
If you move the cursor to a specific step and define new operations, the old operations will be overwritten.
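A minimal TypeScript sketch of this rule follows. `defineAt` is a hypothetical helper, cursor positions follow a 1-based reading of the diagrams above (step k sits just before operation k), and only the case shown in the example is modeled: the new operation replaces the one right after the cursor, or is appended when the cursor is at the end. Whether later operations are also dropped is not documented, so the sketch leaves them in place.

```typescript
// Hypothetical model: defining an operation at the cursor overwrites the
// operation right after it (or appends when the cursor is at the end).
function defineAt(
  ops: string[],
  cursor: number, // assumption: 1-based, position k is just before ops[k - 1]
  op: string,
): { ops: string[]; cursor: number } {
  const next = [...ops]; // copy so the caller's list is not mutated
  next[cursor - 1] = op; // replaces the next operation, or appends at the end
  return { ops: next, cursor: cursor + 1 }; // the cursor moves past the new op
}

const before = [".filter()", ".limit()", ".map()"];
const after = defineAt(before, 3, ".pluck()"); // cursor was moved to step 3 (.map)
// after.ops is [".filter()", ".limit()", ".pluck()"] and after.cursor is 4
```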
var items_pl = items.pipe()
    .filter(function(data, index) {
        return data.price % 5 == 0;
    })
    .limit(2)
    .map(function(data, index) {
        return { name: $"Packed {data.name}", price: data.price * 2 };
    });
// move to step 3 (.map operation)
items_pl.move_to(3);
// now, if we define a new operation, it will overwrite the next operation (.map)
items_pl.pluck("name");
The result will be:
┌───────────┐   ┌──────────┐   ┌──────────┐ ┳
│ .filter() │──>│ .limit() │──>│ .pluck() │ ┃
└───────────┘   └──────────┘   └──────────┘ ┻
                                            │
                     your cursor position <─┘
References
NimbusDBPipeline (Internal)
A lazy, chainable data transformation pipeline for NimbusDB models. Supports a wide range of operations (filter, map, reduce, sort, etc.) with optional caching, snapshotting, and cursor-based step control.
class NimbusDBPipeline {
    constructor(
        _init_data?: any,
        _saves?: NimbusDBPipelineSave[],
        _options?: NimbusDBPipelineOptions
    );

    cursor: int;
    id: int;

    static __id: int;
    __cache: { [step_idx: int]: any; };
    __cache_strategy: NIMBUSDB_PIPELINE_CACHE;
    __checkpoints: NimbusDBPipelineCheckpoint[];
    __is_mutable: boolean;
    __model: NimbusDBModel | null;
    __on_error: "stop" | "continue" | ((err: NimbusDBPipelineOpsError) => boolean);
    __result: int;
    __saves: NimbusDBPipelineSave[];
    __snapshot: { [name: int]: any; };
    __sys_temp?: Struct;

    // ...public and internal methods
}
Model.pipe()
Creates a pipeline from the model.
Signature
class NimbusDBModel {
    // ... other methods and properties ...
    pipe(
        _options?: NimbusDBPipelineOptions
    ): NimbusDBPipeline;
}
Parameters
_options
- Type: NimbusDBPipelineOptions
- Default: undefined
- An optional object that allows you to customize the behavior of the pipe operation.
Returns
- Type: NimbusDBPipeline
- The pipeline instance.
NimbusDBPipelineCheckpoint (Internal)
A save point for the pipeline. Used to restore the pipeline to a specific state.
type NimbusDBPipelineCheckpoint = {
    name: string;
    pos: int;
}
NimbusDBPipelineOptions
An optional object that allows you to customize the behavior of the pipeline instance.
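Of these options, `mutable` decides what a chained call returns: with `true` you get the current pipeline back, with the default `false` you get a new one. The TypeScript sketch below models only that documented rule; `ChainModel` and `add` are hypothetical names, and copying the recorded operations into the new pipeline is an assumption about how it is built.

```typescript
// Hypothetical model of the `mutable` option (not the NimbusDB class).
class ChainModel {
  constructor(
    readonly ops: string[] = [],
    readonly mutable = false,
  ) {}

  // Records an operation; returns this pipeline or a new one depending on `mutable`.
  add(op: string): ChainModel {
    if (this.mutable) {
      this.ops.push(op); // readonly only blocks reassignment, not push
      return this;
    }
    return new ChainModel([...this.ops, op], false); // original stays unchanged
  }
}

const base = new ChainModel();
const filtered = base.add("filter");
// base.ops is still [] and filtered is a different instance

const shared = new ChainModel([], true);
const same = shared.add("filter");
// same === shared, and shared.ops is ["filter"]
```

The immutable default makes it safe to branch several pipelines off one base; the mutable form avoids allocating a new pipeline per call.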
type NimbusDBPipelineOptions = Partial<{
    cache: { // [INTERNAL] cache the result of the pipeline operations
        [step_idx: int]: any;
    };
    cache_strategy: NIMBUSDB_PIPELINE_CACHE; // cache strategy for the pipeline (default = PARTIAL)
    checkpoints: NimbusDBPipelineCheckpoint[]; // [INTERNAL] save points for the pipeline (default = [])
    cursor: int; // cursor position for the pipeline (default = -1)
    on_error: "stop" | "continue" | (( // error handler for the pipeline (default = "stop")
        err: NimbusDBPipelineOpsError
    ) => boolean | null);
    model: NimbusDBModel; // [INTERNAL] model instance
    mutable: boolean; // return current pipeline instead of new pipeline (default = false)
    result: int; // [INTERNAL] latest result of the pipeline, stored as cache key (default = -1)
}>;
NimbusDBPipelineOpsError
An object that contains information about an error that occurred during pipeline execution.
type NimbusDBPipelineOpsError = {
    data: any; // current pipeline result
    message: string; // error message
    pos: int; // current cursor position
    type: NIMBUSDB_PIPELINE_OPS; // type of the current operation
}
NimbusDBPipelineOpsOptions
An optional object that allows you to customize the behavior of the pipeline operations.
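A sketch, in TypeScript, of how these window fields could combine. `OpsOptions` and `resolveWindow` are hypothetical; the defaults (0 and data.length) and the aliases come from the type definition, while letting `start_index` and `length` win over `offset` and `limit` when both are set is an assumption.

```typescript
// Hypothetical resolution of NimbusDBPipelineOpsOptions-style window fields.
interface OpsOptions {
  start_index?: number;
  length?: number;
  offset?: number; // alias for start_index
  limit?: number;  // alias for length
}

function resolveWindow<T>(data: T[], opts: OpsOptions = {}): T[] {
  // assumption: the canonical name wins when both it and its alias are set
  const start = opts.start_index ?? opts.offset ?? 0;
  const count = opts.length ?? opts.limit ?? data.length;
  return data.slice(start, start + count);
}

const names = ["Apple", "Banana", "Cherry", "Date", "Elderberry", "Fig"];
resolveWindow(names);                          // all six names
resolveWindow(names, { offset: 1, limit: 2 }); // ["Banana", "Cherry"]
resolveWindow(names, { start_index: 4 });      // ["Elderberry", "Fig"]
```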
type NimbusDBPipelineOpsOptions = Partial<{
    start_index: int; // start index for the data-searching operation (default = 0)
    length: int; // length for the data-searching operation (default = data.length)
    offset: int; // alias for `start_index`
    limit: int; // alias for `length`
}>;
NimbusDBPipelineSave (Internal)
A save point for the pipeline. Used to restore the pipeline to a specific state.
type NimbusDBPipelineSave = (
    | { type: NIMBUSDB_PIPELINE_OPS.CHECKPOINT; name: string; }
    | { type: NIMBUSDB_PIPELINE_OPS.CATCH; fn: (err: NimbusDBPipelineOpsError, data: any) => { data: any; stop: boolean; } | any | void; }
    | { type: NIMBUSDB_PIPELINE_OPS.CLONE; with_cache: boolean; shared_cache: boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.CONSTRUCT; columns: string[] | null; level: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.DESTRUCT; columns: string[] | null; }
    | { type: NIMBUSDB_PIPELINE_OPS.DISTINCT; columns: string[]; }
    | { type: NIMBUSDB_PIPELINE_OPS.DROP_WHILE; fn: (value: any, idx: int) => boolean; from_end: boolean; reverse: boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.FILTER; fn: (value: any, idx: int) => boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.FILTER; fn: (prop: string, value: any) => boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.FIND; fn: (value: any, idx: int) => boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.FLAT_MAP; fn: (value: any, idx: int) => any; level: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.FLATTEN; level: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.FLATTEN_DEEP; level: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.FORK; }
    | { type: NIMBUSDB_PIPELINE_OPS.ISOLATE; }
    | { type: NIMBUSDB_PIPELINE_OPS.LIMIT; count: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.MATERIALIZE; }
    | { type: NIMBUSDB_PIPELINE_OPS.MERGE; data: any[]; }
    | { type: NIMBUSDB_PIPELINE_OPS.MAP; fn: (value: any, idx: int) => any; }
    | { type: NIMBUSDB_PIPELINE_OPS.MAP; fn: (prop: string, value: any) => any; columns: string[] | null; }
    | { type: NIMBUSDB_PIPELINE_OPS.OFFSET; count: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.PLUCK; columns: string[]; }
    | { type: NIMBUSDB_PIPELINE_OPS.REDUCE; fn: (acc: any, value: any, idx: int) => any; acc: any; }
    | { type: NIMBUSDB_PIPELINE_OPS.REDUCE; fn: (acc: any, prop: string, value: any) => any; acc: any; columns: string[] | null; }
    | { type: NIMBUSDB_PIPELINE_OPS.REJECT; fn: (value: any, idx: int) => any; }
    | { type: NIMBUSDB_PIPELINE_OPS.RENAME; columns: string[] | Struct; }
    | { type: NIMBUSDB_PIPELINE_OPS.SAMPLE; count: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.SLICE; start: int; length: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.SNAPSHOT; name: string; }
    | { type: NIMBUSDB_PIPELINE_OPS.SORT; ascending: boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.SORT; column: string; ascending: boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.SORT; fn: (current: any, next: any) => number; }
    | { type: NIMBUSDB_PIPELINE_OPS.TAKE_WHILE; fn: (value: any, idx: int) => boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.TAP; }
    | { type: NIMBUSDB_PIPELINE_OPS.TAP; opt: NimbusDBPrintOptions; }
    | { type: NIMBUSDB_PIPELINE_OPS.TAP; fn: (value: any, idx: int) => void; }
    | { type: NIMBUSDB_PIPELINE_OPS.TAP; fn: (prop: string, value: any) => void; }
    | { type: NIMBUSDB_PIPELINE_OPS.TAP; fn: (value: any) => void; }
) & NimbusDBPipelineOpsOptions;
NimbusDBPipelineTerminalOptions
An optional object that allows you to customize the behavior of terminal calls such as .exec() and .get().
type NimbusDBPipelineTerminalOptions = Partial<{
    keep_cursor: boolean; // move the cursor to the `step` parameter (default = false)
    ops_name: string; // [INTERNAL] name of the current operation
    relative: boolean; // calculate the `step` parameter from the current cursor position (default = false)
    silent: boolean; // [INTERNAL] suppress debug messages (default = false)
}> & NimbusDBPipelineOpsOptions;
NIMBUSDB_PIPELINE_OPS (Internal)
An enum that represents the different types of pipeline operations.
enum NIMBUSDB_PIPELINE_OPS {
    BIND,
    CATCH,
    CHECKPOINT,
    CLONE,
    CONSTRUCT,
    DESTRUCT,
    DISTINCT,
    DROP_WHILE,
    FILTER,
    FIND,
    FLAT_MAP,
    FLATTEN,
    FLATTEN_DEEP,
    FORK,
    ISOLATE,
    LIMIT,
    MAP,
    MATERIALIZE,
    MERGE,
    OFFSET,
    PLUCK,
    REDUCE,
    REJECT,
    RENAME,
    SAMPLE,
    SLICE,
    SNAPSHOT,
    SORT,
    TAKE_WHILE,
    TAP
}
NIMBUSDB_PIPELINE_CACHE
An enum that represents the different cache strategies for pipeline operations.
enum NIMBUSDB_PIPELINE_CACHE {
    NONE,
    FULL,
    PARTIAL,
    SMART
}