Class: ElasticGraph::SchemaDefinition::Indexing::DerivedIndexedType

Inherits:
Object
Defined in:
elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb

Overview

Used to configure how a derived indexed type is derived from a source type. An instance of this class is yielded by Mixins::HasIndices#derive_indexed_type_fields.

Examples:

Derive a Course type from StudentCourseEnrollment events

ElasticGraph.define_schema do |schema|
  # `StudentCourseEnrollment` is a directly indexed type.
  schema.object_type "StudentCourseEnrollment" do |t|
    t.field "id", "ID"
    t.field "courseId", "ID"
    t.field "courseName", "String"
    t.field "studentName", "String"
    t.field "courseStartDate", "Date"

    t.index "student_course_enrollments"

    # Here we define how the `Course` indexed type is derived when we index `StudentCourseEnrollment` events.
    t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
      # `derive` is an instance of `DerivedIndexedType`.
      derive.immutable_value "name", from: "courseName"
      derive.append_only_set "students", from: "studentName"
      derive.min_value "firstOfferedDate", from: "courseStartDate"
      derive.max_value "mostRecentlyOfferedDate", from: "courseStartDate"
    end
  end

  # `Course` is an indexed type that is derived entirely from `StudentCourseEnrollment` events.
  schema.object_type "Course" do |t|
    t.field "id", "ID"
    t.field "name", "String"
    t.field "students", "[String!]!"
    t.field "firstOfferedDate", "Date"
    t.field "mostRecentlyOfferedDate", "Date"

    t.index "courses"
  end
end
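
To make the four derivation kinds concrete, the following pure-Ruby sketch folds a few enrollment events into a `Course`-shaped hash. It only imitates the semantics described on this page and does not use ElasticGraph; the `derive_course` helper and the sample `EVENTS` are assumptions made for illustration.

```ruby
require "date"
require "set"

# Hypothetical helper: folds the enrollment events of one course into a Course-shaped
# hash, mimicking the four derivations (this is not ElasticGraph's implementation).
def derive_course(events)
  course = {
    "name" => nil,
    "students" => Set.new,
    "firstOfferedDate" => nil,
    "mostRecentlyOfferedDate" => nil
  }

  events.each do |event|
    # immutable_value: the first value is kept; a conflicting later value is an error.
    name = event.fetch("courseName")
    if course["name"] && course["name"] != name
      raise ArgumentError, "name cannot change from #{course["name"].inspect} to #{name.inspect}"
    end
    course["name"] = name

    # append_only_set: values are only ever added, never removed.
    course["students"] << event.fetch("studentName")

    # min_value / max_value: keep the smallest / largest value seen so far.
    start = Date.iso8601(event.fetch("courseStartDate"))
    course["firstOfferedDate"] = [course["firstOfferedDate"], start].compact.min
    course["mostRecentlyOfferedDate"] = [course["mostRecentlyOfferedDate"], start].compact.max
  end

  course
end

# Sample events (assumed data, all for the same course).
EVENTS = [
  {"courseName" => "Algebra", "studentName" => "Ann", "courseStartDate" => "2023-09-01"},
  {"courseName" => "Algebra", "studentName" => "Bob", "courseStartDate" => "2022-09-01"},
  {"courseName" => "Algebra", "studentName" => "Ann", "courseStartDate" => "2024-09-01"}
]

course = derive_course(EVENTS)
puts course["students"].to_a.inspect
puts course["firstOfferedDate"]
puts course["mostRecentlyOfferedDate"]
```

In this sketch the result does not depend on the order in which the events are folded, which is the property that makes these particular derivations a good fit for event streams that may arrive out of order.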

Constructor Details

#initialize(source_type:, destination_type_ref:, id_source:, routing_value_source:, rollover_timestamp_value_source:) {|DerivedIndexedType| ... } ⇒ DerivedIndexedType

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of DerivedIndexedType.

Parameters:

  • source_type (SchemaElements::ObjectType)

    the type used as a source for this derived type

  • destination_type_ref (SchemaElements::TypeReference)

    the derived type

  • id_source (String)

    path to field on the source type used as id on the derived type

  • routing_value_source (String, nil)

    path to field on the source type used for shard routing

  • rollover_timestamp_value_source (String, nil)

    path to field on the source type used as the timestamp field for rollover

Yields:

  • (DerivedIndexedType)

    the DerivedIndexedType instance

# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 82

def initialize(
  source_type:,
  destination_type_ref:,
  id_source:,
  routing_value_source:,
  rollover_timestamp_value_source:
)
  fields = [] # : ::Array[_DerivedField]
  super(
    source_type: source_type,
    destination_type_ref: destination_type_ref,
    id_source: id_source,
    routing_value_source: routing_value_source,
    rollover_timestamp_value_source: rollover_timestamp_value_source,
    fields: fields
  )
  yield self
end
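
The constructor pairs a `Struct` subclass with `yield self`, so the caller configures the new instance inside a block. A minimal sketch of that pattern follows. `MiniDerivedType` is a hypothetical stand-in rather than the real class, and `keyword_init: true` is an assumption added so the sketch also runs on older Ruby versions.

```ruby
# Hypothetical stand-in that illustrates the pattern; not the ElasticGraph class.
class MiniDerivedType < Struct.new(:id_source, :fields, keyword_init: true)
  def initialize(id_source:)
    # Callers never pass `fields`; every instance starts with its own empty list.
    super(id_source: id_source, fields: [])
    # Hand the new instance to the block so the caller can register fields on it.
    yield self
  end

  # Records a derivation as a plain tuple (the real class stores field objects).
  def min_value(field_name, from:)
    fields << [:min, field_name, from]
  end
end

derived = MiniDerivedType.new(id_source: "courseId") do |derive|
  derive.min_value "firstOfferedDate", from: "courseStartDate"
end

puts derived.fields.inspect
```

Because `initialize` yields unconditionally, calling `new` without a block raises a `LocalJumpError` in this sketch, which matches the expectation that the type is always built by passing a configuration block.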

Instance Attribute Details

#fields ⇒ Array<DerivedFields::AppendOnlySet, DerivedFields::ImmutableValue, DerivedFields::MinOrMaxValue>

Returns derived field definitions.



# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 67

class DerivedIndexedType < ::Struct.new(
  :source_type,
  :destination_type_ref,
  :id_source,
  :routing_value_source,
  :rollover_timestamp_value_source,
  :fields
)
  # @param source_type [SchemaElements::ObjectType] the type used as a source for this derived type
  # @param destination_type_ref [SchemaElements::TypeReference] the derived type
  # @param id_source [String] path to field on the source type used as `id` on the derived type
  # @param routing_value_source [String, nil] path to field on the source type used for shard routing
  # @param rollover_timestamp_value_source [String, nil] path to field on the source type used as the timestamp field for rollover
  # @yield [DerivedIndexedType] the `DerivedIndexedType` instance
  # @api private
  def initialize(
    source_type:,
    destination_type_ref:,
    id_source:,
    routing_value_source:,
    rollover_timestamp_value_source:
  )
    fields = [] # : ::Array[_DerivedField]
    super(
      source_type: source_type,
      destination_type_ref: destination_type_ref,
      id_source: id_source,
      routing_value_source: routing_value_source,
      rollover_timestamp_value_source: rollover_timestamp_value_source,
      fields: fields
    )
    yield self
  end

  # Configures `field_name` (on the derived indexing type) to contain the set union of all values from
  # the `from` field on the source type. Values are only ever appended to the set, so the field will
  # act as an append-only set.
  #
  # @param field_name [String] name of field on the derived indexing type to store the derived set
  # @param from [String] path to field on the source type to source values from
  # @return [DerivedFields::AppendOnlySet]
  #
  # @example
  #   ElasticGraph.define_schema do |schema|
  #     schema.object_type "StudentCourseEnrollment" do |t|
  #       t.field "id", "ID"
  #       t.field "courseId", "ID"
  #       t.field "studentName", "String"
  #
  #       t.index "student_course_enrollments"
  #
  #       t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
  #         derive.append_only_set "students", from: "studentName"
  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "students", "[String!]!"
  #
  #       t.index "courses"
  #     end
  #   end
  def append_only_set(field_name, from:)
    fields << DerivedFields::AppendOnlySet.new(field_name, from)
  end

  # Configures `field_name` (on the derived indexing type) to contain a single immutable value from the
  # `from` field on the source type. Immutability is enforced by triggering an indexing failure with a
  # clear error if any event's source value is different from the value already indexed on this field.
  #
  # @param field_name [String] name of field on the derived indexing type to store the derived value
  # @param from [String] path to field on the source type to source values from
  # @param nullable [Boolean] whether the field is allowed to be set to `null`. When set to false, events
  #   that contain a `null` value in the `from` field will be rejected instead of setting the field’s value
  #   to `null`.
  # @param can_change_from_null [Boolean] whether a one-time mutation of the field value is allowed from
  #   `null` to a non-`null` value. This can be useful when dealing with a field that may not have a value
  #   on all source events. For example, if the source field was not initially part of the schema of your
  #   source dataset, you may have old records that lack a value for this field. When set, this option
  #   allows a one-time mutation of the field value from `null` to a non-`null` value. Once set to a
  #   non-`null` value, any additional `null` values that are encountered will be ignored (ensuring that
  #   the indexed data converges on the same state regardless of the order the events are ingested in).
  #   Note: this option cannot be enabled when `nullable: false` has been set.
  # @return [DerivedFields::ImmutableValue]
  #
  # @example
  #   ElasticGraph.define_schema do |schema|
  #     schema.object_type "StudentCourseEnrollment" do |t|
  #       t.field "id", "ID"
  #       t.field "courseId", "ID"
  #       t.field "courseName", "String"
  #
  #       t.index "student_course_enrollments"
  #
  #       t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
  #         derive.immutable_value "name", from: "courseName"
  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "name", "String"
  #
  #       t.index "courses"
  #     end
  #   end
  def immutable_value(field_name, from:, nullable: true, can_change_from_null: false)
    if !nullable && can_change_from_null
      raise Errors::SchemaError, "`can_change_from_null: true` is not allowed with `nullable: false` (as there would be no `null` values to change from)."
    end

    fields << DerivedFields::ImmutableValue.new(
      destination_field: field_name,
      source_field: from,
      nullable: nullable,
      can_change_from_null: can_change_from_null
    )
  end

  # Configures `field_name` (on the derived indexing type) to contain the minimum of all values from the `from`
  # field on the source type.
  #
  # @param field_name [String] name of field on the derived indexing type to store the derived value
  # @param from [String] path to field on the source type to source values from
  # @return [DerivedFields::MinOrMaxValue]
  #
  # @example
  #   ElasticGraph.define_schema do |schema|
  #     schema.object_type "StudentCourseEnrollment" do |t|
  #       t.field "id", "ID"
  #       t.field "courseId", "ID"
  #       t.field "courseStartDate", "Date"
  #
  #       t.index "student_course_enrollments"
  #
  #       t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
  #         derive.min_value "firstOfferedDate", from: "courseStartDate"
  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "firstOfferedDate", "Date"
  #
  #       t.index "courses"
  #     end
  #   end
  def min_value(field_name, from:)
    fields << DerivedFields::MinOrMaxValue.new(field_name, from, :min)
  end

  # Configures `field_name` (on the derived indexing type) to contain the maximum of all values from the `from`
  # field on the source type.
  #
  # @param field_name [String] name of field on the derived indexing type to store the derived value
  # @param from [String] path to field on the source type to source values from
  # @return [DerivedFields::MinOrMaxValue]
  #
  # @example
  #   ElasticGraph.define_schema do |schema|
  #     schema.object_type "StudentCourseEnrollment" do |t|
  #       t.field "id", "ID"
  #       t.field "courseId", "ID"
  #       t.field "courseStartDate", "Date"
  #
  #       t.index "student_course_enrollments"
  #
  #       t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
  #         derive.max_value "mostRecentlyOfferedDate", from: "courseStartDate"
  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "mostRecentlyOfferedDate", "Date"
  #
  #       t.index "courses"
  #     end
  #   end
  def max_value(field_name, from:)
    fields << DerivedFields::MinOrMaxValue.new(field_name, from, :max)
  end

  # @return [Scripting::Script] Painless script that will maintain the derived fields
  # @api private
  def painless_script
    Scripting::Script.new(
      source: generate_script.strip,
      name: "#{destination_type_ref}_from_#{source_type.name}",
      language: "painless",
      context: "update"
    )
  end

  # @return [SchemaArtifacts::RuntimeMetadata::UpdateTarget] runtime metadata for the source type
  # @api private
  def runtime_metadata_for_source_type
    SchemaArtifacts::RuntimeMetadata::UpdateTarget.new(
      type: destination_type_ref.name,
      relationship: nil,
      script_id: painless_script.id,
      id_source: id_source,
      routing_value_source: routing_value_source,
      rollover_timestamp_value_source: rollover_timestamp_value_source,
      metadata_params: {},
      data_params: fields.map(&:source_field).to_h do |f|
        [f, SchemaArtifacts::RuntimeMetadata::DynamicParam.new(source_path: f, cardinality: :many)]
      end
    )
  end

  private

  def generate_script
    if fields.empty?
      raise Errors::SchemaError, "`derive_indexed_type_fields` definition for #{destination_type_ref} (from #{source_type.name}) " \
        "has no derived field definitions."
    end

    sorted_fields = fields.sort_by(&:destination_field)

    # We use `uniq` here to avoid re-doing the same setup multiple times, since multiple fields can sometimes
    # need the same setup (such as initializing a common parent field to an empty map).
    function_defs = sorted_fields.flat_map(&:function_definitions).uniq.map(&:strip).sort

    setup_statements = [STATIC_SETUP_STATEMENTS] + sorted_fields.flat_map(&:setup_statements).uniq.map(&:strip)

    apply_update_statements = sorted_fields.map { |f| apply_update_statement(f).strip }

    # Note: comments in the script are effectively "free" since:
    #
    #   - The compiler will strip them out.
    #   - We only send the script to the datastore once (when configuring the cluster), and later
    #     reference it only by id--so we don't pay for the larger payload on each indexing request.
    <<~EOS
      #{function_defs.join("\n\n")}

      #{setup_statements.join("\n")}

      #{apply_update_statements.join("\n")}

      if (!#{SCRIPT_ERRORS_VAR}.isEmpty()) {
        throw new IllegalArgumentException("#{DERIVED_INDEX_FAILURE_MESSAGE_PREAMBLE}: " + #{SCRIPT_ERRORS_VAR}.join(" "));
      }

      // For records with no new values to index, only skip the update if the document itself already exists.
      // Otherwise create an (empty) document to reflect the fact that the id has been seen.
      if (ctx._source.id != null && #{sorted_fields.map { |f| was_noop_variable(f) }.join(" && ")}) {
        ctx.op = 'none';
      } else {
        // Here we set `_source.id` because if we don't, it'll never be set, making these docs subtly
        // different from docs indexed the normal way.
        //
        // Note also that we MUST use `params.id` instead of `ctx._id`. The latter works on an update
        // of an existing document, but is unavailable when we are inserting the document for the first time.
        ctx._source.id = params.id;
      }
    EOS
  end

  def apply_update_statement(field)
    "boolean #{was_noop_variable(field)} = !#{field.apply_operation_returning_update_status};"
  end

  def was_noop_variable(field)
    "#{field.destination_field.gsub(".", "__")}_was_noop"
  end

  SCRIPT_ERRORS_VAR = "scriptErrors"

  STATIC_SETUP_STATEMENTS = <<~EOS.strip
    Map data = params.data;
    // A variable to accumulate script errors so that we can surface _all_ issues and not just the first.
    List #{SCRIPT_ERRORS_VAR} = new ArrayList();
  EOS
end
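
The `nullable` and `can_change_from_null` rules documented on `immutable_value` can be modeled in plain Ruby. The sketch below is only an illustration of those documented rules: the real logic lives in the generated Painless script, the `apply_immutable_value` helper is hypothetical, and tracking "no value yet" by key presence is an assumption.

```ruby
# Hypothetical model of the documented `immutable_value` rules (not the generated script).
# Returns the updated document hash, or raises when an event must be rejected.
def apply_immutable_value(doc, field, value, nullable: true, can_change_from_null: false)
  if !nullable && can_change_from_null
    raise ArgumentError, "`can_change_from_null: true` is not allowed with `nullable: false`"
  end

  # With `nullable: false`, events carrying a null source value are rejected.
  raise ArgumentError, "#{field} cannot be null" if value.nil? && !nullable

  # First value seen for this field (assumption: tracked by key presence).
  return doc.merge(field => value) unless doc.key?(field)

  current = doc[field]
  return doc if current == value

  if can_change_from_null
    # One-time change from null to a non-null value.
    return doc.merge(field => value) if current.nil?
    # Once a non-null value is set, later nulls are ignored.
    return doc if value.nil?
  end

  raise ArgumentError, "#{field} cannot change from #{current.inspect} to #{value.inspect}"
end

# Folding the same values in two different orders converges on the same document.
in_order = [nil, "Algebra", nil].reduce({}) do |doc, v|
  apply_immutable_value(doc, "name", v, can_change_from_null: true)
end
reordered = ["Algebra", nil, nil].reduce({}) do |doc, v|
  apply_immutable_value(doc, "name", v, can_change_from_null: true)
end

puts in_order.inspect
puts reordered.inspect
```

Ignoring late `null` values, rather than rejecting them, is what lets both orderings end in the same state in this sketch.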

#id_source ⇒ String

Returns path to field on the source type used as id on the derived type.

Returns:

  • (String)

    path to field on the source type used as id on the derived type




#rollover_timestamp_value_source ⇒ String?

Returns path to field on the source type used as the timestamp field for rollover.

Returns:

  • (String, nil)

    path to field on the source type used as the timestamp field for rollover



  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "name", "String"
  #
  #       t.index "courses"
  #     end
  #   end
  def immutable_value(field_name, from:, nullable: true, can_change_from_null: false)
    if !nullable && can_change_from_null
      raise Errors::SchemaError, "`can_change_from_null: true` is not allowed with `nullable: false` (as there would be no `null` values to change from)."
    end

    fields << DerivedFields::ImmutableValue.new(
      destination_field: field_name,
      source_field: from,
      nullable: nullable,
      can_change_from_null: can_change_from_null
    )
  end

  # Configures `field_name` (on the derived indexing type) to contain the minimum of all values from the `from`
  # field on the source type.
  #
  # @param field_name [String] name of field on the derived indexing type to store the derived value
  # @param from [String] path to field on the source type to source values from
  # @return [DerivedFields::MinOrMaxValue]
  #
  # @example
  #   ElasticGraph.define_schema do |schema|
  #     schema.object_type "StudentCourseEnrollment" do |t|
  #       t.field "id", "ID"
  #       t.field "courseId", "ID"
  #       t.field "courseStartDate", "Date"
  #
  #       t.index "student_course_enrollments"
  #
  #       t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
  #         derive.min_value "firstOfferedDate", from: "courseStartDate"
  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "firstOfferedDate", "Date"
  #
  #       t.index "courses"
  #     end
  #   end
  def min_value(field_name, from:)
    fields << DerivedFields::MinOrMaxValue.new(field_name, from, :min)
  end

  # Configures `field_name` (on the derived indexing type) to contain the maximum of all values from the `from`
  # field on the source type.
  #
  # @param field_name [String] name of field on the derived indexing type to store the derived value
  # @param from [String] path to field on the source type to source values from
  # @return [DerivedFields::MinOrMaxValue]
  #
  # @example
  #   ElasticGraph.define_schema do |schema|
  #     schema.object_type "StudentCourseEnrollment" do |t|
  #       t.field "id", "ID"
  #       t.field "courseId", "ID"
  #       t.field "courseStartDate", "Date"
  #
  #       t.index "student_course_enrollments"
  #
  #       t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
  #         derive.max_value "mostRecentlyOfferedDate", from: "courseStartDate"
  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "mostRecentlyOfferedDate", "Date"
  #
  #       t.index "courses"
  #     end
  #   end
  def max_value(field_name, from:)
    fields << DerivedFields::MinOrMaxValue.new(field_name, from, :max)
  end

  # @return [Scripting::Script] Painless script that will maintain the derived fields
  # @api private
  def painless_script
    Scripting::Script.new(
      source: generate_script.strip,
      name: "#{destination_type_ref}_from_#{source_type.name}",
      language: "painless",
      context: "update"
    )
  end

  # @return [SchemaArtifacts::RuntimeMetadata::UpdateTarget] runtime metadata for the source type
  # @api private
  def runtime_metadata_for_source_type
    SchemaArtifacts::RuntimeMetadata::UpdateTarget.new(
      type: destination_type_ref.name,
      relationship: nil,
      script_id: painless_script.id,
      id_source: id_source,
      routing_value_source: routing_value_source,
      rollover_timestamp_value_source: rollover_timestamp_value_source,
      metadata_params: {},
      data_params: fields.map(&:source_field).to_h do |f|
        [f, SchemaArtifacts::RuntimeMetadata::DynamicParam.new(source_path: f, cardinality: :many)]
      end
    )
  end

  private

  def generate_script
    if fields.empty?
      raise Errors::SchemaError, "`derive_indexed_type_fields` definition for #{destination_type_ref} (from #{source_type.name}) " \
        "has no derived field definitions."
    end

    sorted_fields = fields.sort_by(&:destination_field)

    # We use `uniq` here to avoid re-doing the same setup multiple times, since multiple fields can sometimes
    # need the same setup (such as initializing a common parent field to an empty map).
    function_defs = sorted_fields.flat_map(&:function_definitions).uniq.map(&:strip).sort

    setup_statements = [STATIC_SETUP_STATEMENTS] + sorted_fields.flat_map(&:setup_statements).uniq.map(&:strip)

    apply_update_statements = sorted_fields.map { |f| apply_update_statement(f).strip }

    # Note: comments in the script are effectively "free" since:
    #
    #   - The compiler will strip them out.
    #   - We only send the script to the datastore once (when configuring the cluster), and later
    #     reference it only by id--so we don't pay for the larger payload on each indexing request.
    <<~EOS
      #{function_defs.join("\n\n")}

      #{setup_statements.join("\n")}

      #{apply_update_statements.join("\n")}

      if (!#{SCRIPT_ERRORS_VAR}.isEmpty()) {
        throw new IllegalArgumentException("#{DERIVED_INDEX_FAILURE_MESSAGE_PREAMBLE}: " + #{SCRIPT_ERRORS_VAR}.join(" "));
      }

      // For records with no new values to index, only skip the update if the document itself already exists.
      // Otherwise create an (empty) document to reflect the fact that the id has been seen.
      if (ctx._source.id != null && #{sorted_fields.map { |f| was_noop_variable(f) }.join(" && ")}) {
        ctx.op = 'none';
      } else {
        // Here we set `_source.id` because if we don't, it'll never be set, making these docs subtly
        // different from docs indexed the normal way.
        //
        // Note also that we MUST use `params.id` instead of `ctx._id`. The latter works on an update
        // of an existing document, but is unavailable when we are inserting the document for the first time.
        ctx._source.id = params.id;
      }
    EOS
  end

  def apply_update_statement(field)
    "boolean #{was_noop_variable(field)} = !#{field.apply_operation_returning_update_status};"
  end

  def was_noop_variable(field)
    "#{field.destination_field.gsub(".", "__")}_was_noop"
  end

  SCRIPT_ERRORS_VAR = "scriptErrors"

  STATIC_SETUP_STATEMENTS = <<~EOS.strip
    Map data = params.data;
    // A variable to accumulate script errors so that we can surface _all_ issues and not just the first.
    List #{SCRIPT_ERRORS_VAR} = new ArrayList();
  EOS
end
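
The `immutable_value` rules documented in the listing above (`nullable` and `can_change_from_null`) can be easier to follow with a small simulation. What follows is a minimal plain-Ruby sketch, not ElasticGraph's implementation (the real logic runs as a Painless script in the datastore). The helper `apply_immutable_value`, its return symbols, and the way a first event is handled are assumptions introduced here for illustration only.

```ruby
# Hypothetical helper (not part of ElasticGraph): simulates applying one incoming
# source value to an immutable derived field. `doc` is a Hash standing in for the
# indexed document. Returns :updated or :noop, and raises on a rejected value.
def apply_immutable_value(doc, field, value, nullable: true, can_change_from_null: false)
  if !nullable && can_change_from_null
    raise ArgumentError, "`can_change_from_null: true` is not allowed with `nullable: false`"
  end

  # With `nullable: false`, events carrying a null source value are rejected.
  raise ArgumentError, "#{field} cannot be null" if value.nil? && !nullable

  # Assumption for this sketch: the first value seen for the field is stored as-is.
  unless doc.key?(field)
    doc[field] = value
    return :updated
  end

  current = doc[field]
  return :noop if current == value

  if can_change_from_null
    # One-time mutation from null to a non-null value.
    if current.nil?
      doc[field] = value
      return :updated
    end

    # Later nulls are ignored, so the result does not depend on event order.
    return :noop if value.nil?
  end

  raise ArgumentError, "#{field} is immutable: cannot change #{current.inspect} to #{value.inspect}"
end

course = {}
apply_immutable_value(course, "name", nil, can_change_from_null: true)       # stores null
apply_immutable_value(course, "name", "Algebra", can_change_from_null: true) # null becomes "Algebra"
apply_immutable_value(course, "name", nil, can_change_from_null: true)       # ignored
```

In this sketch, a later event carrying a different non-null value (for example `"Geometry"`) raises, which mirrors the indexing failure described in the method documentation.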

#routing_value_source ⇒ String?

Returns path to field on the source type used for shard routing.

Returns:

  • (String, nil)

    path to field on the source type used for shard routing




#source_type ⇒ SchemaElements::ObjectType

Returns the type used as a source for this derived type.

Returns:

  • (SchemaElements::ObjectType)

    the type used as a source for this derived type



# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 67

class DerivedIndexedType < ::Struct.new(
  :source_type,
  :destination_type_ref,
  :id_source,
  :routing_value_source,
  :rollover_timestamp_value_source,
  :fields
)
  # @param source_type [SchemaElements::ObjectType] the type used as a source for this derive type
  # @param destination_type_ref [SchemaElements::TypeReference] the derived type
  # @param id_source [String] path to field on the source type used as `id` on the derived type
  # @param routing_value_source [String, nil] path to field on the source type used for shard routing
  # @param rollover_timestamp_value_source [String, nil] path to field on the source type used as the timestamp field for rollover
  # @yield [DerivedIndexedType] the `DerivedIndexedType` instance
  # @api private
  def initialize(
    source_type:,
    destination_type_ref:,
    id_source:,
    routing_value_source:,
    rollover_timestamp_value_source:
  )
    fields = [] # : ::Array[_DerivedField]
    super(
      source_type: source_type,
      destination_type_ref: destination_type_ref,
      id_source: id_source,
      routing_value_source: routing_value_source,
      rollover_timestamp_value_source: rollover_timestamp_value_source,
      fields: fields
    )
    yield self
  end

  # Configures `field_name` (on the derived indexing type) to contain the set union of all values from
  # the `from` field on the source type. Values are only ever appended to the set, so the field will
  # act as an append-only set.
  #
  # @param field_name [String] name of field on the derived indexing type to store the derived set
  # @param from [String] path to field on the source type to source values from
  # @return [DerivedIndexedType::AppendOnlySet]
  #
  # @example
  #   ElasticGraph.define_schema do |schema|
  #     schema.object_type "StudentCourseEnrollment" do |t|
  #       t.field "id", "ID"
  #       t.field "courseId", "ID"
  #       t.field "studentName", "String"
  #
  #       t.index "student_course_enrollments"
  #
  #       t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
  #         derive.append_only_set "students", from: "studentName"
  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "students", "[String!]!"
  #
  #       t.index "courses"
  #     end
  #   end
  def append_only_set(field_name, from:)
    fields << DerivedFields::AppendOnlySet.new(field_name, from)
  end

  # Configures `field_name` (on the derived indexing type) to contain a single immutable value from the
  # `from` field on the source type. Immutability is enforced by triggering an indexing failure with a
  # clear error if any event's source value is different from the value already indexed on this field.
  #
  # @param field_name [String] name of field on the derived indexing type to store the derived value
  # @param from [String] path to field on the source type to source values from
  # @param nullable [Boolean] whether the field is allowed to be set to `null`. When set to false, events
  #   that contain a `null` value in the `from` field will be rejected instead of setting the field’s value
  #   to `null`.
  # @param can_change_from_null [Boolean] whether a one-time mutation of the field value is allowed from
  #   `null` to a non-`null` value. This can be useful when dealing with a field that may not have a value
  #   on all source events. For example, if the source field was not initially part of the schema of your
  #   source dataset, you may have old records that lack a value for this field. When set, this option
  #   allows a one-time mutation of the field value from `null` to a non-`null` value. Once set to a
  #   non-`null` value, any additional `null` values that are encountered will be ignored (ensuring that
  #   the indexed data converges on the same state regardless of the order the events are ingested in).
  #   Note: this option cannot be enabled when `nullable: false` has been set.
  # @return [DerivedFields::ImmutableValue]
  #
  # @example
  #   ElasticGraph.define_schema do |schema|
  #     schema.object_type "StudentCourseEnrollment" do |t|
  #       t.field "id", "ID"
  #       t.field "courseId", "ID"
  #       t.field "courseName", "String"
  #
  #       t.index "student_course_enrollments"
  #
  #       t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
  #         derive.immutable_value "name", from: "courseName"
  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "name", "String"
  #
  #       t.index "courses"
  #     end
  #   end
  def immutable_value(field_name, from:, nullable: true, can_change_from_null: false)
    if !nullable && can_change_from_null
      raise Errors::SchemaError, "`can_change_from_null: true` is not allowed with `nullable: false` (as there would be no `null` values to change from)."
    end

    fields << DerivedFields::ImmutableValue.new(
      destination_field: field_name,
      source_field: from,
      nullable: nullable,
      can_change_from_null: can_change_from_null
    )
  end

  # Configures `field_name` (on the derived indexing type) to contain the minimum of all values from the `from`
  # field on the source type.
  #
  # @param field_name [String] name of field on the derived indexing type to store the derived value
  # @param from [String] path to field on the source type to source values from
  # @return [DerivedIndexedType::MinOrMaxValue]
  #
  # @example
  #   ElasticGraph.define_schema do |schema|
  #     schema.object_type "StudentCourseEnrollment" do |t|
  #       t.field "id", "ID"
  #       t.field "courseId", "ID"
  #       t.field "courseStartDate", "Date"
  #
  #       t.index "student_course_enrollments"
  #
  #       t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
  #         derive.min_value "firstOfferedDate", from: "courseStartDate"
  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "firstOfferedDate", "Date"
  #
  #       t.index "courses"
  #     end
  #   end
  def min_value(field_name, from:)
    fields << DerivedFields::MinOrMaxValue.new(field_name, from, :min)
  end

  # Configures `field_name` (on the derived indexing type) to contain the maximum of all values from the `from`
  # field on the source type.
  #
  # @param field_name [String] name of field on the derived indexing type to store the derived value
  # @param from [String] path to field on the source type to source values from
  # @return [DerivedIndexedType::MinOrMaxValue]
  #
  # @example
  #   ElasticGraph.define_schema do |schema|
  #     schema.object_type "StudentCourseEnrollment" do |t|
  #       t.field "id", "ID"
  #       t.field "courseId", "ID"
  #       t.field "courseStartDate", "Date"
  #
  #       t.index "student_course_enrollments"
  #
  #       t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
  #         derive.max_value "mostRecentlyOfferedDate", from: "courseStartDate"
  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "mostRecentlyOfferedDate", "Date"
  #
  #       t.index "courses"
  #     end
  #   end
  def max_value(field_name, from:)
    fields << DerivedFields::MinOrMaxValue.new(field_name, from, :max)
  end

  # @return [Scripting::Script] Painless script that will maintain the derived fields
  # @api private
  def painless_script
    Scripting::Script.new(
      source: generate_script.strip,
      name: "#{destination_type_ref}_from_#{source_type.name}",
      language: "painless",
      context: "update"
    )
  end

  # @return [SchemaArtifacts::RuntimeMetadata::UpdateTarget] runtime metadata for the source type
  # @api private
  def 
    SchemaArtifacts::RuntimeMetadata::UpdateTarget.new(
      type: destination_type_ref.name,
      relationship: nil,
      script_id: painless_script.id,
      id_source: id_source,
      routing_value_source: routing_value_source,
      rollover_timestamp_value_source: rollover_timestamp_value_source,
      metadata_params: {},
      data_params: fields.map(&:source_field).to_h do |f|
        [f, SchemaArtifacts::RuntimeMetadata::DynamicParam.new(source_path: f, cardinality: :many)]
      end
    )
  end

  private

  def generate_script
    if fields.empty?
      raise Errors::SchemaError, "`derive_indexed_type_fields` definition for #{destination_type_ref} (from #{source_type.name}) " \
        "has no derived field definitions."
    end

    sorted_fields = fields.sort_by(&:destination_field)

    # We use `uniq` here to avoid re-doing the same setup multiple times, since multiple fields can sometimes
    # need the same setup (such as initializing a common parent field to an empty map).
    function_defs = sorted_fields.flat_map(&:function_definitions).uniq.map(&:strip).sort

    setup_statements = [STATIC_SETUP_STATEMENTS] + sorted_fields.flat_map(&:setup_statements).uniq.map(&:strip)

    apply_update_statements = sorted_fields.map { |f| apply_update_statement(f).strip }

    # Note: comments in the script are effectively "free" since:
    #
    #   - The compiler will strip them out.
    #   - We only send the script to the datastore once (when configuring the cluster), and later
    #     reference it only by id--so we don't pay for the larger payload on each indexing request.
    <<~EOS
      #{function_defs.join("\n\n")}

      #{setup_statements.join("\n")}

      #{apply_update_statements.join("\n")}

      if (!#{SCRIPT_ERRORS_VAR}.isEmpty()) {
        throw new IllegalArgumentException("#{DERIVED_INDEX_FAILURE_MESSAGE_PREAMBLE}: " + #{SCRIPT_ERRORS_VAR}.join(" "));
      }

      // For records with no new values to index, only skip the update if the document already exists.
      // Otherwise create an (empty) document to reflect the fact that the id has been seen.
      if (ctx._source.id != null && #{sorted_fields.map { |f| was_noop_variable(f) }.join(" && ")}) {
        ctx.op = 'none';
      } else {
        // Here we set `_source.id` because if we don't, it'll never be set, making these docs subtly
        // different from docs indexed the normal way.
        //
        // Note also that we MUST use `params.id` instead of `ctx._id`. The latter works on an update
        // of an existing document, but is unavailable when we are inserting the document for the first time.
        ctx._source.id = params.id;
      }
    EOS
  end

  def apply_update_statement(field)
    "boolean #{was_noop_variable(field)} = !#{field.apply_operation_returning_update_status};"
  end

  def was_noop_variable(field)
    "#{field.destination_field.gsub(".", "__")}_was_noop"
  end

  SCRIPT_ERRORS_VAR = "scriptErrors"

  STATIC_SETUP_STATEMENTS = <<~EOS.strip
    Map data = params.data;
    // A variable to accumulate script errors so that we can surface _all_ issues and not just the first.
    List #{SCRIPT_ERRORS_VAR} = new ArrayList();
  EOS
end
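
To illustrate how the no-op tracking in the generated script fits together, here is a minimal standalone sketch of `was_noop_variable` and `apply_update_statement`. `FakeField` and the Painless call strings are hypothetical stand-ins for a real derived field object; only the two helper bodies mirror the listing above.

```ruby
# Hypothetical stand-in for a derived field; real fields come from `DerivedFields`.
FakeField = Struct.new(:destination_field, :apply_operation_returning_update_status)

# Mirrors the helper in the listing: a nested path such as "offering.name" becomes
# "offering__name", presumably because a dot cannot appear in a Painless variable name.
def was_noop_variable(field)
  "#{field.destination_field.gsub(".", "__")}_was_noop"
end

# Mirrors the helper in the listing: one boolean declaration per derived field.
def apply_update_statement(field)
  "boolean #{was_noop_variable(field)} = !#{field.apply_operation_returning_update_status};"
end

fields = [
  FakeField.new("offering.name", "applyName(data)"),  # made-up Painless call
  FakeField.new("students", "applyStudents(data)")    # made-up Painless call
]

statements = fields.map { |f| apply_update_statement(f) }
noop_condition = fields.map { |f| was_noop_variable(f) }.join(" && ")

puts statements
puts "if (ctx._source.id != null && #{noop_condition}) { ctx.op = 'none'; }"
```

The update is skipped only when every per-field flag reports a no-op and the target document already has an `id`.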

Instance Method Details

#append_only_set(field_name, from:) ⇒ DerivedIndexedType::AppendOnlySet

Configures field_name (on the derived indexing type) to contain the set union of all values from the from field on the source type. Values are only ever appended to the set, so the field will act as an append-only set.

Examples:

ElasticGraph.define_schema do |schema|
  schema.object_type "StudentCourseEnrollment" do |t|
    t.field "id", "ID"
    t.field "courseId", "ID"
    t.field "studentName", "String"

    t.index "student_course_enrollments"

    t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
      derive.append_only_set "students", from: "studentName"
    end
  end

  schema.object_type "Course" do |t|
    t.field "id", "ID"
    t.field "students", "[String!]!"

    t.index "courses"
  end
end

Parameters:

  • field_name (String)

    name of field on the derived indexing type to store the derived set

  • from (String)

    path to field on the source type to source values from

Returns:

  • (DerivedIndexedType::AppendOnlySet)


# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 130

def append_only_set(field_name, from:)
  fields << DerivedFields::AppendOnlySet.new(field_name, from)
end
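
As a rough illustration of the set-union behavior described above, the following plain-Ruby sketch folds a list of source events into a set. It is only a model of the documented semantics, not the Painless script ElasticGraph generates; `derive_students` and the sample events are hypothetical.

```ruby
require "set"

# Plain-Ruby model of an append-only set (an illustration under assumptions,
# not ElasticGraph's implementation).
def derive_students(events)
  events.each_with_object(Set.new) do |event, students|
    students << event.fetch("studentName")
  end
end

events = [
  {"courseId" => "c1", "studentName" => "Ada"},
  {"courseId" => "c1", "studentName" => "Grace"},
  {"courseId" => "c1", "studentName" => "Ada"} # duplicate value: the set is unchanged
]

students = derive_students(events)
puts students.to_a.sort.inspect
```

Because a set union is commutative and idempotent, the result is the same regardless of event order or redelivery.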

#immutable_value(field_name, from:, nullable: true, can_change_from_null: false) ⇒ DerivedFields::ImmutableValue

Configures field_name (on the derived indexing type) to contain a single immutable value from the from field on the source type. Immutability is enforced by triggering an indexing failure with a clear error if any event’s source value is different from the value already indexed on this field.

Examples:

ElasticGraph.define_schema do |schema|
  schema.object_type "StudentCourseEnrollment" do |t|
    t.field "id", "ID"
    t.field "courseId", "ID"
    t.field "courseName", "String"

    t.index "student_course_enrollments"

    t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
      derive.immutable_value "name", from: "courseName"
    end
  end

  schema.object_type "Course" do |t|
    t.field "id", "ID"
    t.field "name", "String"

    t.index "courses"
  end
end

Parameters:

  • field_name (String)

    name of field on the derived indexing type to store the derived value

  • from (String)

    path to field on the source type to source values from

  • nullable (Boolean) (defaults to: true)

    whether the field is allowed to be set to null. When set to false, events that contain a null value in the from field will be rejected instead of setting the field’s value to null.

  • can_change_from_null (Boolean) (defaults to: false)

    whether a one-time mutation of the field value is allowed from null to a non-null value. This can be useful when dealing with a field that may not have a value on all source events. For example, if the source field was not initially part of the schema of your source dataset, you may have old records that lack a value for this field. When set, this option allows a one-time mutation of the field value from null to a non-null value. Once set to a non-null value, any additional null values that are encountered will be ignored (ensuring that the indexed data converges on the same state regardless of the order the events are ingested in). Note: this option cannot be enabled when nullable: false has been set.

Returns:

  • (DerivedFields::ImmutableValue)

# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 174

def immutable_value(field_name, from:, nullable: true, can_change_from_null: false)
  if !nullable && can_change_from_null
    raise Errors::SchemaError, "`can_change_from_null: true` is not allowed with `nullable: false` (as there would be no `null` values to change from)."
  end

  fields << DerivedFields::ImmutableValue.new(
    destination_field: field_name,
    source_field: from,
    nullable: nullable,
    can_change_from_null: can_change_from_null
  )
end
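
The interaction between `nullable` and `can_change_from_null` can be easier to follow as a small state machine. The sketch below is a plain-Ruby model of the semantics documented above; `ImmutableValueModel`, `ConflictError`, and `UNSET` are hypothetical names, and the real enforcement happens in a generated Painless script whose details may differ.

```ruby
# Plain-Ruby model of the documented `immutable_value` semantics (illustrative only).
class ImmutableValueModel
  ConflictError = Class.new(StandardError)
  UNSET = Object.new.freeze # distinguishes "never indexed" from an indexed null

  def initialize(nullable: true, can_change_from_null: false)
    if !nullable && can_change_from_null
      raise ArgumentError, "`can_change_from_null: true` is not allowed with `nullable: false`"
    end
    @nullable = nullable
    @can_change_from_null = can_change_from_null
    @value = UNSET
  end

  def value
    @value.equal?(UNSET) ? nil : @value
  end

  def apply(new_value)
    raise ConflictError, "null is not allowed" if new_value.nil? && !@nullable

    if @value.equal?(UNSET) || @value == new_value
      @value = new_value
    elsif @can_change_from_null && @value.nil?
      @value = new_value # the one-time null -> non-null change
    elsif @can_change_from_null && new_value.nil?
      # Later nulls are ignored so that event order does not matter.
    else
      raise ConflictError, "value changed from #{@value.inspect} to #{new_value.inspect}"
    end
    value
  end
end

in_order = ImmutableValueModel.new(can_change_from_null: true)
[nil, "Algebra"].each { |v| in_order.apply(v) }

reversed = ImmutableValueModel.new(can_change_from_null: true)
["Algebra", nil].each { |v| reversed.apply(v) }

puts in_order.value == reversed.value
```

Both orderings converge on the same value, which is the property the `can_change_from_null` description calls out.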

#max_value(field_name, from:) ⇒ DerivedIndexedType::MinOrMaxValue

Configures field_name (on the derived indexing type) to contain the maximum of all values from the from field on the source type.

Examples:

ElasticGraph.define_schema do |schema|
  schema.object_type "StudentCourseEnrollment" do |t|
    t.field "id", "ID"
    t.field "courseId", "ID"
    t.field "courseStartDate", "Date"

    t.index "student_course_enrollments"

    t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
      derive.max_value "mostRecentlyOfferedDate", from: "courseStartDate"
    end
  end

  schema.object_type "Course" do |t|
    t.field "id", "ID"
    t.field "mostRecentlyOfferedDate", "Date"

    t.index "courses"
  end
end

Parameters:

  • field_name (String)

    name of field on the derived indexing type to store the derived value

  • from (String)

    path to field on the source type to source values from

Returns:

  • (DerivedIndexedType::MinOrMaxValue)


# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 247

def max_value(field_name, from:)
  fields << DerivedFields::MinOrMaxValue.new(field_name, from, :max)
end

#min_value(field_name, from:) ⇒ DerivedIndexedType::MinOrMaxValue

Configures field_name (on the derived indexing type) to contain the minimum of all values from the from field on the source type.

Examples:

ElasticGraph.define_schema do |schema|
  schema.object_type "StudentCourseEnrollment" do |t|
    t.field "id", "ID"
    t.field "courseId", "ID"
    t.field "courseStartDate", "Date"

    t.index "student_course_enrollments"

    t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
      derive.min_value "firstOfferedDate", from: "courseStartDate"
    end
  end

  schema.object_type "Course" do |t|
    t.field "id", "ID"
    t.field "firstOfferedDate", "Date"

    t.index "courses"
  end
end

Parameters:

  • field_name (String)

    name of field on the derived indexing type to store the derived value

  • from (String)

    path to field on the source type to source values from

Returns:

  • (DerivedIndexedType::MinOrMaxValue)


# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 215

def min_value(field_name, from:)
  fields << DerivedFields::MinOrMaxValue.new(field_name, from, :min)
end
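
For intuition about how `min_value` and `max_value` behave as events arrive one at a time, here is a plain-Ruby sketch that applies each event to a running `Course` hash. `apply_event` and the sample events are hypothetical, and the sketch models only the documented outcome, not the generated Painless script.

```ruby
require "date"

# Applies one source event to the derived course state (illustrative model only).
def apply_event(course, event)
  date = Date.iso8601(event.fetch("courseStartDate"))

  course.merge(
    # `compact` drops the nil that is present before the first event has been applied.
    "firstOfferedDate" => [course["firstOfferedDate"], date].compact.min,
    "mostRecentlyOfferedDate" => [course["mostRecentlyOfferedDate"], date].compact.max
  )
end

events = [
  {"courseId" => "c1", "courseStartDate" => "2023-09-01"},
  {"courseId" => "c1", "courseStartDate" => "2021-01-15"},
  {"courseId" => "c1", "courseStartDate" => "2022-06-10"}
]

course = events.reduce({}) { |state, event| apply_event(state, event) }

puts course["firstOfferedDate"].iso8601
puts course["mostRecentlyOfferedDate"].iso8601
```

Since minimum and maximum do not depend on the order of their inputs, out-of-order events yield the same derived values.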

#painless_script ⇒ Scripting::Script

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the Painless script that will maintain the derived fields.

Returns:

  • (Scripting::Script)

    Painless script that will maintain the derived fields



# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 253

def painless_script
  Scripting::Script.new(
    source: generate_script.strip,
    name: "#{destination_type_ref}_from_#{source_type.name}",
    language: "painless",
    context: "update"
  )
end

#runtime_metadata_for_source_type ⇒ SchemaArtifacts::RuntimeMetadata::UpdateTarget

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns runtime metadata for the source type.

Returns:

  • (SchemaArtifacts::RuntimeMetadata::UpdateTarget)

    runtime metadata for the source type



# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 264

def runtime_metadata_for_source_type
  SchemaArtifacts::RuntimeMetadata::UpdateTarget.new(
    type: destination_type_ref.name,
    relationship: nil,
    script_id: painless_script.id,
    id_source: id_source,
    routing_value_source: routing_value_source,
    rollover_timestamp_value_source: rollover_timestamp_value_source,
    metadata_params: {},
    data_params: fields.map(&:source_field).to_h do |f|
      [f, SchemaArtifacts::RuntimeMetadata::DynamicParam.new(source_path: f, cardinality: :many)]
    end
  )
end
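
The `data_params` expression above can be tried in isolation. In this sketch, `DynamicParam` and `SourceOnlyField` are hypothetical `Struct` stand-ins for the ElasticGraph classes, and the field list is borrowed from the `Course` example; only the `map`/`to_h` shape mirrors the listing.

```ruby
# Hypothetical stand-ins for the ElasticGraph classes used in the listing.
DynamicParam = Struct.new(:source_path, :cardinality, keyword_init: true)
SourceOnlyField = Struct.new(:destination_field, :source_field)

fields = [
  SourceOnlyField.new("name", "courseName"),
  SourceOnlyField.new("students", "studentName"),
  SourceOnlyField.new("firstOfferedDate", "courseStartDate"),
  SourceOnlyField.new("mostRecentlyOfferedDate", "courseStartDate")
]

# `to_h` with a block (Ruby 2.6+) builds the hash from [key, value] pairs. Two fields that
# read the same source path collapse into a single entry because hash keys are unique.
data_params = fields.map(&:source_field).to_h do |f|
  [f, DynamicParam.new(source_path: f, cardinality: :many)]
end

puts data_params.keys.inspect
```

Four derived fields produce three params here, because both date fields read from `courseStartDate`.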