Class: ElasticGraph::SchemaDefinition::Indexing::DerivedIndexedType
Inherits: Object
- Object
- ElasticGraph::SchemaDefinition::Indexing::DerivedIndexedType
Defined in: elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb
Overview
Used to configure the derivation of a derived indexed type from a source type. This type is yielded from Mixins::HasIndices#derive_indexed_type_fields.
Instance Attribute Summary
- #fields ⇒ Array<DerivedFields::AppendOnlySet, DerivedFields::ImmutableValue, DerivedFields::MinOrMaxValue>
  Derived field definitions.
- #id_source ⇒ String
  Path to field on the source type used as `id` on the derived type.
- #rollover_timestamp_value_source ⇒ String?
  Path to field on the source type used as the timestamp field for rollover.
- #routing_value_source ⇒ String?
  Path to field on the source type used for shard routing.
- #source_type ⇒ SchemaElements::ObjectType
  The type used as a source for this derived type.
Instance Method Summary
- #append_only_set(field_name, from:) ⇒ DerivedIndexedType::AppendOnlySet
  Configures `field_name` (on the derived indexing type) to contain the set union of all values from the `from` field on the source type.
- #immutable_value(field_name, from:, nullable: true, can_change_from_null: false) ⇒ DerivedFields::ImmutableValue
  Configures `field_name` (on the derived indexing type) to contain a single immutable value from the `from` field on the source type.
- #initialize(source_type:, destination_type_ref:, id_source:, routing_value_source:, rollover_timestamp_value_source:) {|DerivedIndexedType| ... } ⇒ DerivedIndexedType (constructor, private)
  A new instance of DerivedIndexedType.
- #max_value(field_name, from:) ⇒ DerivedIndexedType::MinOrMaxValue
  Configures `field_name` (on the derived indexing type) to contain the maximum of all values from the `from` field on the source type.
- #min_value(field_name, from:) ⇒ DerivedIndexedType::MinOrMaxValue
  Configures `field_name` (on the derived indexing type) to contain the minimum of all values from the `from` field on the source type.
- #painless_script ⇒ Scripting::Script (private)
  Painless script that will maintain the derived fields.
- #runtime_metadata_for_source_type ⇒ SchemaArtifacts::RuntimeMetadata::UpdateTarget (private)
  Runtime metadata for the source type.
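The set, minimum, and maximum derivations summarized above all fold each source event into the derived document in a way that does not depend on event order. The following is a minimal plain-Ruby sketch of that idea, not ElasticGraph's implementation (the real work is done by a generated Painless script). The `apply_event` helper, the hash keys, and the sample events are all hypothetical names chosen for illustration.

```ruby
require "set"

# Hypothetical model: folds one source event into a derived document (a plain Hash).
# None of these names come from ElasticGraph.
def apply_event(derived, event)
  {
    # append_only_set: union in the new value; values are never removed.
    students: derived.fetch(:students, Set.new) | [event[:student_name]].compact,
    # min_value / max_value: keep the smallest / largest value seen so far.
    first_offered: [derived[:first_offered], event[:start_date]].compact.min,
    most_recently_offered: [derived[:most_recently_offered], event[:start_date]].compact.max
  }
end

events = [
  {student_name: "Ada", start_date: "2024-09-01"},
  {student_name: "Grace", start_date: "2023-09-01"},
  {student_name: "Ada", start_date: "2025-01-15"}
]

# Applying the same events in either order yields the same derived document.
in_order = events.reduce({}) { |doc, e| apply_event(doc, e) }
reversed = events.reverse.reduce({}) { |doc, e| apply_event(doc, e) }
```

Because union, min, and max are commutative and idempotent, replayed or out-of-order events leave the derived document unchanged, which is presumably why these particular operations suit derived indexing.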
Constructor Details
#initialize(source_type:, destination_type_ref:, id_source:, routing_value_source:, rollover_timestamp_value_source:) {|DerivedIndexedType| ... } ⇒ DerivedIndexedType
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of DerivedIndexedType.
# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 82

def initialize(
  source_type:,
  destination_type_ref:,
  id_source:,
  routing_value_source:,
  rollover_timestamp_value_source:
)
  fields = [] # : ::Array[_DerivedField]
  super(
    source_type: source_type,
    destination_type_ref: destination_type_ref,
    id_source: id_source,
    routing_value_source: routing_value_source,
    rollover_timestamp_value_source: rollover_timestamp_value_source,
    fields: fields
  )
  yield self
end
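The constructor follows a common Ruby shape: a `Struct` subclass whose `initialize` seeds an empty collection and then yields the new instance so the caller's block can populate it. A small self-contained sketch of that shape is below. `Recipe` and `add_step` are hypothetical names, and the sketch passes `keyword_init: true` explicitly so that keyword arguments to `super` behave the same across Ruby versions, which is an assumption rather than something the listing above does.

```ruby
# Hypothetical example class; only the Struct-plus-`yield self` shape mirrors the constructor.
class Recipe < Struct.new(:name, :steps, keyword_init: true)
  def initialize(name:)
    # Seed the collection that the block will fill in.
    super(name: name, steps: [])
    # Hand the half-built instance to the caller's configuration block.
    yield self
  end

  # Builder-style method, analogous in spirit to `append_only_set` and friends.
  def add_step(text)
    steps << text
  end
end

recipe = Recipe.new(name: "tea") do |r|
  r.add_step "boil water"
  r.add_step "steep leaves"
end
```

This style keeps all configuration inside one block, so the object is fully populated by the time `new` returns.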
Instance Attribute Details
#fields ⇒ Array<DerivedFields::AppendOnlySet, DerivedFields::ImmutableValue, DerivedFields::MinOrMaxValue>
Returns derived field definitions.
# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 67

class DerivedIndexedType < ::Struct.new(
  :source_type,
  :destination_type_ref,
  :id_source,
  :routing_value_source,
  :rollover_timestamp_value_source,
  :fields
)
  # @param source_type [SchemaElements::ObjectType] the type used as a source for this derived type
  # @param destination_type_ref [SchemaElements::TypeReference] the derived type
  # @param id_source [String] path to field on the source type used as `id` on the derived type
  # @param routing_value_source [String, nil] path to field on the source type used for shard routing
  # @param rollover_timestamp_value_source [String, nil] path to field on the source type used as the timestamp field for rollover
  # @yield [DerivedIndexedType] the `DerivedIndexedType` instance
  # @api private
  def initialize(
    source_type:,
    destination_type_ref:,
    id_source:,
    routing_value_source:,
    rollover_timestamp_value_source:
  )
    fields = [] # : ::Array[_DerivedField]
    super(
      source_type: source_type,
      destination_type_ref: destination_type_ref,
      id_source: id_source,
      routing_value_source: routing_value_source,
      rollover_timestamp_value_source: rollover_timestamp_value_source,
      fields: fields
    )
    yield self
  end

  # Configures `field_name` (on the derived indexing type) to contain the set union of all values from
  # the `from` field on the source type. Values are only ever appended to the set, so the field will
  # act as an append-only set.
  #
  # @param field_name [String] name of field on the derived indexing type to store the derived set
  # @param from [String] path to field on the source type to source values from
  # @return [DerivedIndexedType::AppendOnlySet]
  #
  # @example
  #   ElasticGraph.define_schema do |schema|
  #     schema.object_type "StudentCourseEnrollment" do |t|
  #       t.field "id", "ID"
  #       t.field "courseId", "ID"
  #       t.field "studentName", "String"
  #
  #       t.index "student_course_enrollments"
  #
  #       t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
  #         derive.append_only_set "students", from: "studentName"
  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "students", "[String!]!"
  #
  #       t.index "courses"
  #     end
  #   end
  def append_only_set(field_name, from:)
    fields << DerivedFields::AppendOnlySet.new(field_name, from)
  end

  # Configures `field_name` (on the derived indexing type) to contain a single immutable value from the
  # `from` field on the source type. Immutability is enforced by triggering an indexing failure with a
  # clear error if any event's source value is different from the value already indexed on this field.
  #
  # @param field_name [String] name of field on the derived indexing type to store the derived value
  # @param from [String] path to field on the source type to source values from
  # @param nullable [Boolean] whether the field is allowed to be set to `null`. When set to false, events
  #   that contain a `null` value in the `from` field will be rejected instead of setting the field’s value
  #   to `null`.
  # @param can_change_from_null [Boolean] whether a one-time mutation of the field value is allowed from
  #   `null` to a non-`null` value. This can be useful when dealing with a field that may not have a value
  #   on all source events. For example, if the source field was not initially part of the schema of your
  #   source dataset, you may have old records that lack a value for this field. When set, this option
  #   allows a one-time mutation of the field value from `null` to a non-`null` value. Once set to a
  #   non-`null` value, any additional `null` values that are encountered will be ignored (ensuring that
  #   the indexed data converges on the same state regardless of the order the events are ingested in).
  #   Note: this option cannot be enabled when `nullable: false` has been set.
  # @return [DerivedFields::ImmutableValue]
  #
  # @example
  #   ElasticGraph.define_schema do |schema|
  #     schema.object_type "StudentCourseEnrollment" do |t|
  #       t.field "id", "ID"
  #       t.field "courseId", "ID"
  #       t.field "courseName", "String"
  #
  #       t.index "student_course_enrollments"
  #
  #       t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
  #         derive.immutable_value "name", from: "courseName"
  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "name", "String"
  #
  #       t.index "courses"
  #     end
  #   end
  def immutable_value(field_name, from:, nullable: true, can_change_from_null: false)
    if !nullable && can_change_from_null
      raise Errors::SchemaError, "`can_change_from_null: true` is not allowed with `nullable: false` (as there would be no `null` values to change from)."
    end

    fields << DerivedFields::ImmutableValue.new(
      destination_field: field_name,
      source_field: from,
      nullable: nullable,
      can_change_from_null: can_change_from_null
    )
  end

  # Configures `field_name` (on the derived indexing type) to contain the minimum of all values from the `from`
  # field on the source type.
  #
  # @param field_name [String] name of field on the derived indexing type to store the derived value
  # @param from [String] path to field on the source type to source values from
  # @return [DerivedIndexedType::MinOrMaxValue]
  #
  # @example
  #   ElasticGraph.define_schema do |schema|
  #     schema.object_type "StudentCourseEnrollment" do |t|
  #       t.field "id", "ID"
  #       t.field "courseId", "ID"
  #       t.field "courseStartDate", "Date"
  #
  #       t.index "student_course_enrollments"
  #
  #       t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
  #         derive.min_value "firstOfferedDate", from: "courseStartDate"
  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "firstOfferedDate", "Date"
  #
  #       t.index "courses"
  #     end
  #   end
  def min_value(field_name, from:)
    fields << DerivedFields::MinOrMaxValue.new(field_name, from, :min)
  end

  # Configures `field_name` (on the derived indexing type) to contain the maximum of all values from the `from`
  # field on the source type.
  #
  # @param field_name [String] name of field on the derived indexing type to store the derived value
  # @param from [String] path to field on the source type to source values from
  # @return [DerivedIndexedType::MinOrMaxValue]
  #
  # @example
  #   ElasticGraph.define_schema do |schema|
  #     schema.object_type "StudentCourseEnrollment" do |t|
  #       t.field "id", "ID"
  #       t.field "courseId", "ID"
  #       t.field "courseStartDate", "Date"
  #
  #       t.index "student_course_enrollments"
  #
  #       t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive|
  #         derive.max_value "mostRecentlyOfferedDate", from: "courseStartDate"
  #       end
  #     end
  #
  #     schema.object_type "Course" do |t|
  #       t.field "id", "ID"
  #       t.field "mostRecentlyOfferedDate", "Date"
  #
  #       t.index "courses"
  #     end
  #   end
  def max_value(field_name, from:)
    fields << DerivedFields::MinOrMaxValue.new(field_name, from, :max)
  end

  # @return [Scripting::Script] Painless script that will maintain the derived fields
  # @api private
  def painless_script
    Scripting::Script.new(
      source: generate_script.strip,
      name: "#{destination_type_ref}_from_#{source_type.name}",
      language: "painless",
      context: "update"
    )
  end

  # @return [SchemaArtifacts::RuntimeMetadata::UpdateTarget] runtime metadata for the source type
  # @api private
  def runtime_metadata_for_source_type
    SchemaArtifacts::RuntimeMetadata::UpdateTarget.new(
      type: destination_type_ref.name,
      relationship: nil,
      script_id: painless_script.id,
      id_source: id_source,
      routing_value_source: routing_value_source,
      rollover_timestamp_value_source: rollover_timestamp_value_source,
      metadata_params: {},
      data_params: fields.map(&:source_field).to_h do |f|
        [f, SchemaArtifacts::RuntimeMetadata::DynamicParam.new(source_path: f, cardinality: :many)]
      end
    )
  end

  private

  def generate_script
    if fields.empty?
      raise Errors::SchemaError, "`derive_indexed_type_fields` definition for #{destination_type_ref} (from #{source_type.name}) " \
        "has no derived field definitions."
    end

    sorted_fields = fields.sort_by(&:destination_field)

    # We use `uniq` here to avoid re-doing the same setup multiple times, since multiple fields can sometimes
    # need the same setup (such as initializing a common parent field to an empty map).
    function_defs = sorted_fields.flat_map(&:function_definitions).uniq.map(&:strip).sort
    setup_statements = [STATIC_SETUP_STATEMENTS] + sorted_fields.flat_map(&:setup_statements).uniq.map(&:strip)
    apply_update_statements = sorted_fields.map { |f| apply_update_statement(f).strip }

    # Note: comments in the script are effectively "free" since:
    #
    # - The compiler will strip them out.
    # - We only send the script to the datastore once (when configuring the cluster), and later
    #   reference it only by id--so we don't pay for the larger payload on each indexing request.
    <<~EOS
      #{function_defs.join("\n\n")}

      #{setup_statements.join("\n")}

      #{apply_update_statements.join("\n")}

      if (!#{SCRIPT_ERRORS_VAR}.isEmpty()) {
        throw new IllegalArgumentException("#{DERIVED_INDEX_FAILURE_MESSAGE_PREAMBLE}: " + #{SCRIPT_ERRORS_VAR}.join(" "));
      }

      // For records with no new values to index, only skip the update if the document itself doesn't already exist.
      // Otherwise create an (empty) document to reflect the fact that the id has been seen.
      if (ctx._source.id != null && #{sorted_fields.map { |f| was_noop_variable(f) }.join(" && ")}) {
        ctx.op = 'none';
      } else {
        // Here we set `_source.id` because if we don't, it'll never be set, making these docs subtly
        // different from docs indexed the normal way.
        //
        // Note also that we MUST use `params.id` instead of `ctx._id`. The latter works on an update
        // of an existing document, but is unavailable when we are inserting the document for the first time.
        ctx._source.id = params.id;
      }
    EOS
  end

  def apply_update_statement(field)
    "boolean #{was_noop_variable(field)} = !#{field.apply_operation_returning_update_status};"
  end

  def was_noop_variable(field)
    "#{field.destination_field.gsub(".", "__")}_was_noop"
  end

  SCRIPT_ERRORS_VAR = "scriptErrors"

  STATIC_SETUP_STATEMENTS = <<~EOS.strip
    Map data = params.data;
    // A variable to accumulate script errors so that we can surface _all_ issues and not just the first.
    List #{SCRIPT_ERRORS_VAR} = new ArrayList();
  EOS
end
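The `nullable` and `can_change_from_null` rules documented for `immutable_value` interact in ways that are easier to follow as a decision procedure. The sketch below is one reading of those documented rules in plain Ruby, not the generated Painless script. `UNSET` and `apply_immutable` are hypothetical names, and the error strings are made up for illustration. It returns a pair of the resulting value and an error message (or `nil`).

```ruby
# Hypothetical sentinel meaning "no value has been indexed for this field yet".
UNSET = Object.new.freeze

# Hypothetical model of the documented immutable-value rules.
def apply_immutable(current, incoming, nullable: true, can_change_from_null: false)
  # Same guard as the documented option check: the two options are mutually exclusive.
  if !nullable && can_change_from_null
    raise ArgumentError, "`can_change_from_null: true` is not allowed with `nullable: false`"
  end

  # With `nullable: false`, events carrying null are rejected.
  return [current, "null value rejected"] if incoming.nil? && !nullable
  # First value seen, or the same value again: accept without error.
  return [incoming, nil] if current.equal?(UNSET) || current == incoming

  if can_change_from_null
    return [incoming, nil] if current.nil?  # one-time change from null to non-null
    return [current, nil] if incoming.nil?  # later nulls are ignored
  end

  # Any other difference violates immutability.
  [current, "value changed from #{current.inspect} to #{incoming.inspect}"]
end

first = apply_immutable(UNSET, "Math")
conflict = apply_immutable("Math", "Art")
upgraded = apply_immutable(nil, "Math", can_change_from_null: true)
ignored = apply_immutable("Math", nil, can_change_from_null: true)
```

Under this reading, with `can_change_from_null: true` a `null` event and a `"Math"` event converge on `"Math"` in either order, which matches the stated goal of order-independent ingestion.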
#id_source ⇒ String
Returns path to field on the source type used as `id` on the derived type.
#rollover_timestamp_value_source ⇒ String?
Returns path to field on the source type used as the timestamp field for rollover.
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 |
# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 67 class DerivedIndexedType < ::Struct.new( :source_type, :destination_type_ref, :id_source, :routing_value_source, :rollover_timestamp_value_source, :fields ) # @param source_type [SchemaElements::ObjectType] the type used as a source for this derive type # @param destination_type_ref [SchemaElements::TypeReference] the derived type # @param id_source [String] path to field on the source type used as `id` on the derived type # @param routing_value_source [String, nil] path to field on the source type used for shard routing # @param rollover_timestamp_value_source [String, nil] path to field on the source type used as the timestamp field for rollover # @yield [DerivedIndexedType] the `DerivedIndexedType` instance # @api private def initialize( source_type:, destination_type_ref:, id_source:, routing_value_source:, rollover_timestamp_value_source: ) fields = [] # : ::Array[_DerivedField] super( source_type: source_type, destination_type_ref: destination_type_ref, id_source: id_source, routing_value_source: routing_value_source, rollover_timestamp_value_source: , fields: fields ) yield self end # Configures `field_name` (on the derived indexing type) to contain the set union of all values from # the `from` field on the source type. Values are only ever appended to the set, so the field will # act as an append-only set. 
# # @param field_name [String] name of field on the derived indexing type to store the derived set # @param from [String] path to field on the source type to source values from # @return [DerivedIndexedType::AppendOnlySet] # # @example # ElasticGraph.define_schema do |schema| # schema.object_type "StudentCourseEnrollment" do |t| # t.field "id", "ID" # t.field "courseId", "ID" # t.field "studentName", "String" # # t.index "student_course_enrollments" # # t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive| # derive.append_only_set "students", from: "studentName" # end # end # # schema.object_type "Course" do |t| # t.field "id", "ID" # t.field "students", "[String!]!" # # t.index "courses" # end # end def append_only_set(field_name, from:) fields << DerivedFields::AppendOnlySet.new(field_name, from) end # Configures `field_name` (on the derived indexing type) to contain a single immutable value from the # `from` field on the source type. Immutability is enforced by triggering an indexing failure with a # clear error if any event's source value is different from the value already indexed on this field. # # @param field_name [String] name of field on the derived indexing type to store the derived value # @param from [String] path to field on the source type to source values from # @param nullable [Boolean] whether the field is allowed to be set to `null`. When set to false, events # that contain a `null` value in the `from` field will be rejected instead of setting the field’s value # to `null`. # @param can_change_from_null [Boolean] whether a one-time mutation of the field value is allowed from # `null` to a non-`null` value. This can be useful when dealing with a field that may not have a value # on all source events. For example, if the source field was not initially part of the schema of your # source dataset, you may have old records that lack a value for this field. 
When set, this option # allows a one-time mutation of the field value from `null` to a non-`null` value. Once set to a # non-`null` value, any additional `null` values that are encountered will be ignored (ensuring that # the indexed data converges on the same state regardless of the order the events are ingested in). # Note: this option cannot be enabled when `nullable: false` has been set. # @return [DerivedFields::ImmutableValue] # # @example # ElasticGraph.define_schema do |schema| # schema.object_type "StudentCourseEnrollment" do |t| # t.field "id", "ID" # t.field "courseId", "ID" # t.field "courseName", "String" # # t.index "student_course_enrollments" # # t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive| # derive.immutable_value "name", from: "courseName" # end # end # # schema.object_type "Course" do |t| # t.field "id", "ID" # t.field "name", "String" # # t.index "courses" # end # end def immutable_value(field_name, from:, nullable: true, can_change_from_null: false) if !nullable && can_change_from_null raise Errors::SchemaError, "`can_change_from_null: true` is not allowed with `nullable: false` (as there would be no `null` values to change from)." end fields << DerivedFields::ImmutableValue.new( destination_field: field_name, source_field: from, nullable: nullable, can_change_from_null: can_change_from_null ) end # Configures `field_name` (on the derived indexing type) to contain the minimum of all values from the `from` # field on the source type. 
# # @param field_name [String] name of field on the derived indexing type to store the derived value # @param from [String] path to field on the source type to source values from # @return [DerivedIndexedType::MinOrMaxValue] # # @example # ElasticGraph.define_schema do |schema| # schema.object_type "StudentCourseEnrollment" do |t| # t.field "id", "ID" # t.field "courseId", "ID" # t.field "courseStartDate", "Date" # # t.index "student_course_enrollments" # # t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive| # derive.min_value "firstOfferedDate", from: "courseStartDate" # end # end # # schema.object_type "Course" do |t| # t.field "id", "ID" # t.field "firstOfferedDate", "Date" # # t.index "courses" # end # end def min_value(field_name, from:) fields << DerivedFields::MinOrMaxValue.new(field_name, from, :min) end # Configures `field_name` (on the derived indexing type) to contain the maximum of all values from the `from` # field on the source type. # # @param field_name [String] name of field on the derived indexing type to store the derived value # @param from [String] path to field on the source type to source values from # @return [DerivedIndexedType::MinOrMaxValue] # # @example # ElasticGraph.define_schema do |schema| # schema.object_type "StudentCourseEnrollment" do |t| # t.field "id", "ID" # t.field "courseId", "ID" # t.field "courseStartDate", "Date" # # t.index "student_course_enrollments" # # t.derive_indexed_type_fields "Course", from_id: "courseId" do |derive| # derive.max_value "mostRecentlyOfferedDate", from: "courseStartDate" # end # end # # schema.object_type "Course" do |t| # t.field "id", "ID" # t.field "mostRecentlyOfferedDate", "Date" # # t.index "courses" # end # end def max_value(field_name, from:) fields << DerivedFields::MinOrMaxValue.new(field_name, from, :max) end # @return [Scripting::Script] Painless script that will maintain the derived fields # @api private def painless_script Scripting::Script.new( source: 
generate_script.strip, name: "#{destination_type_ref}_from_#{source_type.name}", language: "painless", context: "update" ) end # @return [SchemaArtifacts::RuntimeMetadata::UpdateTarget] runtime metadata for the source type # @api private def runtime_metadata_for_source_type SchemaArtifacts::RuntimeMetadata::UpdateTarget.new( type: destination_type_ref.name, relationship: nil, script_id: painless_script.id, id_source: id_source, routing_value_source: routing_value_source, rollover_timestamp_value_source: rollover_timestamp_value_source, metadata_params: {}, data_params: fields.map(&:source_field).to_h do |f| [f, SchemaArtifacts::RuntimeMetadata::DynamicParam.new(source_path: f, cardinality: :many)] end ) end private def generate_script if fields.empty? raise Errors::SchemaError, "`derive_indexed_type_fields` definition for #{destination_type_ref} (from #{source_type.name}) " \ "has no derived field definitions." end sorted_fields = fields.sort_by(&:destination_field) # We use `uniq` here to avoid re-doing the same setup multiple times, since multiple fields can sometimes # need the same setup (such as initializing a common parent field to an empty map). function_defs = sorted_fields.flat_map(&:function_definitions).uniq.map(&:strip).sort setup_statements = [STATIC_SETUP_STATEMENTS] + sorted_fields.flat_map(&:setup_statements).uniq.map(&:strip) apply_update_statements = sorted_fields.map { |f| apply_update_statement(f).strip } # Note: comments in the script are effectively "free" since: # # - The compiler will strip them out. # - We only send the script to the datastore once (when configuring the cluster), and later # reference it only by id--so we don't pay for the larger payload on each indexing request. 
<<~EOS #{function_defs.join("\n\n")} #{setup_statements.join("\n")} #{apply_update_statements.join("\n")} if (!#{SCRIPT_ERRORS_VAR}.isEmpty()) { throw new IllegalArgumentException("#{DERIVED_INDEX_FAILURE_MESSAGE_PREAMBLE}: " + #{SCRIPT_ERRORS_VAR}.join(" ")); } // For records with no new values to index, only skip the update if the document itself doesn't already exist. // Otherwise create an (empty) document to reflect the fact that the id has been seen. if (ctx._source.id != null && #{sorted_fields.map { |f| was_noop_variable(f) }.join(" && ")}) { ctx.op = 'none'; } else { // Here we set `_source.id` because if we don't, it'll never be set, making these docs subtly // different from docs indexed the normal way. // // Note also that we MUST use `params.id` instead of `ctx._id`. The latter works on an update // of an existing document, but is unavailable when we are inserting the document for the first time. ctx._source.id = params.id; } EOS end def apply_update_statement(field) "boolean #{was_noop_variable(field)} = !#{field.apply_operation_returning_update_status};" end def was_noop_variable(field) "#{field.destination_field.gsub(".", "__")}_was_noop" end SCRIPT_ERRORS_VAR = "scriptErrors" STATIC_SETUP_STATEMENTS = <<~EOS.strip Map data = params.data; // A variable to accumulate script errors so that we can surface _all_ issues and not just the first. List #{SCRIPT_ERRORS_VAR} = new ArrayList(); EOS end |
#routing_value_source ⇒ String?
Returns path to field on the source type used for shard routing.
#source_type ⇒ SchemaElements::ObjectType
Returns the type used as a source for this derived type.
Instance Method Details
#append_only_set(field_name, from:) ⇒ DerivedIndexedType::AppendOnlySet
Configures `field_name` (on the derived indexing type) to contain the set union of all values from the `from` field on the source type. Values are only ever appended to the set, so the field will act as an append-only set.
# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 130
def append_only_set(field_name, from:)
  fields << DerivedFields::AppendOnlySet.new(field_name, from)
end
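The set-union behavior described above can be pictured with plain Ruby sets. This is only an illustrative sketch and does not use ElasticGraph itself: the `merge_into_set` helper and the sample student names are hypothetical. It shows why a field built by set union ends up in the same state no matter how events are ordered or repeated.

```ruby
require "set"

# Hypothetical helper (not part of ElasticGraph): folds one event's values
# into the set accumulated so far on the derived document.
def merge_into_set(existing, event_values)
  existing | event_values.to_set
end

# Sample `studentName` values carried by three enrollment events for one course.
events = [["Ana"], ["Bo", "Ana"], []]

in_order = events.reduce(Set.new) { |set, values| merge_into_set(set, values) }
reversed = events.reverse.reduce(Set.new) { |set, values| merge_into_set(set, values) }

# Both ingestion orders produce the same set; duplicates are absorbed.
puts in_order == reversed
puts in_order.to_a.sort.inspect
```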
#immutable_value(field_name, from:, nullable: true, can_change_from_null: false) ⇒ DerivedFields::ImmutableValue
Configures `field_name` (on the derived indexing type) to contain a single immutable value from the `from` field on the source type. Immutability is enforced by triggering an indexing failure with a clear error if any event's source value is different from the value already indexed on this field.
# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 174
def immutable_value(field_name, from:, nullable: true, can_change_from_null: false)
  if !nullable && can_change_from_null
    raise Errors::SchemaError, "`can_change_from_null: true` is not allowed with `nullable: false` (as there would be no `null` values to change from)."
  end

  fields << DerivedFields::ImmutableValue.new(
    destination_field: field_name,
    source_field: from,
    nullable: nullable,
    can_change_from_null: can_change_from_null
  )
end
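The interaction between `nullable` and `can_change_from_null` is easier to follow with a small model. The sketch below is a simplified plain-Ruby reading of the rules documented for this method, not the Painless script that ElasticGraph generates. The `apply_immutable_value` helper, the `UNSET` sentinel (standing in for "no derived document yet"), and the `ImmutableValueViolation` error are all hypothetical names introduced for illustration.

```ruby
# Hypothetical sentinel meaning "the derived document has no value yet".
UNSET = Object.new.freeze

# Hypothetical error standing in for an indexing failure.
class ImmutableValueViolation < StandardError; end

# Returns the value the derived field should hold after seeing `incoming`.
def apply_immutable_value(current, incoming, nullable: true, can_change_from_null: false)
  # Mirrors the configuration check performed by `immutable_value`.
  if !nullable && can_change_from_null
    raise ArgumentError, "`can_change_from_null: true` is not allowed with `nullable: false`"
  end

  # With `nullable: false`, events carrying null are rejected.
  raise ImmutableValueViolation, "null is not allowed" if incoming.nil? && !nullable

  # The first value seen is stored; repeating the same value is harmless.
  return incoming if current.equal?(UNSET) || current == incoming

  if can_change_from_null
    return incoming if current.nil?  # one-time change from null to non-null
    return current if incoming.nil?  # later nulls are ignored
  end

  raise ImmutableValueViolation, "cannot change #{current.inspect} to #{incoming.inspect}"
end

# With `can_change_from_null: true`, both event orders converge on "Algebra".
[[nil, "Algebra"], ["Algebra", nil]].each do |values|
  final = values.reduce(UNSET) { |cur, v| apply_immutable_value(cur, v, can_change_from_null: true) }
  puts final.inspect
end
```

Under this model, a conflicting non-null value (for example "Geometry" after "Algebra") always raises, whichever options are set.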
#max_value(field_name, from:) ⇒ DerivedIndexedType::MinOrMaxValue
Configures `field_name` (on the derived indexing type) to contain the maximum of all values from the `from` field on the source type.
# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 247
def max_value(field_name, from:)
  fields << DerivedFields::MinOrMaxValue.new(field_name, from, :max)
end
#min_value(field_name, from:) ⇒ DerivedIndexedType::MinOrMaxValue
Configures `field_name` (on the derived indexing type) to contain the minimum of all values from the `from` field on the source type.
# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 215
def min_value(field_name, from:)
  fields << DerivedFields::MinOrMaxValue.new(field_name, from, :min)
end
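Both `min_value` and `max_value` register the same kind of derived field and differ only in the `:min` or `:max` argument. The plain-Ruby sketch below models that shared fold; it is not the generated Painless script. The `fold_min_or_max` helper is hypothetical, and skipping `nil` values is an assumption made for the illustration rather than documented behavior.

```ruby
require "date"

# Hypothetical helper: folds one event's value into the current minimum or
# maximum. `min_or_max` is :min or :max, mirroring the third argument passed
# to `DerivedFields::MinOrMaxValue.new`.
def fold_min_or_max(current, incoming, min_or_max)
  return current if incoming.nil? # assumption: events without a value are skipped
  return incoming if current.nil? # the first value seen becomes the result
  [current, incoming].public_send(min_or_max)
end

# Sample `courseStartDate` values from three enrollment events.
start_dates = [Date.new(2024, 9, 1), Date.new(2023, 1, 15), Date.new(2025, 2, 3)]

first_offered = start_dates.reduce(nil) { |cur, d| fold_min_or_max(cur, d, :min) }
most_recent = start_dates.reduce(nil) { |cur, d| fold_min_or_max(cur, d, :max) }

puts first_offered.iso8601
puts most_recent.iso8601
```

Because minimum and maximum are commutative and associative, the result does not depend on the order in which events arrive.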
#painless_script ⇒ Scripting::Script
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns Painless script that will maintain the derived fields.
# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 253
def painless_script
  Scripting::Script.new(
    source: generate_script.strip,
    name: "#{destination_type_ref}_from_#{source_type.name}",
    language: "painless",
    context: "update"
  )
end
#runtime_metadata_for_source_type ⇒ SchemaArtifacts::RuntimeMetadata::UpdateTarget
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns runtime metadata for the source type.
# File 'elasticgraph-schema_definition/lib/elastic_graph/schema_definition/indexing/derived_indexed_type.rb', line 264
def runtime_metadata_for_source_type
  SchemaArtifacts::RuntimeMetadata::UpdateTarget.new(
    type: destination_type_ref.name,
    relationship: nil,
    script_id: painless_script.id,
    id_source: id_source,
    routing_value_source: routing_value_source,
    rollover_timestamp_value_source: rollover_timestamp_value_source,
    metadata_params: {},
    data_params: fields.map(&:source_field).to_h do |f|
      [f, SchemaArtifacts::RuntimeMetadata::DynamicParam.new(source_path: f, cardinality: :many)]
    end
  )
end
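The `data_params` expression keys each parameter by its source field path, so two derived fields that read from the same source field share one entry. The sketch below reproduces just that mapping with hypothetical stand-in structs (`DerivedFieldSketch`, `DynamicParamSketch`) and a hypothetical `data_params_for` helper; the real classes live in ElasticGraph and are not used here.

```ruby
# Hypothetical stand-ins for the derived field definitions and for
# SchemaArtifacts::RuntimeMetadata::DynamicParam.
DerivedFieldSketch = Struct.new(:destination_field, :source_field, keyword_init: true)
DynamicParamSketch = Struct.new(:source_path, :cardinality, keyword_init: true)

# Same shape as the `data_params:` expression: one entry per distinct source field.
def data_params_for(fields)
  fields.map(&:source_field).to_h do |source|
    [source, DynamicParamSketch.new(source_path: source, cardinality: :many)]
  end
end

fields = [
  DerivedFieldSketch.new(destination_field: "firstOfferedDate", source_field: "courseStartDate"),
  DerivedFieldSketch.new(destination_field: "mostRecentlyOfferedDate", source_field: "courseStartDate"),
  DerivedFieldSketch.new(destination_field: "name", source_field: "courseName")
]

# Three derived fields, but only two distinct source paths.
puts data_params_for(fields).keys.inspect
```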