Data Transformation in Pipelines

In this section, we will explore the data transformation capabilities of NimbusDB pipelines. We’ll use this schema and the items model as an example:

var schema = {
    id: {
        type: NIMBUSDB_DATA_TYPE.INTEGER,
        const: NIMBUSDB_CONSTRAINT.PRIMARY_KEY
    },
    name: NIMBUSDB_DATA_TYPE.STRING,
    price: {
        type: NIMBUSDB_DATA_TYPE.NUMBER,
        validator: function(data, value) {
            return value >= 0;
        },
        default_value: 0
    },
    is_locked: {
        type: NIMBUSDB_DATA_TYPE.BOOLEAN,
        const: NIMBUSDB_CONSTRAINT.OPTIONAL,
        default_value: false
    }
};
items = new NimbusDBModel("global", "items", schema, [
    { id: 1, name: "Apple", price: 5 },
    { id: 2, name: "Banana", price: 7.2 },
    { id: 3, name: "Cherry", price: 15 },
    { id: 4, name: "Date", price: 12.5 },
    { id: 5, name: "Elderberry", price: 8 },
    { id: 6, name: "Fig", price: 10 },
    { id: 7, name: "Grape", price: 6 },
    { id: 8, name: "Honeydew", price: 9 },
    { id: 9, name: "Kiwi", price: 4 },
    { id: 10, name: "Lemon", price: 3 }
]);

Transforming data means changing the shape or contents of the data flowing through a pipeline, for example by mapping, flattening, or filtering it.

The map operation is used to transform data by applying a mapper function to each element in the pipeline. The function takes two arguments: the current element and its index. The function should return the transformed element.

var items_pl = items.pipe() // create an `items` pipeline
// let's say the `map` operation is the first operation in the pipeline
.map(function(data, index) {
return {
name: data.name,
price: data.price,
discounted_price: data.price * 0.9,
is_cheap: (data.price <= 5)
};
});
// result (isolated data):
// [
// { name: "Apple", price: 5, discounted_price: 4.5, is_cheap: true },
// { name: "Banana", price: 7.2, discounted_price: 6.48, is_cheap: false },
// ...
// { name: "Lemon", price: 3, discounted_price: 2.7, is_cheap: true }
// ]

The flat_map operation is similar to the map operation, but it flattens the result after applying the mapper function. So, you can return an array of transformed elements from the mapper function, and they will be flattened into the final result.

var items_pl = items.pipe() // create an `items` pipeline
// let's say you somehow have this in your pipeline (before the `flat_map` operation):
// [
// { id: 1, name: "Apple", price: 5 },
// [
// { id: 2, name: "Banana", price: 7.2 },
// { id: 3, name: "Cherry", price: 15 }
// ]
// ]
.flat_map(function(data, index) {
if (!is_array(data)) { // if the current element is not an array
return data.name; // return the name of the item
}
// otherwise, apply the mapper function to each item in the array
return array_map(data, function(item) {
return item.name; // return the name of each item in the array
});
});
// result after mapping:
// [ "Apple", [ "Banana", "Cherry" ] ]
// and the final result after flattening:
// [ "Apple", "Banana", "Cherry" ]
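
The reference for flat_map also lists a `_level` parameter (default 1) that controls how many levels are flattened. The following is a minimal TypeScript sketch of that map-then-flatten behavior on plain arrays. It is not NimbusDB code, and `flattenBy`, `flatMapLevel`, and `SampleItem` are hypothetical names used only for illustration.

```typescript
// Standalone model of "map, then flatten by N levels"; not the NimbusDB implementation.
// flattenBy, flatMapLevel and SampleItem are hypothetical names.
type SampleItem = { id: number; name: string; price: number };

function flattenBy(values: unknown[], level: number): unknown[] {
  if (level <= 0) return values.slice();
  const out: unknown[] = [];
  for (const value of values) {
    if (Array.isArray(value)) {
      // Unwrap one level, then keep flattening with one level fewer.
      for (const inner of flattenBy(value, level - 1)) out.push(inner);
    } else {
      out.push(value);
    }
  }
  return out;
}

function flatMapLevel<T>(
  data: T[],
  mapper: (item: T, index: number) => unknown,
  level: number = 1
): unknown[] {
  return flattenBy(data.map(mapper), level);
}

const mixedItems: Array<SampleItem | SampleItem[]> = [
  { id: 1, name: "Apple", price: 5 },
  [
    { id: 2, name: "Banana", price: 7.2 },
    { id: 3, name: "Cherry", price: 15 },
  ],
];

const flatNames = flatMapLevel(mixedItems, (entry) =>
  Array.isArray(entry) ? entry.map((item) => item.name) : entry.name
);
console.log(flatNames); // [ 'Apple', 'Banana', 'Cherry' ]
```

With `level = 2`, an input such as `[1, [2, [3, [4]]]]` would be flattened to `[1, 2, 3, [4]]` in this model.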

Filter (Array, Object)

The filter operation is used to filter the data based on a predicate function. The function should return true if the data should be included in the result, and false otherwise.

var items_pl = items.pipe() // create an `items` pipeline
// let's say the `filter` operation is the first operation in the pipeline
.filter(function(data, index) {
return (data.price % 5 == 0); // include only items with a price that is divisible by 5
});
// result (linked data):
// [
// { id: 1, name: "Apple", price: 5 },
// { id: 3, name: "Cherry", price: 15 },
// { id: 6, name: "Fig", price: 10 }
// ]
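
The filter signature in pipeline.d.ts also accepts a `(prop, value)` predicate for object data. As a rough sketch of what filtering an object's properties could look like, the TypeScript below keeps each property for which the predicate returns true. This is an assumption about the behavior, not confirmed NimbusDB semantics, and `filterObject` is a hypothetical helper.

```typescript
// Standalone model of an object-style filter; not the NimbusDB implementation.
// Assumption: a property is kept when predicate(prop, value) returns true.
function filterObject(
  source: Record<string, unknown>,
  predicate: (prop: string, value: unknown) => boolean
): Record<string, unknown> {
  const result: Record<string, unknown> = {};
  for (const prop of Object.keys(source)) {
    if (predicate(prop, source[prop])) result[prop] = source[prop];
  }
  return result;
}

const appleRecord: Record<string, unknown> = { id: 1, name: "Apple", price: 5 };
const numericProps = filterObject(appleRecord, (_prop, value) => typeof value === "number");
console.log(numericProps); // { id: 1, price: 5 }
```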

The reject operation is the opposite of the filter operation. It is used to filter the data based on a predicate function. The function should return true if the data should be excluded from the result, and false otherwise.

var items_pl = items.pipe() // create an `items` pipeline
// let's say the `reject` operation is the first operation in the pipeline
.reject(function(data, index) {
return data.price >= 5; // exclude items with a price greater than or equal to 5
});
// result (linked data):
// [
// { id: 9, name: "Kiwi", price: 4 },
// { id: 10, name: "Lemon", price: 3 }
// ]
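
One way to picture reject is as filter with the predicate negated. The TypeScript sketch below models that on a plain array; `rejectItems` is a hypothetical helper, not part of NimbusDB.

```typescript
// Standalone model: reject(predicate) behaves like filter(not predicate).
// rejectItems is a hypothetical name.
function rejectItems<T>(
  data: T[],
  predicate: (item: T, index: number) => boolean
): T[] {
  return data.filter((item, index) => !predicate(item, index));
}

const rejectPrices = [5, 7.2, 15, 4, 3];
const atLeastFive = (price: number) => price >= 5;

const afterFilter = rejectPrices.filter(atLeastFive);       // [5, 7.2, 15]
const afterReject = rejectItems(rejectPrices, atLeastFive); // [4, 3]
console.log(afterFilter, afterReject);
```

For the same predicate, the two results together contain every input element exactly once.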

The take_while operation is used to keep elements of the data while a predicate function returns true.

var items_pl = items.pipe() // create an `items` pipeline
// let's say the `take_while` operation is the first operation in the pipeline
.take_while(function(data, index) {
return (data.price != 6); // keep items until item with `price = 6` is found
});
// result (linked data):
// [
// { id: 1, name: "Apple", price: 5 },
// { id: 2, name: "Banana", price: 7.2 },
// ...
// { id: 6, name: "Fig", price: 10 }
// ]
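
The difference from filter is that take_while stops at the first element that fails the predicate, even if later elements would pass. A minimal TypeScript model on plain arrays, using the prices from the example above (`takeWhile` is a hypothetical helper, not NimbusDB code):

```typescript
// Standalone model of take_while; not the NimbusDB implementation.
function takeWhile<T>(
  data: T[],
  predicate: (item: T, index: number) => boolean
): T[] {
  const kept: T[] = [];
  let index = 0;
  for (const item of data) {
    if (!predicate(item, index)) break; // stop at the first failing element
    kept.push(item);
    index += 1;
  }
  return kept;
}

const takePrices = [5, 7.2, 15, 12.5, 8, 10, 6, 9, 4, 3];
const notSixTake = (price: number) => price !== 6;

const taken = takeWhile(takePrices, notSixTake);
const filteredAll = takePrices.filter(notSixTake);
console.log(taken);       // [ 5, 7.2, 15, 12.5, 8, 10 ]
console.log(filteredAll); // [ 5, 7.2, 15, 12.5, 8, 10, 9, 4, 3 ]
```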

The drop_while operation is the opposite of take_while: it skips elements while the predicate function returns true and keeps everything from the first element for which it returns false.

// (1) from the start
var items_pl = items.pipe() // create an `items` pipeline
// let's say the `drop_while` operation is the first operation in the pipeline
.drop_while(function(data, index) {
return (data.price != 6); // exclude items until item with `price = 6` is found
});
// result (linked data):
// [
// { id: 7, name: "Grape", price: 6 },
// { id: 8, name: "Honeydew", price: 9 },
// { id: 9, name: "Kiwi", price: 4 },
// { id: 10, name: "Lemon", price: 3 }
// ]
// (2) from the end
var items_pl = items.pipe() // create an `items` pipeline
// let's say the `drop_while` operation is the first operation in the pipeline
.drop_while(function(data, index) {
return (data.price != 6); // scanning from the end, exclude items until item with `price = 6` is found
}, {
from_end: true
});
// result (linked data):
// [
// { id: 1, name: "Apple", price: 5 },
// { id: 2, name: "Banana", price: 7.2 },
// ...
// { id: 7, name: "Grape", price: 6 }
// ]
// (3) reverse the result after the operation
var items_pl = items.pipe() // create an `items` pipeline
// let's say the `drop_while` operation is the first operation in the pipeline
.drop_while(function(data, index) {
return (data.price != 6); // exclude items until item with `price = 6` is found
}, {
reverse: true
});
// result (linked data):
// [
// { id: 10, name: "Lemon", price: 3 },
// { id: 9, name: "Kiwi", price: 4 },
// { id: 8, name: "Honeydew", price: 9 },
// { id: 7, name: "Grape", price: 6 }
// ]
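
The three variants above can be modeled on a plain array as follows. This is a TypeScript sketch under the assumption that `from_end` changes the side elements are dropped from and `reverse` flips the result afterwards, which is how the option comments in pipeline.d.ts describe them. `dropWhile` and `DropWhileOptions` are hypothetical names, not NimbusDB code.

```typescript
// Standalone model of drop_while with from_end / reverse; not the NimbusDB implementation.
type DropWhileOptions = { from_end?: boolean; reverse?: boolean };

function dropWhile<T>(
  data: T[],
  predicate: (item: T, index: number) => boolean,
  options: DropWhileOptions = {}
): T[] {
  let start = 0;
  let end = data.length;
  if (options.from_end) {
    // Walk backwards from the last element while the predicate holds.
    while (end > start && predicate(data[end - 1], end - 1)) end -= 1;
  } else {
    // Walk forwards from the first element while the predicate holds.
    while (start < end && predicate(data[start], start)) start += 1;
  }
  const result = data.slice(start, end);
  return options.reverse ? result.reverse() : result;
}

const dropInput = [
  { name: "Fig", price: 10 },
  { name: "Grape", price: 6 },
  { name: "Honeydew", price: 9 },
  { name: "Kiwi", price: 4 },
];
const notSixDrop = (item: { price: number }) => item.price !== 6;
const namesOf = (rows: { name: string }[]) => rows.map((row) => row.name);

const dropFromStart = namesOf(dropWhile(dropInput, notSixDrop));
const dropFromEnd = namesOf(dropWhile(dropInput, notSixDrop, { from_end: true }));
const dropReversed = namesOf(dropWhile(dropInput, notSixDrop, { reverse: true }));
console.log(dropFromStart); // [ 'Grape', 'Honeydew', 'Kiwi' ]
console.log(dropFromEnd);   // [ 'Fig', 'Grape' ]
console.log(dropReversed);  // [ 'Kiwi', 'Honeydew', 'Grape' ]
```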

The find operation returns the first element for which the predicate function returns true. Unlike filter, the result is a single element rather than an array.

var items_pl = items.pipe() // create an `items` pipeline
// let's say the `find` operation is the first operation in the pipeline
.find(function(data, index) {
return data.price >= 8; // find the first item with a price greater than or equal to 8
});
// result (linked data):
// { id: 3, name: "Cherry", price: 15 }

Pluck (Array of Objects, Object)

The pluck operation is used to extract specific properties from the data. It takes a property name (string) or an array of property names to extract.

// (1) pluck by column name
var items_pl = items.pipe() // create an `items` pipeline
// let's say the `pluck` operation is the first operation in the pipeline
.pluck("name"); // extract the `name` property
// result (isolated data):
// [
// { name: "Apple" },
// { name: "Banana" },
// ...
// { name: "Lemon" }
// ]
// (2) multiple column names
var items_pl = items.pipe() // create an `items` pipeline
// let's say the `pluck` operation is the first operation in the pipeline
.pluck(["name", "price"]); // extract the `name` and `price` properties
// result (isolated data):
// [
// { name: "Apple", price: 5 },
// { name: "Banana", price: 7.2 },
// ...
// { name: "Lemon", price: 3 }
// ]
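
As a rough model of the two call forms, the TypeScript sketch below normalizes a single name to a one-element list and then copies only the requested properties. `pluckColumns` is a hypothetical helper, and skipping columns that a row does not have is an assumption, not documented NimbusDB behavior.

```typescript
// Standalone model of pluck on an array of objects; not the NimbusDB implementation.
// Assumption: a requested column that is missing from a row is simply skipped.
function pluckColumns(
  data: Record<string, unknown>[],
  columns: string | string[]
): Record<string, unknown>[] {
  const wanted = typeof columns === "string" ? [columns] : columns;
  return data.map((row) => {
    const picked: Record<string, unknown> = {};
    for (const column of wanted) {
      if (Object.prototype.hasOwnProperty.call(row, column)) {
        picked[column] = row[column];
      }
    }
    return picked;
  });
}

const pluckInput: Record<string, unknown>[] = [
  { id: 1, name: "Apple", price: 5 },
  { id: 2, name: "Banana", price: 7.2 },
];

const pluckedOne = pluckColumns(pluckInput, "name");
const pluckedTwo = pluckColumns(pluckInput, ["name", "price"]);
console.log(pluckedOne); // [ { name: 'Apple' }, { name: 'Banana' } ]
console.log(pluckedTwo); // [ { name: 'Apple', price: 5 }, { name: 'Banana', price: 7.2 } ]
```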

Rename (Array of Objects, Object)

The rename operation is used to rename one or more properties in the data. It takes a string, an array of strings, or a map of old to new property names.

You can use :, ->, =>, as, or AS to separate the old and new property names.

var items_pl = items.pipe() // create an `items` pipeline
// let's say the `rename` operation is the first operation in the pipeline
.rename("name: item_name") // rename the `name` property to `item_name`
.rename("id as item_id, price -> item_price") // rename the `id` property to `item_id` and the `price` property to `item_price`
.rename({ item_id: "id", item_price: "cost" }); // rename the `item_id` property to `id` and the `item_price` property to `cost`
// result (isolated data):
// [
// { id: 1, item_name: "Apple", cost: 5 },
// { id: 2, item_name: "Banana", cost: 7.2 },
// ...
// { id: 10, item_name: "Lemon", cost: 3 }
// ]
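
To make the string form concrete, here is a TypeScript sketch of how such rename strings could be parsed into an old-to-new map and applied to one row. The separators are the ones listed above; everything else (`parseRenameSpec`, `renameColumns`, comma-separated pairs, throwing on malformed input) is an illustrative assumption and not the NimbusDB parser.

```typescript
// Standalone model of the rename string syntax; not the NimbusDB implementation.
// Accepted separators between old and new name: ":", "->", "=>", "as", "AS".
function parseRenameSpec(spec: string): Record<string, string> {
  const mapping: Record<string, string> = {};
  for (const part of spec.split(",")) {
    const match = part
      .trim()
      .match(/^(\w+)\s*(?:->|=>|:|\s+(?:as|AS)\s+)\s*(\w+)$/);
    if (!match) throw new Error(`Cannot parse rename spec: "${part.trim()}"`);
    mapping[match[1]] = match[2];
  }
  return mapping;
}

function renameColumns(
  row: Record<string, unknown>,
  mapping: Record<string, string>
): Record<string, unknown> {
  const renamed: Record<string, unknown> = {};
  for (const key of Object.keys(row)) {
    const hasNewName = Object.prototype.hasOwnProperty.call(mapping, key);
    renamed[hasNewName ? mapping[key] : key] = row[key];
  }
  return renamed;
}

// The same three steps as the example above, applied to a single row.
const renameInput: Record<string, unknown> = { id: 1, name: "Apple", price: 5 };
const renameStep1 = renameColumns(renameInput, parseRenameSpec("name: item_name"));
const renameStep2 = renameColumns(renameStep1, parseRenameSpec("id as item_id, price -> item_price"));
const renameStep3 = renameColumns(renameStep2, { item_id: "id", item_price: "cost" });
console.log(renameStep3); // { id: 1, item_name: 'Apple', cost: 5 }
```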

The construct operation is used to construct an object from an array of values. It takes an optional column name or array of column names to use as the object keys, and an optional nesting level.

// (1) index-based construction
var items_pl = items.pipe() // create an `items` pipeline
// let's say you have this in your pipeline (before the `construct` operation):
// [ 1, "Apple", 5, true, [ 2, "Green Apple", 7.3 ] ]
.construct();
// result (isolated data):
// {
// 0: 1,
// 1: "Apple",
// 2: 5,
// 3: true,
// 4: [ 2, "Green Apple", 7.3 ]
// }
// (2) column-based construction
var items_pl = items.pipe() // create an `items` pipeline
// let's say you have this in your pipeline (before the `construct` operation):
// [ 1, "Apple", 5, true, [ 2, "Green Apple", 7.3 ] ]
.construct(["id", "name", "price", "is_locked", "variants"]); // with column names
// result (isolated data):
// {
// id: 1,
// name: "Apple",
// price: 5,
// is_locked: true,
// variants: [ 2, "Green Apple", 7.3 ]
// }
// (3) column-based construction with level
var items_pl = items.pipe() // create an `items` pipeline
// let's say you have this in your pipeline (before the `construct` operation):
// [ 1, "Apple", 5, true, [ 2, "Green Apple", 7.3 ] ]
.construct(["id", "name", "price", "is_locked", "variants"], 2); // with column names and level = 2
// result (isolated data):
// {
// id: 1,
// name: "Apple",
// price: 5,
// is_locked: true,
// variants: { id: 2, name: "Green Apple", price: 7.3 }
// }
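
The three cases above can be reproduced with a small recursive function. The TypeScript below is a sketch on plain arrays, assuming that nested arrays reuse the same column names when the level is greater than 1, as example (3) suggests. `constructRecord` is a hypothetical helper, not NimbusDB code.

```typescript
// Standalone model of construct with optional column names and nesting level;
// not the NimbusDB implementation.
function constructRecord(
  values: unknown[],
  columns?: string[],
  level: number = 1
): Record<string, unknown> {
  const record: Record<string, unknown> = {};
  values.forEach((value, index) => {
    // Fall back to the positional index when no column name is available.
    const key =
      columns !== undefined && index < columns.length
        ? columns[index]
        : String(index);
    record[key] =
      Array.isArray(value) && level > 1
        ? constructRecord(value, columns, level - 1) // descend one level
        : value;
  });
  return record;
}

const constructInput = [1, "Apple", 5, true, [2, "Green Apple", 7.3]];
const constructColumns = ["id", "name", "price", "is_locked", "variants"];

const byIndex = constructRecord(constructInput);
const byColumn = constructRecord(constructInput, constructColumns);
const byColumnNested = constructRecord(constructInput, constructColumns, 2);
console.log(byColumnNested.variants); // { id: 2, name: 'Green Apple', price: 7.3 }
```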

Destruct (Array, Object)

The destruct operation is used to destructure one or more objects into arrays of values. It takes an optional column name or array of column names that sets the column order.

// (1) auto-detect column order
var items_pl = items.pipe() // create an `items` pipeline
// let's say you have this in your pipeline (before the `destruct` operation):
// [
// { name: "Apple", price: 5 },
// { name: "Banana", price: 7.2, is_cheap: true },
// { name: "Cherry", price: 15, discounted_price: 12.5 }
// ]
.destruct();
// result (isolated data):
// [
// [ "Apple", 5 ],
// [ "Banana", 7.2 ],
// [ "Cherry", 15 ]
// ]
// another auto-detect example, with a different first element:
var items_pl = items.pipe() // create an `items` pipeline
// let's say you have this in your pipeline (before the `destruct` operation):
// [
// { name: "Banana", price: 7.2, is_cheap: true },
// { name: "Apple", price: 5 },
// { name: "Cherry", price: 15, discounted_price: 12.5 }
// ]
.destruct();
// result (isolated data):
// [
// [ "Banana", 7.2, true ],
// [ "Apple", 5, undefined ],
// [ "Cherry", 15, undefined ]
// ]
// (2) column-based destruction
var items_pl = items.pipe() // create an `items` pipeline
// let's say you have this in your pipeline (before the `destruct` operation):
// [
// { name: "Apple", price: 5 },
// { name: "Banana", price: 7.2, is_cheap: true },
// { name: "Cherry", price: 15, discounted_price: 12.5 }
// ]
.destruct("name"); // with a single column name
// result (isolated data):
// [
// [ "Apple" ],
// [ "Banana" ],
// [ "Cherry" ]
// ]
// (3) multiple column destruction
var items_pl = items.pipe() // create an `items` pipeline
// let's say you have this in your pipeline (before the `destruct` operation):
// [
// { name: "Apple", price: 5 },
// { name: "Banana", price: 7.2, is_cheap: true },
// { name: "Cherry", price: 15, discounted_price: 12.5 }
// ]
.destruct(["price", "name"]); // with column names
// result (isolated data):
// [
// [ 5, "Apple" ],
// [ 7.2, "Banana" ],
// [ 15, "Cherry" ]
// ]
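
In the two auto-detect examples above, the output columns match the keys of the first element. The TypeScript sketch below models destruct under that assumption (column order taken from the first row when none is given). `destructRows` is a hypothetical helper, not NimbusDB code.

```typescript
// Standalone model of destruct; not the NimbusDB implementation.
// Assumption: with no column order, the keys of the first row are used.
function destructRows(
  rows: Record<string, unknown>[],
  columnOrder?: string | string[]
): unknown[][] {
  const columns: string[] =
    columnOrder === undefined
      ? (rows.length > 0 ? Object.keys(rows[0]) : [])
      : typeof columnOrder === "string"
        ? [columnOrder]
        : columnOrder;
  // Missing properties come out as undefined, as in the second example above.
  return rows.map((row) => columns.map((column) => row[column]));
}

const destructInput: Record<string, unknown>[] = [
  { name: "Banana", price: 7.2, is_cheap: true },
  { name: "Apple", price: 5 },
];

const autoDetected = destructRows(destructInput);
const singleColumn = destructRows(destructInput, "name");
const reordered = destructRows(destructInput, ["price", "name"]);
console.log(autoDetected); // [ [ 'Banana', 7.2, true ], [ 'Apple', 5, undefined ] ]
console.log(reordered);    // [ [ 7.2, 'Banana' ], [ 5, 'Apple' ] ]
```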

Converts an array of values into struct objects using the provided column names as keys.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static construct(
_columns?: string | string[],
_level?: int
): NimbusDBPipeline;
}
  • _columns
      • Type: string | string[]
      • Default: undefined (use index as the object keys)
      • An optional column name or array of column names to use as the object keys.
  • _level
      • Type: int
      • Default: 1
      • An optional nesting level.
  • Returns
      • Type: NimbusDBPipeline
      • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Converts struct objects into arrays (destructures) using the given column order.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static destruct(
_column_order?: string[]
): NimbusDBPipeline;
}
  • _column_order
      • Type: string[]
      • Default: undefined (auto-detect column order)
      • An optional array of column names defining the output order.
  • Returns
      • Type: NimbusDBPipeline
      • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Skip elements from the start (or end) of the pipeline while a condition is true.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static drop_while(
_func: (data: any, index: int) => boolean,
_options?: NimbusDBPipelineDropWhileOptions
): NimbusDBPipeline;
}
  • _func
      • Type: (data: any, index: int) => boolean
      • The predicate function to use for the drop_while operation.
  • _options
      • Type: NimbusDBPipelineDropWhileOptions
      • Default: undefined
      • An optional object that allows you to customize the behavior of the drop_while operation.
  • Returns
      • Type: NimbusDBPipeline
      • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Filters the pipeline data using a predicate function on array elements or object properties.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static filter(
_func:
| ((data: any, index: int) => boolean) // array
| ((prop: string, value: any) => boolean), // object
_options?: NimbusDBPipelineOpsOptions
): NimbusDBPipeline;
}
  • _func
      • Type: ((data: any, index: int) => boolean) | ((prop: string, value: any) => boolean)
      • The predicate function to use for the filter operation.
  • _options
      • Type: NimbusDBPipelineOpsOptions
      • Default: undefined
      • An optional object that allows you to customize the behavior of the filter operation.
  • Returns
      • Type: NimbusDBPipeline
      • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Finds the first element in the pipeline that satisfies the condition and stores it as the result.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static find(
_func: (data: any, index: int) => boolean,
_options?: NimbusDBPipelineOpsOptions
): NimbusDBPipeline;
}
  • _func
      • Type: (data: any, index: int) => boolean
      • The predicate function to use for the find operation.
  • _options
      • Type: NimbusDBPipelineOpsOptions
      • Default: undefined
      • An optional object that allows you to customize the behavior of the find operation.
  • Returns
      • Type: NimbusDBPipeline
      • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Maps each element through a function, then flattens the result by _level levels.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static flat_map(
_func: (data: any, index: int) => any,
_level?: int,
_options?: NimbusDBPipelineOpsOptions
): NimbusDBPipeline;
}
  • _func
      • Type: (data: any, index: int) => any
      • The mapping function to use for the flat_map operation.
  • _level
      • Type: int
      • Default: 1
      • The number of levels to flatten the result.
  • _options
      • Type: NimbusDBPipelineOpsOptions
      • Default: undefined
      • An optional object that allows you to customize the behavior of the flat_map operation.
  • Returns
      • Type: NimbusDBPipeline
      • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Transforms each element using a mapping function.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static map(
_func:
| ((data: any, index: int) => any) // array
| ((prop: string, value: any) => any), // object
_options?: NimbusDBPipelineOpsOptions
): NimbusDBPipeline;
}
  • _func
      • Type: ((data: any, index: int) => any) | ((prop: string, value: any) => any)
      • The mapping function to use for the map operation.
  • _options
      • Type: NimbusDBPipelineOpsOptions
      • Default: undefined
      • An optional object that allows you to customize the behavior of the map operation.
  • Returns
      • Type: NimbusDBPipeline
      • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Extracts one or more columns from each element, returning only those values.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static pluck(
_column: string | string[]
): NimbusDBPipeline;
}
  • _column
      • Type: string | string[]
      • The column name(s) to pluck.
  • Returns
      • Type: NimbusDBPipeline
      • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Removes the elements for which the predicate function returns true; the opposite of filter.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static reject(
_func: (data: any, index: int) => boolean,
_options?: NimbusDBPipelineOpsOptions
): NimbusDBPipeline;
}
  • _func
      • Type: (data: any, index: int) => boolean
      • The predicate function to use for the reject operation.
  • _options
      • Type: NimbusDBPipelineOpsOptions
      • Default: undefined
      • An optional object that allows you to customize the behavior of the reject operation.
  • Returns
      • Type: NimbusDBPipeline
      • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Renames one or more columns in the pipeline data.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static rename(
_columns: string | string[] | {
[column_name: string]: string;
}
): NimbusDBPipeline;
}
  • _columns
      • Type: string | string[] | { [column_name: string]: string; }
      • The column name(s) to rename.
  • Returns
      • Type: NimbusDBPipeline
      • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Keeps elements from the start of the pipeline while the predicate returns true, and stops at the first element for which it returns false.

pipeline.d.ts
class NimbusDBPipeline {
// ... other methods and properties ...
static take_while(
_func: (data: any, index: int) => boolean,
_options?: NimbusDBPipelineOpsOptions
): NimbusDBPipeline;
}
  • _func
      • Type: (data: any, index: int) => boolean
      • The predicate function to use for the take_while operation.
  • _options
      • Type: NimbusDBPipelineOpsOptions
      • Default: undefined
      • An optional object that allows you to customize the behavior of the take_while operation.
  • Returns
      • Type: NimbusDBPipeline
      • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Optional configuration for the .drop_while() method.

pipeline.d.ts
export type NimbusDBPipelineDropWhileOptions = Partial<{
from_end: boolean; // drop from the end to the start (default = false)
reverse: boolean; // reverse the array after operation (default = false)
}>;