Class: ElasticGraph::WarehouseLambda::WarehouseDumper

Inherits:
Object
Defined in:
elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb

Overview

Responsible for dumping data into a data warehouse. Implements the same interface as DatastoreIndexingRouter from elasticgraph-indexer so that it can be used in place of the standard datastore indexing router.
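
As a rough sketch of the interface parity described above (the `dumper` and `operations` variables are assumed to already exist; the indexer wiring that injects the dumper is not shown), the dumper answers the same calls the indexer would otherwise make on its router:

dumper.bulk(operations)                            # writes gzipped JSONL files to S3
dumper.source_event_versions_in_index(operations)  # => {} (no version state to consult)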

Constant Summary

LOG_MSG_RECEIVED_BATCH =

Returns message type for logging when a batch is received.

Returns:

  • (String)

    message type for logging when a batch is received

"WarehouseLambdaReceivedBatch"
LOG_MSG_DUMPED_FILE =

Returns message type for logging when a file is dumped to S3.

Returns:

  • (String)

    message type for logging when a file is dumped to S3

"DumpedToWarehouseFile"

Instance Method Summary

Constructor Details

#initialize(logger:, s3_client:, s3_bucket_name:, s3_file_prefix:, clock:) ⇒ WarehouseDumper

Returns a new instance of WarehouseDumper.



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb', line 28

def initialize(logger:, s3_client:, s3_bucket_name:, s3_file_prefix:, clock:)
  @logger = logger
  @s3_client = s3_client
  @s3_bucket_name = s3_bucket_name
  @s3_file_prefix = s3_file_prefix
  @clock = clock
end
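
For example, construction might look like this (a hedged sketch: the Aws::S3::Client setup and the use of ::Time as the clock are illustrative assumptions, not documented requirements):

require "aws-sdk-s3"
require "logger"

dumper = ElasticGraph::WarehouseLambda::WarehouseDumper.new(
  logger: ::Logger.new($stdout),
  s3_client: ::Aws::S3::Client.new,
  s3_bucket_name: "example-warehouse-bucket",
  s3_file_prefix: "warehouse-events",
  clock: ::Time
)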

Instance Method Details

#bulk(operations, refresh: false) ⇒ BulkResult

Processes a batch of indexing operations by dumping them to S3 as gzipped JSONL files. Operations are grouped by GraphQL type and JSON schema version, with each group written to a separate file.

Parameters:

  • operations (Array<Operation>)

    the indexing operations to process

  • refresh (Boolean) (defaults to: false)

    ignored (included for interface compatibility with DatastoreIndexingRouter)

Returns:

  • (BulkResult)

    result containing success status for all operations



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb', line 42

def bulk(operations, refresh: false)
  operations_by_type_and_json_schema_version = operations.group_by { |op| [op.event.fetch("type"), op.event.fetch(JSON_SCHEMA_VERSION_KEY)] }

  @logger.info({
    "message_type" => LOG_MSG_RECEIVED_BATCH,
    "record_counts_by_type" => operations_by_type_and_json_schema_version.transform_keys { |(type, _json_schema_version)| type }.transform_values(&:size)
  })

  operations_by_type_and_json_schema_version.each do |(type, json_schema_version), operations|
    # Operations coming from the indexer are always Update operations for warehouse dumping
    update_operations = operations # : ::Array[::ElasticGraph::Indexer::Operation::Update]
    jsonl_data = build_jsonl_file_from(update_operations)

    # Skip S3 upload if all operations were filtered out (no data to write)
    next if jsonl_data.empty?

    gzip_data = compress(jsonl_data)
    s3_key = generate_s3_key_for(type, json_schema_version)

    # Use if_none_match: "*" to prevent overwrites (defense-in-depth, though UUIDs make collisions vanishingly unlikely)
    @s3_client.put_object(
      bucket: @s3_bucket_name,
      key: s3_key,
      body: gzip_data,
      checksum_algorithm: :sha256,
      if_none_match: "*"
    )

    @logger.info({
      "message_type" => LOG_MSG_DUMPED_FILE,
      "s3_bucket" => @s3_bucket_name,
      "s3_key" => s3_key,
      "type" => type,
      JSON_SCHEMA_VERSION_KEY => json_schema_version,
      "record_count" => operations.size,
      "json_size" => jsonl_data.bytesize,
      "gzip_size" => gzip_data.bytesize
    })
  end

  ops_and_results = operations.map do |op|
    [op, ::ElasticGraph::Indexer::Operation::Result.success_of(op)]
  end # : ::Array[[::ElasticGraph::Indexer::_Operation, ::ElasticGraph::Indexer::Operation::Result]]

  ::ElasticGraph::Indexer::DatastoreIndexingRouter::BulkResult.new({"warehouse" => ops_and_results})
end
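
A hedged usage sketch (the `operations` array is assumed to come from elasticgraph-indexer's event-to-operation conversion, which is not shown here):

result = dumper.bulk(operations)
# `result` is an ::ElasticGraph::Indexer::DatastoreIndexingRouter::BulkResult;
# every operation is reported as a success under the "warehouse" key.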

#source_event_versions_in_index(operations) ⇒ Hash

Returns existing event versions for the given operations. Always returns an empty hash since the warehouse doesn't maintain version state.

Parameters:

  • operations (Array<Operation>)

    the operations to check (unused)

Returns:

  • (Hash)

    empty hash (warehouse doesn't track versions)



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb', line 94

def source_event_versions_in_index(operations)
  {}
end