Data Shaping in Pipelines

In this section, we’ll explore how to shape the data in a pipeline. The examples use the following items model.

var schema = {
    id: {
        type: NIMBUSDB_DATA_TYPE.INTEGER,
        const: NIMBUSDB_CONSTRAINT.PRIMARY_KEY
    },
    name: NIMBUSDB_DATA_TYPE.STRING,
    price: {
        type: NIMBUSDB_DATA_TYPE.NUMBER,
        validator: function(data, value) {
            return value >= 0;
        },
        default_value: 0
    },
    is_locked: {
        type: NIMBUSDB_DATA_TYPE.BOOLEAN,
        const: NIMBUSDB_CONSTRAINT.OPTIONAL,
        default_value: false
    }
};
items = new NimbusDBModel("global", "items", schema, [
    { id: 1, name: "Apple", price: 5 },
    { id: 2, name: "Banana", price: 7.2 },
    { id: 3, name: "Cherry", price: 15 },
    { id: 4, name: "Date", price: 12.5 },
    { id: 5, name: "Elderberry", price: 8 },
    { id: 6, name: "Fig", price: 10 },
    { id: 7, name: "Grape", price: 6 },
    { id: 8, name: "Honeydew", price: 9 },
    { id: 9, name: "Kiwi", price: 4 },
    { id: 10, name: "Lemon", price: 3 }
]);

The flatten operation flattens nested arrays in the pipeline data. It removes one level of nesting by default, or as many levels as you pass to it.

// (1) flatten the data by one level
var items_pl = items.pipe() // create an `items` pipeline
    // let's say you have this in your pipeline (before the `flatten` operation):
    // [
    //     "Apple",
    //     "Banana",
    //     [
    //         "Cherry",
    //         "Date",
    //         [
    //             "Elderberry",
    //             "Fig"
    //         ]
    //     ]
    // ]
    .flatten();
// result:
// [
//     "Apple",
//     "Banana",
//     "Cherry",
//     "Date",
//     [ "Elderberry", "Fig" ]
// ]
// (2) flatten the data by two levels
var items_pl = items.pipe() // create an `items` pipeline
    // let's say you have this in your pipeline (before the `flatten` operation):
    // [
    //     "Apple",
    //     "Banana",
    //     [
    //         "Cherry",
    //         "Date",
    //         [
    //             "Elderberry",
    //             "Fig"
    //         ]
    //     ]
    // ]
    .flatten(2);
// result:
// [
//     "Apple",
//     "Banana",
//     "Cherry",
//     "Date",
//     "Elderberry",
//     "Fig"
// ]
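
To experiment with the level semantics outside a pipeline, the behavior can be approximated in plain JavaScript. This is only a sketch: flattenLevels is a hypothetical helper, not part of NimbusDB, and it assumes the pipeline data is an ordinary array and that the level is a non-negative integer.

```javascript
// Hypothetical stand-in for the pipeline's flatten step; not NimbusDB's actual code.
// Assumes `data` is a plain array and `level` is a non-negative integer.
function flattenLevels(data, level = 1) {
  // Array.prototype.flat removes up to `level` layers of nesting.
  return data.flat(level);
}

const nested = ["Apple", "Banana", ["Cherry", "Date", ["Elderberry", "Fig"]]];

// One level: the innermost array survives.
console.log(flattenLevels(nested));
// [ "Apple", "Banana", "Cherry", "Date", [ "Elderberry", "Fig" ] ]

// Two levels: fully flat for this input.
console.log(flattenLevels(nested, 2));
// [ "Apple", "Banana", "Cherry", "Date", "Elderberry", "Fig" ]
```

The input array is left untouched, since flat returns a new array.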

The flatten_deep operation is similar to the flatten operation, but it removes every level of nesting, so the result is always a single-level array.

var items_pl = items.pipe() // create an `items` pipeline
    // let's say you have this in your pipeline (before the `flatten_deep` operation):
    // [
    //     "Apple",
    //     [
    //         "Banana",
    //         [ "Cherry" ],
    //         "Date",
    //         [
    //             "Elderberry",
    //             "Fig",
    //             [ "Grape", "Honeydew" ]
    //         ]
    //     ]
    // ]
    .flatten_deep();
// result:
// [ "Apple", "Banana", "Cherry", "Date", "Elderberry", "Fig", "Grape", "Honeydew" ]
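
The deep variant can be pictured as a recursive walk over the data. The sketch below is a plain JavaScript approximation under that assumption; flattenDeep is a hypothetical helper and says nothing about how NimbusDB implements the operation internally.

```javascript
// Hypothetical recursive model of a deep flatten; not NimbusDB's actual code.
function flattenDeep(data) {
  const out = [];
  for (const element of data) {
    if (Array.isArray(element)) {
      // Recurse into nested arrays and append their flattened contents.
      out.push(...flattenDeep(element));
    } else {
      out.push(element);
    }
  }
  return out;
}

const deeplyNested = [
  "Apple",
  ["Banana", ["Cherry"], "Date", ["Elderberry", "Fig", ["Grape", "Honeydew"]]]
];

console.log(flattenDeep(deeplyNested));
// [ "Apple", "Banana", "Cherry", "Date", "Elderberry", "Fig", "Grape", "Honeydew" ]
```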

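The merge operation appends additional data to the current pipeline result. Each argument becomes one new element, so an array argument is appended as a nested array rather than spread into the result.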

var items_pl = items.pipe() // create an `items` pipeline
    // let's say you have this data in your pipeline:
    // [
    //     "abc",
    //     123,
    //     true,
    //     [ 1, 2, 3 ],
    //     { a: 1, b: 2 }
    // ]
    .merge("element 1", 456, [ 4, 5, 6 ], { c: 3, d: 4 });
// result:
// [
//     "abc",
//     123,
//     true,
//     [ 1, 2, 3 ],
//     { a: 1, b: 2 },
//     "element 1",
//     456,
//     [ 4, 5, 6 ],
//     { c: 3, d: 4 }
// ]
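
The append behavior shown in the result above can be modeled with a rest parameter and array spreading. This is a rough sketch in plain JavaScript; mergeInto is a hypothetical helper, not a NimbusDB function.

```javascript
// Hypothetical model of the merge step; not NimbusDB's actual code.
function mergeInto(data, ...extraData) {
  // Every extra argument is appended as exactly one element.
  // An array argument stays nested; it is not spread into the result.
  return [...data, ...extraData];
}

const current = ["abc", 123, true, [1, 2, 3], { a: 1, b: 2 }];
const merged = mergeInto(current, "element 1", 456, [4, 5, 6], { c: 3, d: 4 });

console.log(merged.length);            // 9
console.log(Array.isArray(merged[7])); // true: [ 4, 5, 6 ] was appended as one element
```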

Distinct (Array of Objects, Object)

The distinct operation removes duplicate values from the pipeline data based on one or more columns. As the examples show, each result row contains only the column(s) you deduplicate on.

// (1) dedupe single column
var items_pl = items.pipe() // create an `items` pipeline
    // let's say you have this data in your pipeline:
    // [
    //     { id: 1, name: "Apricot", price: 5 },
    //     { id: 2, name: "Blueberry", price: 7 },
    //     { id: 3, name: "Coconut", price: 5 },
    //     { id: 4, name: "Durian", price: 10 },
    //     { id: 5, name: "Fuji Apple", price: 7 }
    // ]
    .distinct("price");
// result:
// [
//     { price: 5 },
//     { price: 7 },
//     { price: 10 }
// ]
// (2) dedupe multiple columns
var items_pl = items.pipe() // create an `items` pipeline
    // let's say you have this data in your pipeline:
    // [
    //     { id: 1, name: "Apricot", price: 5, is_soft: true },
    //     { id: 2, name: "Blueberry", price: 7, is_soft: true },
    //     { id: 3, name: "Coconut", price: 5, is_soft: false },
    //     { id: 4, name: "Durian", price: 10, is_soft: false },
    //     { id: 5, name: "Fuji Apple", price: 7, is_soft: true }
    // ]
    .distinct(["price", "is_soft"]);
// result:
// [
//     { price: 5, is_soft: true },
//     { price: 7, is_soft: true },
//     { price: 5, is_soft: false },
//     { price: 10, is_soft: false }
// ]
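
One way to reproduce the results above is to project each row onto the requested columns and keep only the first occurrence of each combination. The following is a minimal sketch in plain JavaScript. The distinctBy helper is hypothetical, and keeping first-occurrence order is an assumption taken from the example output rather than a documented guarantee.

```javascript
// Hypothetical model of the distinct step; not NimbusDB's actual code.
function distinctBy(rows, columns) {
  // Accept a single column name or an array of names.
  const cols = Array.isArray(columns) ? columns : [columns];
  const seen = new Set();
  const out = [];
  for (const row of rows) {
    // Build a comparison key from the selected column values.
    // JSON.stringify is enough for primitives; missing values all map to null.
    const key = JSON.stringify(cols.map((col) => row[col]));
    if (seen.has(key)) continue;
    seen.add(key);
    // Keep only the selected columns in the output row.
    const projected = {};
    for (const col of cols) projected[col] = row[col];
    out.push(projected);
  }
  return out;
}

const fruitRows = [
  { id: 1, name: "Apricot", price: 5, is_soft: true },
  { id: 2, name: "Blueberry", price: 7, is_soft: true },
  { id: 3, name: "Coconut", price: 5, is_soft: false },
  { id: 4, name: "Durian", price: 10, is_soft: false },
  { id: 5, name: "Fuji Apple", price: 7, is_soft: true }
];

console.log(distinctBy(fruitRows, "price"));
// [ { price: 5 }, { price: 7 }, { price: 10 } ]
console.log(distinctBy(fruitRows, ["price", "is_soft"]).length); // 4
```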

The sample operation is used to randomly sample _count elements from the pipeline data.

var items_pl = items.pipe() // create an `items` pipeline
    // let's say you have this data in your pipeline:
    // [ 5, 7.2, 15, 12.5, 8, 10, 6, 9, 4, 3 ]
    .sample(5); // sample 5 elements
// result:
// [ (random 5 elements) ]
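
The text does not say whether sampling is done with or without replacement. The sketch below assumes sampling without replacement and uses a partial Fisher-Yates shuffle in plain JavaScript. The sampleElements helper and its random parameter are hypothetical, and capping the count at the data length is also an assumption.

```javascript
// Hypothetical model of the sample step; not NimbusDB's actual code.
// Assumes sampling WITHOUT replacement and caps `count` at the data length.
function sampleElements(data, count, random = Math.random) {
  const pool = data.slice(); // copy so the input is not modified
  const n = Math.min(Math.max(count, 0), pool.length);
  for (let i = 0; i < n; i++) {
    // Pick a random index from the not-yet-chosen tail and swap it into slot i.
    const j = i + Math.floor(random() * (pool.length - i));
    [pool[i], pool[j]] = [pool[j], pool[i]];
  }
  return pool.slice(0, n);
}

const prices = [5, 7.2, 15, 12.5, 8, 10, 6, 9, 4, 3];
const picked = sampleElements(prices, 5);

console.log(picked.length); // 5
console.log(picked.every((p) => prices.includes(p))); // true
```

Passing a seeded generator as the third argument would make the sketch reproducible in tests.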

Removes duplicate values from the pipeline data based on one or more columns.

pipeline.d.ts
class NimbusDBPipeline {
    // ... other methods and properties ...
    static distinct(
        _column: string | string[]
    ): NimbusDBPipeline;
}
Parameter _column:
  • Type: string | string[]
  • The column name(s) to deduplicate on.
Returns:
  • Type: NimbusDBPipeline
  • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Flattens the pipeline data by the given number of levels.

pipeline.d.ts
class NimbusDBPipeline {
    // ... other methods and properties ...
    static flatten(
        _level?: int
    ): NimbusDBPipeline;
}
Parameter _level (optional):
  • Type: int
  • Default: 1
  • The number of levels to flatten the result.
Returns:
  • Type: NimbusDBPipeline
  • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Deeply flattens the pipeline data to a single level.

pipeline.d.ts
class NimbusDBPipeline {
    // ... other methods and properties ...
    static flatten_deep(): NimbusDBPipeline;
}
Returns:
  • Type: NimbusDBPipeline
  • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Merges additional data into the current pipeline result.

pipeline.d.ts
class NimbusDBPipeline {
    // ... other methods and properties ...
    static merge(
        ..._extra_data: any
    ): NimbusDBPipeline;
}
Parameter ..._extra_data:
  • Type: any
  • One or more data items to merge in.
Returns:
  • Type: NimbusDBPipeline
  • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).

Randomly samples _count elements from the pipeline data.

pipeline.d.ts
class NimbusDBPipeline {
    // ... other methods and properties ...
    static sample(
        _count: int
    ): NimbusDBPipeline;
}
Parameter _count:
  • Type: int
  • The number of elements to sample.
Returns:
  • Type: NimbusDBPipeline
  • A new NimbusDBPipeline instance (mutable = false) or the current pipeline instance (mutable = true).
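
Each operation above returns either a new pipeline or the current one, depending on the mutable setting. The difference can be illustrated with a small stand-in class in plain JavaScript. SketchPipeline, its constructor, and its _apply method are all hypothetical. How NimbusDB actually stores or sets the mutable flag is not described here, so treat this only as a model of the return-value rule.

```javascript
// Hypothetical stand-in that models only the return-value rule; not NimbusDB's actual code.
class SketchPipeline {
  constructor(data, mutable = false) {
    this.data = data;
    this.mutable = mutable;
  }

  // Apply a transformation and return either `this` or a fresh pipeline.
  _apply(transform) {
    const next = transform(this.data);
    if (this.mutable) {
      this.data = next; // mutable = true: update in place, return the same instance
      return this;
    }
    return new SketchPipeline(next, false); // mutable = false: original stays untouched
  }

  flatten(level = 1) {
    return this._apply((data) => data.flat(level));
  }
}

const immutablePl = new SketchPipeline([1, [2, [3]]], false);
const flattenedCopy = immutablePl.flatten();
console.log(flattenedCopy === immutablePl); // false
console.log(immutablePl.data.length);       // 2 (unchanged)

const mutablePl = new SketchPipeline([1, [2, [3]]], true);
const sameInstance = mutablePl.flatten();
console.log(sameInstance === mutablePl);    // true
console.log(mutablePl.data.length);         // 3 (updated in place)
```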