Class: ElasticGraph::WarehouseLambda::WarehouseDumper
- Inherits: Object
  - Object
  - ElasticGraph::WarehouseLambda::WarehouseDumper
- Defined in: elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb
Overview
Responsible for dumping data into a data warehouse. Implements the same interface as DatastoreIndexingRouter from
elasticgraph-indexer so that it can be used in place of the standard datastore indexing router.
Constant Summary
- LOG_MSG_RECEIVED_BATCH = "WarehouseLambdaReceivedBatch"
  Message type for logging when a batch is received.
- LOG_MSG_DUMPED_FILE = "DumpedToWarehouseFile"
  Message type for logging when a file is dumped to S3.
Instance Method Summary
- #bulk(operations, refresh: false) ⇒ BulkResult
  Processes a batch of indexing operations by dumping them to S3 as gzipped JSONL files.
- #initialize(logger:, s3_client:, s3_bucket_name:, s3_file_prefix:, clock:) ⇒ WarehouseDumper (constructor)
  A new instance of WarehouseDumper.
- #source_event_versions_in_index(operations) ⇒ Hash
  Returns existing event versions for the given operations.
Constructor Details
#initialize(logger:, s3_client:, s3_bucket_name:, s3_file_prefix:, clock:) ⇒ WarehouseDumper
Returns a new instance of WarehouseDumper.
    # File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb', line 28

    def initialize(logger:, s3_client:, s3_bucket_name:, s3_file_prefix:, clock:)
      @logger = logger
      @s3_client = s3_client
      @s3_bucket_name = s3_bucket_name
      @s3_file_prefix = s3_file_prefix
      @clock = clock
    end
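Because the constructor only stores its collaborators, any object that responds to the calls the dumper makes can be passed as `s3_client:`. The following is a minimal sketch of an in-memory stand-in that accepts the same keyword arguments `#bulk` passes to `put_object`. The class name `InMemoryS3Client` and the error `KeyAlreadyExists` are hypothetical, invented for illustration; they are not part of ElasticGraph or the AWS SDK, and a real client would surface its own service error when a conditional write is rejected.

```ruby
# Hypothetical in-memory stand-in for an S3 client (illustration only).
class InMemoryS3Client
  # Hypothetical error type; a real S3 client raises its own error classes.
  KeyAlreadyExists = Class.new(StandardError)

  # Maps [bucket, key] => body for every stored object.
  attr_reader :objects

  def initialize
    @objects = {}
  end

  # Accepts the keyword arguments that #bulk passes to put_object.
  # checksum_algorithm is accepted but ignored by this stand-in.
  def put_object(bucket:, key:, body:, checksum_algorithm: nil, if_none_match: nil)
    location = [bucket, key]

    # Emulate the conditional write: with if_none_match "*", never overwrite.
    if if_none_match == "*" && @objects.key?(location)
      raise KeyAlreadyExists, "#{bucket}/#{key} already exists"
    end

    @objects[location] = body
    nil
  end
end

client = InMemoryS3Client.new
client.put_object(bucket: "warehouse", key: "example.jsonl.gz", body: "payload", if_none_match: "*")
```

A stand-in like this could be handed to the constructor in a test, so the uploaded keys and bodies can be inspected without network access.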
Instance Method Details
#bulk(operations, refresh: false) ⇒ BulkResult
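Processes a batch of indexing operations by dumping them to S3 as gzipped JSONL files. Operations are grouped by GraphQL type and JSON schema version, with each group written to a separate file. A group that produces no JSONL data is skipped. The refresh: keyword is accepted but not used in the method body, and every operation is reported as successful under the "warehouse" key of the returned BulkResult.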
    # File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb', line 42

    def bulk(operations, refresh: false)
      operations_by_type_and_json_schema_version = operations.group_by { |op| [op.event.fetch("type"), op.event.fetch(JSON_SCHEMA_VERSION_KEY)] }

      @logger.info({
        "message_type" => LOG_MSG_RECEIVED_BATCH,
        "record_counts_by_type" => operations_by_type_and_json_schema_version.transform_keys { |(type, _json_schema_version)| type }.transform_values(&:size)
      })

      operations_by_type_and_json_schema_version.each do |(type, json_schema_version), operations|
        # Operations coming from the indexer are always Update operations for warehouse dumping
        update_operations = operations # : ::Array[::ElasticGraph::Indexer::Operation::Update]
        jsonl_data = build_jsonl_file_from(update_operations)

        # Skip S3 upload if all operations were filtered out (no data to write)
        next if jsonl_data.empty?

        gzip_data = compress(jsonl_data)
        s3_key = generate_s3_key_for(type, json_schema_version)

        # Use if_none_match: "*" to prevent overwrites (defense-in-depth, though UUIDs make collisions impossible)
        @s3_client.put_object(
          bucket: @s3_bucket_name,
          key: s3_key,
          body: gzip_data,
          checksum_algorithm: :sha256,
          if_none_match: "*"
        )

        @logger.info({
          "message_type" => LOG_MSG_DUMPED_FILE,
          "s3_bucket" => @s3_bucket_name,
          "s3_key" => s3_key,
          "type" => type,
          JSON_SCHEMA_VERSION_KEY => json_schema_version,
          "record_count" => operations.size,
          "json_size" => jsonl_data.bytesize,
          "gzip_size" => gzip_data.bytesize
        })
      end

      ops_and_results = operations.map do |op|
        [op, ::ElasticGraph::Indexer::Operation::Result.success_of(op)]
      end # : ::Array[[::ElasticGraph::Indexer::_Operation, ::ElasticGraph::Indexer::Operation::Result]]

      ::ElasticGraph::Indexer::DatastoreIndexingRouter::BulkResult.new({"warehouse" => ops_and_results})
    end
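The group, serialize, and compress steps of `#bulk` can be sketched with the Ruby standard library alone. The private helpers `build_jsonl_file_from`, `compress`, and `generate_s3_key_for` are not shown on this page, so the functions below (`build_jsonl`, `s3_key_for`) are hypothetical stand-ins, and the S3 key layout and the `"json_schema_version"` key name are assumptions made for illustration. Plain hashes stand in for operation events.

```ruby
require "json"
require "securerandom"
require "zlib"

# Hypothetical stand-in: one JSON document per line (JSONL).
def build_jsonl(records)
  records.map { |record| JSON.generate(record) }.join("\n")
end

# Hypothetical key layout; the real layout is not shown on this page.
# A random UUID keeps keys from different batches distinct.
def s3_key_for(prefix, type, version)
  "#{prefix}/#{type}/v#{version}/#{SecureRandom.uuid}.jsonl.gz"
end

# Plain hashes standing in for operation events (sample data, not real output).
events = [
  {"type" => "Widget", "json_schema_version" => 1, "id" => "w1"},
  {"type" => "Widget", "json_schema_version" => 2, "id" => "w2"},
  {"type" => "Part", "json_schema_version" => 1, "id" => "p1"},
  {"type" => "Widget", "json_schema_version" => 1, "id" => "w3"}
]

# Group by [type, schema version], mirroring operations.group_by in #bulk.
groups = events.group_by do |event|
  [event.fetch("type"), event.fetch("json_schema_version")]
end

# Build one gzipped JSONL payload per group, keyed by its S3 key.
files = {}
groups.each do |(type, version), group|
  jsonl = build_jsonl(group)
  next if jsonl.empty? # nothing to write for this group

  files[s3_key_for("dumps", type, version)] = Zlib.gzip(jsonl)
end
```

Each value in `files` can be read back with `Zlib.gunzip` and split on newlines to recover the individual JSON documents, which is a convenient way to check the payload in a test.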
#source_event_versions_in_index(operations) ⇒ Hash
Returns existing event versions for the given operations. Always returns an empty hash since the warehouse doesn't maintain version state.
    # File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb', line 94

    def source_event_versions_in_index(operations)
      {}
    end