Class: ElasticGraph::WarehouseLambda

Inherits:
Object
Defined in:
elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda.rb,
elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/config.rb,
elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/lambda_function.rb,
elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda/warehouse_dumper.rb

Overview

AWS Lambda integration for exporting ElasticGraph indexing data to S3 as gzipped JSONL files. This allows downstream analytics pipelines, data warehouses, and lakehouses to consume ElasticGraph data without querying the primary datastore.
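
To make the output format concrete, here is a minimal sketch of a gzipped JSONL payload built with only Ruby's standard library. The sample records and the helper names `to_gzipped_jsonl` and `from_gzipped_jsonl` are illustrative assumptions and are not part of the gem; the actual encoding is performed by WarehouseDumper and may differ in detail.

```ruby
require "json"
require "zlib"

# Hypothetical helper: writes one JSON document per line (JSONL), then gzips it.
def to_gzipped_jsonl(records)
  jsonl = records.map { |record| JSON.generate(record) + "\n" }.join
  Zlib.gzip(jsonl)
end

# Hypothetical helper: reverses the encoding, as a downstream consumer might.
def from_gzipped_jsonl(bytes)
  Zlib.gunzip(bytes).each_line.map { |line| JSON.parse(line) }
end

# Sample data, invented for illustration.
records = [
  {"id" => "1", "name" => "widget"},
  {"id" => "2", "name" => "gadget"}
]

payload = to_gzipped_jsonl(records)
decoded = from_gzipped_jsonl(payload)
```

Because every line is a complete JSON document, a consumer can process such a file as a stream, one record at a time, without loading the whole payload.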

Defined Under Namespace

Classes: Config, WarehouseDumper

Constructor Details

#initialize(config:, indexer_config:, datastore_core:, clock: ::Time, s3_client: nil) ⇒ WarehouseLambda

Initializes a WarehouseLambda instance.

Parameters:

  • config (Config)

    warehouse configuration

  • indexer_config (Indexer::Config)

    indexer configuration

  • datastore_core (DatastoreCore)

    datastore core for accessing schema artifacts

  • clock (Module) (defaults to: ::Time)

    clock module for time generation

  • s3_client (Aws::S3::Client, nil) (defaults to: nil)

    optional S3 client (for testing)



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda.rb', line 58

def initialize(config:, indexer_config:, datastore_core:, clock: ::Time, s3_client: nil)
  @config = config
  @indexer_config = indexer_config
  @datastore_core = datastore_core
  @logger = datastore_core.logger
  @clock = clock
  @s3_client = s3_client
end
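
The `clock:` and `s3_client:` keyword arguments make it possible to swap in test doubles. The sketch below shows one way such doubles could look. `FixedClock` and `RecordingS3Client` are hypothetical names, and it is an assumption (not confirmed by this page) that only `now` is called on the clock and `put_object` on the client.

```ruby
# Hypothetical test double: a clock that always reports the same instant.
class FixedClock
  def initialize(time)
    @time = time
  end

  def now
    @time
  end
end

# Hypothetical test double: records uploads instead of calling AWS.
class RecordingS3Client
  attr_reader :uploads

  def initialize
    @uploads = []
  end

  # Accepts the common keyword arguments of Aws::S3::Client#put_object;
  # any extra options are accepted but ignored.
  def put_object(bucket:, key:, body:, **options)
    @uploads << {bucket: bucket, key: key, body: body}
    nil
  end
end

clock = FixedClock.new(Time.utc(2024, 1, 1, 12, 0, 0))
s3_client = RecordingS3Client.new

# With the gem loaded, the doubles would be passed to the constructor:
#   ElasticGraph::WarehouseLambda.new(
#     config: config, indexer_config: indexer_config,
#     datastore_core: datastore_core, clock: clock, s3_client: s3_client
#   )

# Exercise the double directly to show what it records (values are made up).
s3_client.put_object(bucket: "example-bucket", key: "dumps/a.jsonl.gz", body: "payload")
```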

Instance Attribute Details

#clock ⇒ Module (readonly)

Returns:

  • (Module)

    clock module for time generation



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda.rb', line 36

def clock
  @clock
end

#config ⇒ Config (readonly)

Returns:

  • (Config)

    warehouse configuration



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda.rb', line 36

def config
  @config
end

#datastore_core ⇒ DatastoreCore (readonly)

Returns:

  • (DatastoreCore)

    datastore core for accessing schema artifacts



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda.rb', line 36

def datastore_core
  @datastore_core
end

#indexer_config ⇒ Indexer::Config (readonly)

Returns:

  • (Indexer::Config)

    indexer configuration



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda.rb', line 36

def indexer_config
  @indexer_config
end

#logger ⇒ Logger (readonly)

Returns:

  • (Logger)

    logger instance from datastore core



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda.rb', line 36

def logger
  @logger
end

Class Method Details

.from_parsed_yaml(parsed_yaml) {|Datastore::Client| ... } ⇒ WarehouseLambda

Builds an ElasticGraph::WarehouseLambda instance from parsed YAML configuration.

Parameters:

  • parsed_yaml (Hash)

    parsed YAML configuration

Yields:

  • (Datastore::Client)

    optional block to customize the datastore client

Returns:

  • (WarehouseLambda)



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda.rb', line 43

def self.from_parsed_yaml(parsed_yaml, &datastore_client_customization_block)
  new(
    config: Config.from_parsed_yaml!(parsed_yaml),
    indexer_config: Indexer::Config.from_parsed_yaml(parsed_yaml) || Indexer::Config.new,
    datastore_core: DatastoreCore.from_parsed_yaml(parsed_yaml, &datastore_client_customization_block)
  )
end
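
As the method body shows, the same parsed hash is handed to each of the three config builders, so a single YAML document carries all settings. The sketch below only illustrates producing such a hash with the standard library. The top-level `warehouse` key is an assumption, as is the idea that the nested keys match the Config readers seen on this page (`s3_bucket_name`, `s3_path_prefix`, `aws_region`); a real settings file would also need whatever sections the indexer and datastore core expect.

```ruby
require "yaml"

# Hypothetical settings file. The `warehouse` key and all values are invented
# for illustration; consult the Config documentation for the real layout.
yaml_source = <<~YAML
  warehouse:
    s3_bucket_name: example-warehouse-bucket
    s3_path_prefix: dumps
    aws_region: us-west-2
YAML

parsed_yaml = YAML.safe_load(yaml_source)

# With the gem loaded, the hash would then be handed to the factory:
#   warehouse_lambda = ElasticGraph::WarehouseLambda.from_parsed_yaml(parsed_yaml)
warehouse_settings = parsed_yaml.fetch("warehouse")
```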

.warehouse_lambda_from_env ⇒ Object

Builds an ElasticGraph::WarehouseLambda instance from the Lambda function's environment variables.



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda.rb', line 26

def self.warehouse_lambda_from_env
  LambdaSupport.build_from_env(WarehouseLambda)
end

Instance Method Details

#indexer ⇒ Indexer

Returns the indexer instance, lazily building it on first access.

Returns:

  • (Indexer)

    the indexer that processes events



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda.rb', line 77

def indexer
  @indexer ||= begin
    require "elastic_graph/indexer"
    Indexer.new(
      config: indexer_config,
      datastore_core: datastore_core,
      datastore_router: warehouse_dumper,
      clock: clock
    )
  end
end

#processor ⇒ Processor

Returns the processor from the indexer for event processing.

Returns:

  • (Processor)

    the processor that handles incoming events



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda.rb', line 70

def processor
  indexer.processor
end

#s3_client ⇒ Aws::S3::Client

Returns the S3 client instance, lazily building it on first access.

Returns:

  • (Aws::S3::Client)

    the S3 client for uploading data



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda.rb', line 108

def s3_client
  @s3_client ||= begin
    require "aws-sdk-s3"

    if (region = config.aws_region)
      ::Aws::S3::Client.new(region: region)
    else
      ::Aws::S3::Client.new
    end
  end
end
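
The `||=` memoization above has two effects: the client is built at most once, and a client injected through the constructor is never replaced. The following sketch reproduces that pattern in isolation. `LazyClientHolder` is a hypothetical stand-in, and a plain `Object.new` takes the place of `::Aws::S3::Client.new` so the example runs without the AWS SDK.

```ruby
# Hypothetical stand-in that mirrors the memoization pattern of #s3_client.
class LazyClientHolder
  attr_reader :build_count

  def initialize(client: nil)
    @client = client
    @build_count = 0
  end

  # Builds the client on first access only; later calls reuse the same object.
  def client
    @client ||= begin
      @build_count += 1
      Object.new # stands in for ::Aws::S3::Client.new
    end
  end
end

lazy = LazyClientHolder.new
first = lazy.client
second = lazy.client

# An injected client short-circuits the builder entirely.
injected = Object.new
preset = LazyClientHolder.new(client: injected)
preset_client = preset.client
```

Deferring the `require` and the construction this way keeps the cost of loading the SDK out of code paths that never touch S3, and it lets tests supply their own client.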

#warehouse_dumper ⇒ WarehouseDumper

Returns the warehouse dumper instance, lazily building it on first access.

Returns:

  • (WarehouseDumper)



# File 'elasticgraph-warehouse_lambda/lib/elastic_graph/warehouse_lambda.rb', line 92

def warehouse_dumper
  @warehouse_dumper ||= begin
    require "elastic_graph/warehouse_lambda/warehouse_dumper"
    WarehouseDumper.new(
      logger: logger,
      s3_client: s3_client,
      s3_bucket_name: config.s3_bucket_name,
      s3_file_prefix: config.s3_path_prefix,
      clock: clock
    )
  end
end