
Pipeline Overview

In this section, we will explore the basics of pipelines in NimbusDB. We’ll use this schema and the items model as an example:

var schema = {
    id: {
        type: NIMBUSDB_DATA_TYPE.INTEGER,
        const: NIMBUSDB_CONSTRAINT.PRIMARY_KEY
    },
    name: NIMBUSDB_DATA_TYPE.STRING,
    price: {
        type: NIMBUSDB_DATA_TYPE.NUMBER,
        validator: function(data, value) {
            return value >= 0;
        },
        default_value: 0
    },
    is_locked: {
        type: NIMBUSDB_DATA_TYPE.BOOLEAN,
        const: NIMBUSDB_CONSTRAINT.OPTIONAL,
        default_value: false
    }
};
items = new NimbusDBModel("global", "items", schema, [
    { id: 1, name: "Apple", price: 5 },
    { id: 2, name: "Banana", price: 7.2 },
    { id: 3, name: "Cherry", price: 15 },
    { id: 4, name: "Date", price: 12.5 },
    { id: 5, name: "Elderberry", price: 8 },
    { id: 6, name: "Fig", price: 10 }
]);

A pipeline is a series of operations that transform and process data in a NimbusDB model. It’s a powerful tool that allows you to perform complex data transformations and calculations with ease.

Pipelines are lazily evaluated: they don’t execute any of their operations until you call the .exec() or .get() method on the pipeline instance.
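To make lazy evaluation concrete, here is a minimal TypeScript sketch. It is not NimbusDB’s implementation: the hypothetical LazyPipeline class only records each operation when it is defined, and the recorded steps run only when exec() is called.

```typescript
// Hypothetical sketch of lazy evaluation; not the NimbusDB API.
class LazyPipeline<T> {
  private readonly source: T[];
  private readonly steps: Array<(data: T[]) => T[]> = [];
  runs = 0; // how many steps have actually executed so far

  constructor(source: T[]) {
    this.source = source;
  }

  // Defining an operation only records it; no data is processed here.
  filter(pred: (value: T, idx: number) => boolean): this {
    this.steps.push((data) => data.filter(pred));
    return this;
  }

  map(fn: (value: T, idx: number) => T): this {
    this.steps.push((data) => data.map(fn));
    return this;
  }

  // Only exec() walks the recorded steps and produces a result.
  exec(): T[] {
    let data = [...this.source];
    for (const step of this.steps) {
      data = step(data);
      this.runs++;
    }
    return data;
  }
}

const pricePipeline = new LazyPipeline([5, 7.2, 15, 12.5, 8, 10])
  .filter((price) => price % 5 === 0)
  .map((price) => price * 2);

console.log(pricePipeline.runs);   // 0: nothing has executed yet
console.log(pricePipeline.exec()); // [ 10, 30, 20 ]
console.log(pricePipeline.runs);   // 2
```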

Each pipeline instance has a cursor, which is a pointer to the current operation position in the pipeline. See the Cursor System section for more details.

Pipelines have a built-in caching mechanism that stores the results of pipeline operations, so that a later run can reuse earlier step results instead of recomputing them. How much is cached is controlled by the cache strategy (NIMBUSDB_PIPELINE_CACHE):

  • NONE: No cache is used, and the pipeline always executes the operations from the beginning.
  • FULL: Caches the result of every operation and uses the cached result when available. This strategy is not recommended for large datasets, as it can consume a lot of memory.
  • PARTIAL (default): Caches only the expensive operations and uses the cached result when available. This strategy is a good fit for most use cases.
  • SMART: Still in development; not recommended for use in production.
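The following TypeScript sketch shows how a per-step cache could make a second run cheaper under each strategy. It is an illustration under stated assumptions, not NimbusDB’s code: the CachedPipeline class and the expensive flag (standing in for whatever PARTIAL treats as an expensive operation) are hypothetical, and SMART is left out.

```typescript
// Hypothetical sketch of per-step result caching; not NimbusDB's implementation.
type CacheStrategy = "NONE" | "FULL" | "PARTIAL";

interface Step {
  name: string;
  expensive: boolean; // assumption: PARTIAL caches only steps flagged as expensive
  run: (data: number[]) => number[];
}

class CachedPipeline {
  private readonly cache: { [stepIdx: number]: number[] } = {}; // step index -> result after that step
  readonly executed: string[] = []; // names of steps that actually ran
  private readonly source: number[];
  private readonly steps: Step[];
  private readonly strategy: CacheStrategy;

  constructor(source: number[], steps: Step[], strategy: CacheStrategy) {
    this.source = source;
    this.steps = steps;
    this.strategy = strategy;
  }

  private shouldCache(step: Step): boolean {
    if (this.strategy === "FULL") return true;
    if (this.strategy === "PARTIAL") return step.expensive;
    return false; // NONE never caches
  }

  exec(): number[] {
    // Resume right after the latest cached step, or start from the source data.
    let start = 0;
    let data = [...this.source];
    for (let i = this.steps.length - 1; i >= 0; i--) {
      const hit = this.cache[i];
      if (hit !== undefined) {
        start = i + 1;
        data = [...hit];
        break;
      }
    }
    for (let i = start; i < this.steps.length; i++) {
      const step = this.steps[i];
      data = step.run(data);
      this.executed.push(step.name);
      if (this.shouldCache(step)) this.cache[i] = [...data];
    }
    return data;
  }
}

const demoSteps: Step[] = [
  { name: "filter", expensive: true, run: (d) => d.filter((p) => p % 5 === 0) },
  { name: "map", expensive: false, run: (d) => d.map((p) => p * 2) },
];

const strategies: CacheStrategy[] = ["NONE", "FULL", "PARTIAL"];
for (const strategy of strategies) {
  const pl = new CachedPipeline([5, 7.2, 15, 12.5, 8, 10], demoSteps, strategy);
  pl.exec();
  pl.exec(); // the second run reuses whatever the strategy cached
  console.log(strategy, pl.executed.join(","));
}
// NONE filter,map,filter,map
// FULL filter,map
// PARTIAL filter,map,map
```

The sketch never invalidates its cache, so it would return stale results if the source data changed after the first run.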

To create a pipeline, you can use the .pipe() method on a model instance. It returns a new pipeline instance that you can use to chain operations and define the pipeline’s logic.

var items_pl = items.pipe(); // use default options
var items_pl = items.pipe({ // with options
    cache_strategy: NIMBUSDB_PIPELINE_CACHE.FULL,
    mutable: true
});
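The mutable option decides whether chaining an operation returns the current pipeline or a new one (the default, false, returns a new pipeline). A minimal TypeScript sketch of that difference, using a hypothetical OpsPipeline class that only records operation names:

```typescript
// Hypothetical sketch of the `mutable` option; not the NimbusDB API.
class OpsPipeline {
  private readonly ops: string[];
  private readonly mutable: boolean;

  constructor(ops: string[] = [], mutable = false) {
    this.ops = [...ops];
    this.mutable = mutable;
  }

  // Record an operation. A mutable pipeline appends in place and returns itself;
  // an immutable one returns a new pipeline and leaves the original untouched.
  add(op: string): OpsPipeline {
    if (this.mutable) {
      this.ops.push(op);
      return this;
    }
    return new OpsPipeline([...this.ops, op], false);
  }

  describe(): string {
    return this.ops.join(" -> ");
  }
}

const base = new OpsPipeline();
const filtered = base.add("filter");
console.log(base === filtered);               // false
console.log(JSON.stringify(base.describe())); // "" (base is unchanged)
console.log(filtered.describe());             // filter

const shared = new OpsPipeline([], true);
const same = shared.add("filter");
console.log(shared === same);                 // true
console.log(shared.describe());               // filter
```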

Once you have a pipeline instance, you can use it to transform data in the model. You can chain operations on the pipeline instance to perform various data transformations, such as filtering, mapping, reducing, sorting, and more.

// in this example, we'll use the `items_pl` pipeline instance
// let's say we want to filter the `items` model's data to only include items whose price is a multiple of 5,
// and then multiply each item's price by 2
var items_pl = items.pipe() // first, pipe the `items` model
    .filter(function(data, index) { // then, filter the data using a predicate function
        return data.price % 5 == 0; // the predicate function should return `true` if the data should be included
    })
    .map(function(data, index) { // then, map the data using a mapping function
        return { // the mapping function should return a new data object
            name: $"Packed {data.name}",
            price: data.price * 2
        };
    });
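For comparison, here is the same filter-then-map logic written eagerly over a plain TypeScript array. It is only a reference computation, not NimbusDB code; the Item interface is a hypothetical mirror of the schema above, and it yields the same three packed items the pipeline returns once executed.

```typescript
// Eager reference computation with plain arrays; not NimbusDB code.
interface Item {
  id: number;
  name: string;
  price: number;
  is_locked?: boolean;
}

const items: Item[] = [
  { id: 1, name: "Apple", price: 5 },
  { id: 2, name: "Banana", price: 7.2 },
  { id: 3, name: "Cherry", price: 15 },
  { id: 4, name: "Date", price: 12.5 },
  { id: 5, name: "Elderberry", price: 8 },
  { id: 6, name: "Fig", price: 10 },
];

// Keep items whose price is a multiple of 5, then build "packed" records with a doubled price.
const packed = items
  .filter((item) => item.price % 5 === 0)
  .map((item) => ({ name: `Packed ${item.name}`, price: item.price * 2 }));

console.log(JSON.stringify(packed));
// [{"name":"Packed Apple","price":10},{"name":"Packed Cherry","price":30},{"name":"Packed Fig","price":20}]
```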

At this point nothing has been executed yet; the pipeline has only recorded the operations.

Once you have a pipeline instance and have defined the operations, you can execute it using the .exec() or .get() method. This executes all (or some) of the operations in the pipeline and returns the result.

// let's use the previous pipeline for this example
var result = items_pl.exec(); // or `items_pl.get()`, run from start to end
var result = items_pl.exec(2); // run from step 1 to step 2
var result = items_pl.exec(-1, { // with options
    keep_cursor: true, // move the cursor to the `step` position instead of the end
    // ...other options
});
// this is the result of the pipeline execution:
// [
//     { name: "Packed Apple", price: 10 },
//     { name: "Packed Cherry", price: 30 },
//     { name: "Packed Fig", price: 20 }
// ]
// and later, if you need the result again, you can call `.exec()` / `.get()` again
var result_again = items_pl.exec();
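A minimal TypeScript sketch of step-limited execution, assuming steps are numbered from 1 and that -1 (or any out-of-range value) means "run every step". execUpTo is a hypothetical helper, not part of NimbusDB; its three steps mirror a filter, .limit(2), map pipeline.

```typescript
// Hypothetical sketch of step-limited execution; not the NimbusDB API.
type Op = (data: number[]) => number[];

// Run steps 1..step (1-based, inclusive) over a copy of the source data.
function execUpTo(source: number[], ops: Op[], step = -1): number[] {
  // Assumption: -1 (or any value past the end) means "run every step".
  const last = step < 0 || step > ops.length ? ops.length : step;
  let data = [...source];
  for (let i = 0; i < last; i++) {
    data = ops[i](data);
  }
  return data;
}

const pipelineOps: Op[] = [
  (d) => d.filter((p) => p % 5 === 0), // step 1: like .filter()
  (d) => d.slice(0, 2),                // step 2: like .limit(2)
  (d) => d.map((p) => p * 2),          // step 3: like .map()
];
const prices = [5, 7.2, 15, 12.5, 8, 10];

console.log(execUpTo(prices, pipelineOps, 2)); // [ 5, 15 ]
console.log(execUpTo(prices, pipelineOps));    // [ 10, 30 ]
```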

That’s it! You have now created and executed a pipeline to transform and process data in the items model.

Cursor System

You can use the cursor to control the execution of the pipeline, re-run the pipeline from the current position, skip operations, or even overwrite the old operations with new ones.

Let’s say you have this pipeline:

var items_pl = items.pipe()
    .filter(function(data, index) {
        return data.price % 5 == 0;
    })
    .limit(2)
    .map(function(data, index) {
        return {
            name: $"Packed {data.name}",
            price: data.price * 2
        };
    });

Each operation you define moves the cursor forward by one step, and when the pipeline runs, that step’s result can be cached according to the cache strategy.

┌───────────┐   ┌──────────┐   ┌────────┐ ┳
│ .filter() │──>│ .limit() │──>│ .map() │ ┃
└───────────┘   └──────────┘   └────────┘ ┻
                   your cursor position <─┘

You can use the .move_to(), .move_to_first(), and .move_to_last() methods to move the cursor to a specific position in the pipeline.

// let's use the previous pipeline
// absolute move
items_pl.move_to(2); // move to step 2 (.limit operation)
┌───────────┐  ┳  ┌──────────┐   ┌────────┐
│ .filter() │──╋─>│ .limit() │──>│ .map() │
└───────────┘  ┻  └──────────┘   └────────┘
               └─> your cursor position
// relative move
items_pl.move_to(1, true); // move 1 step from current position
┌───────────┐   ┌──────────┐   ┌────────┐ ┳
│ .filter() │──>│ .limit() │──>│ .map() │ ┃
└───────────┘   └──────────┘   └────────┘ ┻
                   your cursor position <─┘
items_pl.move_to_first(); // move to first step
┳ ┌───────────┐   ┌──────────┐   ┌────────┐
┃ │ .filter() │──>│ .limit() │──>│ .map() │
┻ └───────────┘   └──────────┘   └────────┘
└─> your cursor position
items_pl.move_to_last(); // move to last step
┌───────────┐   ┌──────────┐   ┌────────┐ ┳
│ .filter() │──>│ .limit() │──>│ .map() │ ┃
└───────────┘   └──────────┘   └────────┘ ┻
                   your cursor position <─┘
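The absolute cursor moves above can be modeled in a few lines of TypeScript. This is a hypothetical sketch, not NimbusDB’s implementation: it assumes the cursor counts the steps before it, so moving to step n lands just before step n, which matches the .move_to(2) diagram. Relative moves are not modeled.

```typescript
// Hypothetical cursor model; not NimbusDB's implementation.
// Assumption: the cursor counts the steps before it, so 0 sits before the first step,
// steps.length sits after the last one, and moveTo(n) places it just before step n.
class StepCursor {
  private readonly steps: string[];
  pos: number;

  constructor(steps: string[]) {
    this.steps = steps;
    this.pos = steps.length; // after chaining, the cursor sits after the last step
  }

  moveTo(step: number): void {
    this.pos = Math.min(Math.max(step - 1, 0), this.steps.length); // clamp to a valid gap
  }

  moveToFirst(): void {
    this.pos = 0;
  }

  moveToLast(): void {
    this.pos = this.steps.length;
  }

  // Text version of the diagrams: "|" marks the cursor position.
  render(): string {
    const parts = this.steps.map((s) => `.${s}()`);
    parts.splice(this.pos, 0, "|");
    return parts.join(" ");
  }
}

const cur = new StepCursor(["filter", "limit", "map"]);
console.log(cur.render()); // .filter() .limit() .map() |
cur.moveTo(2);
console.log(cur.render()); // .filter() | .limit() .map()
cur.moveToFirst();
console.log(cur.render()); // | .filter() .limit() .map()
cur.moveToLast();
console.log(cur.render()); // .filter() .limit() .map() |
```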

If you move the cursor to a specific step and then define a new operation, it overwrites the existing operation at that step.

var items_pl = items.pipe()
    .filter(function(data, index) {
        return data.price % 5 == 0;
    })
    .limit(2)
    .map(function(data, index) {
        return {
            name: $"Packed {data.name}",
            price: data.price * 2
        };
    });
// move to step 3 (.map operation)
items_pl.move_to(3);
// now, if we define a new operation, it will overwrite the next operation (.map)
items_pl.pluck("name");
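A minimal TypeScript sketch of overwrite-at-cursor behavior. OverwritingPipeline is hypothetical, and the sketch assumes a newly defined operation replaces only the one stored at the cursor and leaves any later operations in place; the text above only shows the last step being replaced.

```typescript
// Hypothetical sketch of overwrite-at-cursor; not NimbusDB's implementation.
class OverwritingPipeline {
  private readonly ops: string[] = [];
  private cursor = 0; // index where the next operation will be written

  // Assumption: defining an op writes it at the cursor (replacing whatever was there)
  // and then advances the cursor by one step; later ops are left in place.
  define(op: string): this {
    this.ops[this.cursor] = op;
    this.cursor++;
    return this;
  }

  // moveTo(n) places the cursor just before step n (1-based), clamped to the pipeline.
  moveTo(step: number): this {
    this.cursor = Math.min(Math.max(step - 1, 0), this.ops.length);
    return this;
  }

  describe(): string {
    return this.ops.map((op) => `.${op}()`).join(" -> ");
  }
}

const pl = new OverwritingPipeline().define("filter").define("limit").define("map");
pl.moveTo(3).define("pluck");
console.log(pl.describe()); // .filter() -> .limit() -> .pluck()
```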

The resulting pipeline will be:

┌───────────┐   ┌──────────┐   ┌──────────┐ ┳
│ .filter() │──>│ .limit() │──>│ .pluck() │ ┃
└───────────┘   └──────────┘   └──────────┘ ┻
                     your cursor position <─┘

NimbusDBPipeline

A lazy, chainable data transformation pipeline for NimbusDB models. Supports a wide range of operations (filter, map, reduce, sort, etc.) with optional caching, snapshotting, and cursor-based step control.

pipeline.d.ts
class NimbusDBPipeline {
    constructor(
        _init_data?: any,
        _saves?: NimbusDBPipelineSave[],
        _options?: NimbusDBPipelineOptions
    );
    cursor: int;
    id: int;
    static __id: int;
    __cache: {
        [step_idx: int]: any;
    };
    __cache_strategy: NIMBUSDB_PIPELINE_CACHE;
    __checkpoints: NimbusDBPipelineCheckpoint[];
    __is_mutable: boolean;
    __model: NimbusDBModel | null;
    __on_error: "stop" | "continue" | ((
        err: NimbusDBPipelineOpsError
    ) => boolean);
    __result: int;
    __saves: NimbusDBPipelineSave[];
    __snapshot: {
        [name: string]: any;
    };
    __sys_temp?: Struct;
    // ...public and internal methods
}

.pipe()

Creates a pipeline from the model.

model.d.ts
class NimbusDBModel {
    // ... other methods and properties ...
    static pipe(
        _options?: NimbusDBPipelineOptions
    ): NimbusDBPipeline;
}
Parameters:
  • _options
    • Type: NimbusDBPipelineOptions
    • Default: undefined
    • An optional object that allows you to customize the behavior of the created pipeline.
Returns:
  • Type: NimbusDBPipeline
  • The pipeline instance.

NimbusDBPipelineCheckpoint Internal


A save point for the pipeline. Used to restore the pipeline to a specific state.

pipeline.d.ts
type NimbusDBPipelineCheckpoint = {
    name: string;
    pos: int;
}

NimbusDBPipelineOptions

An optional object that allows you to customize the behavior of the pipeline instance.

pipeline.d.ts
type NimbusDBPipelineOptions = Partial<{
    cache: { // [INTERNAL] cache the result of the pipeline operations
        [step_idx: int]: any;
    };
    cache_strategy: NIMBUSDB_PIPELINE_CACHE; // cache strategy for the pipeline (default = PARTIAL)
    checkpoints: NimbusDBPipelineCheckpoint[]; // [INTERNAL] save points for the pipeline (default = [])
    cursor: int; // cursor position for the pipeline (default = -1)
    on_error: "stop" | "continue" | (( // error handler for the pipeline (default = "stop")
        err: NimbusDBPipelineOpsError
    ) => boolean | null);
    model: NimbusDBModel; // [INTERNAL] model instance
    mutable: boolean; // return current pipeline instead of new pipeline (default = false)
    result: int; // [INTERNAL] latest result of the pipeline, stored as cache key (default = -1)
}>;
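The defaults listed in the comments can be applied with a small resolver. This TypeScript sketch is illustrative only: resolveOptions is hypothetical, the cache strategies are modeled as strings rather than enum values, and internal fields such as cache and model are omitted.

```typescript
// Hypothetical resolver mirroring the defaults documented in NimbusDBPipelineOptions;
// the function itself is an illustration, not the library's code.
type CacheStrategy = "NONE" | "FULL" | "PARTIAL" | "SMART";

interface PipelineOpsError {
  data: unknown;
  message: string;
  pos: number;
}

interface PipelineOptions {
  cache_strategy: CacheStrategy;
  checkpoints: { name: string; pos: number }[];
  cursor: number;
  on_error: "stop" | "continue" | ((err: PipelineOpsError) => boolean | null);
  mutable: boolean;
  result: number;
}

function resolveOptions(opts: Partial<PipelineOptions> = {}): PipelineOptions {
  // `??` keeps falsy but valid values such as `cursor: 0`; `||` would replace them.
  return {
    cache_strategy: opts.cache_strategy ?? "PARTIAL",
    checkpoints: opts.checkpoints ?? [],
    cursor: opts.cursor ?? -1,
    on_error: opts.on_error ?? "stop",
    mutable: opts.mutable ?? false,
    result: opts.result ?? -1,
  };
}

const resolved = resolveOptions({ cache_strategy: "FULL", mutable: true, cursor: 0 });
console.log(resolved.cache_strategy, resolved.mutable, resolved.cursor, resolved.on_error);
// FULL true 0 stop
```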

NimbusDBPipelineOpsError

An object that contains information about an error that occurred during pipeline execution.

pipeline.d.ts
type NimbusDBPipelineOpsError = {
    data: any; // current pipeline result
    message: string; // error message
    pos: int; // current cursor position
    type: NIMBUSDB_PIPELINE_OPS; // type of the current operation
}
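The following TypeScript sketch shows one way an on_error setting could be applied when an operation throws, filling in a NimbusDBPipelineOpsError-like object. It is an assumption-laden illustration, not NimbusDB’s behavior: runOps is hypothetical, pos is 0-based here, and the sketch treats "continue" or a handler returning true as "skip the failed step", while "stop" (or a handler returning false) ends the run and returns the data from before the failure.

```typescript
// Hypothetical sketch of on_error handling; not NimbusDB's implementation.
interface OpsError {
  data: unknown;   // data before the failing step
  message: string;
  pos: number;     // 0-based index of the failing step in this sketch
  type: string;
}

type OnError = "stop" | "continue" | ((err: OpsError) => boolean);

interface NamedOp {
  type: string;
  run: (data: number[]) => number[];
}

function runOps(source: number[], ops: NamedOp[], onError: OnError): number[] {
  let data = [...source];
  for (let pos = 0; pos < ops.length; pos++) {
    try {
      data = ops[pos].run(data);
    } catch (e) {
      const err: OpsError = {
        data,
        message: e instanceof Error ? e.message : String(e),
        pos,
        type: ops[pos].type,
      };
      // Assumption: "continue", or a handler returning true, skips the failed step
      // and keeps the previous data; anything else stops and returns that data.
      const keepGoing = onError === "continue" || (typeof onError === "function" && onError(err));
      if (!keepGoing) break;
    }
  }
  return data;
}

const failingOps: NamedOp[] = [
  { type: "FILTER", run: (d) => d.filter((p) => p % 5 === 0) },
  { type: "MAP", run: () => { throw new Error("boom"); } },
  { type: "MAP", run: (d) => d.map((p) => p * 2) },
];
const prices = [5, 7.2, 15, 12.5, 8, 10];

console.log(runOps(prices, failingOps, "stop"));     // [ 5, 15, 10 ]
console.log(runOps(prices, failingOps, "continue")); // [ 10, 30, 20 ]
console.log(runOps(prices, failingOps, (err) => {
  console.log(`step ${err.pos} (${err.type}) failed: ${err.message}`);
  return true; // treat the failure like "continue"
}));
```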

NimbusDBPipelineOpsOptions

An optional object that allows you to customize the behavior of the pipeline operations.

pipeline.d.ts
type NimbusDBPipelineOpsOptions = Partial<{
    start_index: int; // start index for the data-searching operation (default = 0)
    length: int; // length for the data-searching operation (default = data.length)
    offset: int; // alias for `start_index`
    limit: int; // alias for `length`
}>;
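A minimal TypeScript sketch of how the search window could be derived from these fields, with offset and limit acting as aliases. searchWindow is hypothetical, and the rule that the primary name wins when both a field and its alias are set is an assumption.

```typescript
// Hypothetical resolver for NimbusDBPipelineOpsOptions-style fields; not the library's code.
interface OpsOptions {
  start_index?: number;
  length?: number;
  offset?: number; // alias for start_index
  limit?: number;  // alias for length
}

// Return the slice of `data` that a data-searching operation would look at.
// Assumption: when a field and its alias are both set, the primary name wins.
function searchWindow<T>(data: T[], opts: OpsOptions = {}): T[] {
  const start = opts.start_index ?? opts.offset ?? 0;
  const length = opts.length ?? opts.limit ?? data.length;
  return data.slice(start, start + length);
}

const prices = [5, 7.2, 15, 12.5, 8, 10];
console.log(searchWindow(prices, { offset: 2, limit: 3 }));           // [ 15, 12.5, 8 ]
console.log(searchWindow(prices, { start_index: 4 }));                // [ 8, 10 ]
console.log(searchWindow(prices, { offset: 3 }).find((p) => p > 10)); // 12.5
```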

NimbusDBPipelineSave Internal


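A record of a single pipeline operation: its type (a NIMBUSDB_PIPELINE_OPS value) plus the arguments it was defined with. A pipeline keeps its operations as an array of these in __saves.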

pipeline.d.ts
type NimbusDBPipelineSave = (
    | { type: NIMBUSDB_PIPELINE_OPS.CHECKPOINT; name: string; }
    | { type: NIMBUSDB_PIPELINE_OPS.CATCH; fn: (err: NimbusDBPipelineOpsError, data: any) => { data: any; stop: boolean; } | any | void; }
    | { type: NIMBUSDB_PIPELINE_OPS.CLONE; with_cache: boolean; shared_cache: boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.CONSTRUCT; columns: string[] | null; level: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.DESTRUCT; columns: string[] | null; }
    | { type: NIMBUSDB_PIPELINE_OPS.DISTINCT; columns: string[]; }
    | { type: NIMBUSDB_PIPELINE_OPS.DROP_WHILE; fn: (value: any, idx: int) => boolean; from_end: boolean; reverse: boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.FILTER; fn: (value: any, idx: int) => boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.FILTER; fn: (prop: string, value: any) => boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.FIND; fn: (value: any, idx: int) => boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.FLAT_MAP; fn: (value: any, idx: int) => any; level: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.FLATTEN; level: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.FLATTEN_DEEP; level: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.FORK; }
    | { type: NIMBUSDB_PIPELINE_OPS.ISOLATE; }
    | { type: NIMBUSDB_PIPELINE_OPS.LIMIT; count: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.MATERIALIZE; }
    | { type: NIMBUSDB_PIPELINE_OPS.MERGE; data: any[]; }
    | { type: NIMBUSDB_PIPELINE_OPS.MAP; fn: (value: any, idx: int) => any; }
    | { type: NIMBUSDB_PIPELINE_OPS.MAP; fn: (prop: string, value: any) => any; columns: string[] | null; }
    | { type: NIMBUSDB_PIPELINE_OPS.OFFSET; count: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.PLUCK; columns: string[]; }
    | { type: NIMBUSDB_PIPELINE_OPS.REDUCE; fn: (acc: any, value: any, idx: int) => any; acc: any; }
    | { type: NIMBUSDB_PIPELINE_OPS.REDUCE; fn: (acc: any, prop: string, value: any) => any; acc: any; columns: string[] | null; }
    | { type: NIMBUSDB_PIPELINE_OPS.REJECT; fn: (value: any, idx: int) => any; }
    | { type: NIMBUSDB_PIPELINE_OPS.RENAME; columns: string[] | Struct; }
    | { type: NIMBUSDB_PIPELINE_OPS.SAMPLE; count: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.SLICE; start: int; length: int; }
    | { type: NIMBUSDB_PIPELINE_OPS.SNAPSHOT; name: string; }
    | { type: NIMBUSDB_PIPELINE_OPS.SORT; ascending: boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.SORT; column: string; ascending: boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.SORT; fn: (current: any, next: any) => number; }
    | { type: NIMBUSDB_PIPELINE_OPS.TAKE_WHILE; fn: (value: any, idx: int) => boolean; }
    | { type: NIMBUSDB_PIPELINE_OPS.TAP; }
    | { type: NIMBUSDB_PIPELINE_OPS.TAP; opt: NimbusDBPrintOptions; }
    | { type: NIMBUSDB_PIPELINE_OPS.TAP; fn: (value: any, idx: int) => void; }
    | { type: NIMBUSDB_PIPELINE_OPS.TAP; fn: (prop: string, value: any) => void; }
    | { type: NIMBUSDB_PIPELINE_OPS.TAP; fn: (value: any) => void; }
) & NimbusDBPipelineOpsOptions;
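Because every entry carries a type tag, a pipeline’s saved operations form a discriminated union that can be replayed with a single switch. The TypeScript sketch here covers only five of the operation types, uses string tags in place of NIMBUSDB_PIPELINE_OPS values, and is a hypothetical illustration rather than NimbusDB’s executor.

```typescript
// Hypothetical subset of the NimbusDBPipelineSave union, using string tags instead of
// NIMBUSDB_PIPELINE_OPS values; a sketch, not NimbusDB's executor.
type Row = Record<string, unknown>;

type Save =
  | { type: "FILTER"; fn: (value: Row, idx: number) => boolean }
  | { type: "MAP"; fn: (value: Row, idx: number) => Row }
  | { type: "LIMIT"; count: number }
  | { type: "OFFSET"; count: number }
  | { type: "PLUCK"; columns: string[] };

// Apply one recorded operation; the `type` tag narrows `save` to the right shape.
function applySave(data: Row[], save: Save): Row[] {
  switch (save.type) {
    case "FILTER":
      return data.filter(save.fn);
    case "MAP":
      return data.map(save.fn);
    case "LIMIT":
      return data.slice(0, save.count);
    case "OFFSET":
      return data.slice(save.count);
    case "PLUCK": {
      const columns = save.columns;
      return data.map((row) => {
        const out: Row = {};
        for (const c of columns) out[c] = row[c];
        return out;
      });
    }
  }
}

// Replay the recorded operations in order, feeding each result into the next.
function replay(source: Row[], saves: Save[]): Row[] {
  return saves.reduce(applySave, source);
}

const rows: Row[] = [
  { id: 1, name: "Apple", price: 5 },
  { id: 2, name: "Banana", price: 7.2 },
  { id: 3, name: "Cherry", price: 15 },
  { id: 4, name: "Date", price: 12.5 },
  { id: 5, name: "Elderberry", price: 8 },
  { id: 6, name: "Fig", price: 10 },
];

const saves: Save[] = [
  { type: "FILTER", fn: (row) => (row.price as number) % 5 === 0 },
  { type: "LIMIT", count: 2 },
  { type: "PLUCK", columns: ["name"] },
];

console.log(JSON.stringify(replay(rows, saves))); // [{"name":"Apple"},{"name":"Cherry"}]
```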

NimbusDBPipelineTerminalOptions

An optional object that allows you to customize the behavior of terminal operations such as .exec() and .get().

pipeline.d.ts
type NimbusDBPipelineTerminalOptions = Partial<{
    keep_cursor: boolean; // move the cursor to the `step` parameter (default = false)
    ops_name: string; // [INTERNAL] name of the current operation
    relative: boolean; // calculate the `step` parameter from the current cursor position (default = false)
    silent: boolean; // [INTERNAL] suppress debug messages (default = false)
}> & NimbusDBPipelineOpsOptions;
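A hypothetical TypeScript sketch of how a terminal call might turn its step argument and these options into a run plan. planExec is not part of NimbusDB; treating a negative or out-of-range step as "run to the end", and leaving the cursor after the last step when keep_cursor is false, are both assumptions.

```typescript
// Hypothetical sketch of how a terminal call such as exec(step, options) might resolve
// its `step` argument and the final cursor; not NimbusDB's implementation.
interface TerminalOptions {
  keep_cursor?: boolean; // move the cursor to `step` instead of the end
  relative?: boolean;    // count `step` from the current cursor position
}

interface RunPlan {
  lastStep: number;    // run steps 1..lastStep
  cursorAfter: number; // cursor position once the run finishes
}

function planExec(stepCount: number, cursor: number, step = -1, opts: TerminalOptions = {}): RunPlan {
  let target = opts.relative ? cursor + step : step;
  // Assumption: a negative or out-of-range target means "run to the end".
  if (target < 0 || target > stepCount) target = stepCount;
  return {
    lastStep: target,
    // Assumption: without keep_cursor the cursor ends up after the last step.
    cursorAfter: opts.keep_cursor ? target : stepCount,
  };
}

console.log(planExec(3, 3));                                           // { lastStep: 3, cursorAfter: 3 }
console.log(planExec(3, 3, 2));                                        // { lastStep: 2, cursorAfter: 3 }
console.log(planExec(3, 3, 2, { keep_cursor: true }));                 // { lastStep: 2, cursorAfter: 2 }
console.log(planExec(3, 1, 1, { relative: true, keep_cursor: true })); // { lastStep: 2, cursorAfter: 2 }
```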

NIMBUSDB_PIPELINE_OPS Internal


An enum that represents the different types of pipeline operations.

pipeline.d.ts
enum NIMBUSDB_PIPELINE_OPS {
    BIND,
    CATCH,
    CHECKPOINT,
    CLONE,
    CONSTRUCT,
    DESTRUCT,
    DISTINCT,
    DROP_WHILE,
    FILTER,
    FIND,
    FLAT_MAP,
    FLATTEN,
    FLATTEN_DEEP,
    FORK,
    ISOLATE,
    LIMIT,
    MAP,
    MATERIALIZE,
    MERGE,
    OFFSET,
    PLUCK,
    REDUCE,
    REJECT,
    RENAME,
    SAMPLE,
    SLICE,
    SNAPSHOT,
    SORT,
    TAKE_WHILE,
    TAP
}

NIMBUSDB_PIPELINE_CACHE

An enum that represents the different cache strategies for pipeline operations.

pipeline.d.ts
enum NIMBUSDB_PIPELINE_CACHE {
    NONE,
    FULL,
    PARTIAL,
    SMART
}