Skip to content

Instantly share code, notes, and snippets.

@jturkel
Created May 30, 2018 01:00
Show Gist options
  • Save jturkel/26ac463dcda3051dc712b890b9d5cb3e to your computer and use it in GitHub Desktop.
Save jturkel/26ac463dcda3051dc712b890b9d5cb3e to your computer and use it in GitHub Desktop.
# Class API that instantiates a new `Overstock::Connector::BatchHandler` for each run of the pipeline
# There would be some registry of handlers that maps 'Overstock Supplier API' to Overstock::Connector::BatchHandler.
# This hides any classes involved in splitting, validating, etc. giving subclasses more freedom to implement behavior
# inline.
# Note method names indicate the action to be performed.
module Overstock
module Connector
class BatchHandler < Base::Connector::BaseBatchHandler
PIPELINE_NAME = 'Overstock Supplier API'.freeze
JSON_SCHEMA_PATH = 'resources/schemas/overstock/overstock-schema-2017-06-23.json'.freeze
S3_BUCKET = 'salsify-ce'.freeze
CONNECTOR_FOLDER = 'overstock'.freeze
JSON_HEADERS = {
'Accept' => 'application/json',
'Content-Type' => 'application/json'
}.freeze
def initialize(payload, channel)
super(payload, channel, connector_folder: CONNECTOR_FOLDER, bucket: S3_BUCKET)
end
def split_products(products)
PipelineStages::OverstockHierarchySplitter.new.process(products)
end
def validate_business_rules(product)
Base::Connector::PipelineStages::BusinessRulesValidator.new('resources/business_rules/overstock.yml', 'overstock').process(product)
end
def serialize(products)
PipelineStages::OverstockProductSerializer.new(
JSON_SCHEMA_PATH
).process(products)
end
def validate_product(product)
Base::Connector::PipelineStages::JsonSchemaValidator.new(
JSON_SCHEMA_PATH,
:per_product
).process(product)
end
def batch(products)
Base::Connector::PipelineStages::ChunkBatcher.new(
500
).process(products)
end
def validate_batch(products)
Base::Connector::PipelineStages::JsonSchemaValidator.new(
JSON_SCHEMA_PATH,
:batch
).process(products)
end
def generate_request(products)
Base::Connector::PipelineStages::HttpPostRequestGenerator.new(
Rails.configuration.overstock.endpoint,
static_headers: JSON_HEADERS,
auth_hook: Base::Connector::PipelineStages::OauthAuthorization.new(access_token),
proxy_url: Rails.configuration.overstock.proxy_url
).process(products)
end
def post_request(request)
Base::Connector::PipelineStages::HttpRequestPoster.new.process(request)
end
def response_parser(response)
PipelineStages::OverstockFeedbackParser.new.process(response)
end
def set_expectations
Base::Connector::PipelineStages::SimpleExpectationSetter.new(
:async,
DateTime.current + 1.day
)
end
private
def access_token
payload.decrypt_setting('Access Token', required: true)
end
end
end
end
# Class API that instantiates a new `Overstock::Connector::BatchHandler` for each run of the pipeline
# There would be some registry of handlers that maps 'Overstock Supplier API' to Overstock::Connector::BatchHandler.
# Note method names indicate an object will be return that performs the real processing
module Overstock
module Connector
class BatchHandler < Base::Connector::BaseBatchHandler
PIPELINE_NAME = 'Overstock Supplier API'.freeze
JSON_SCHEMA_PATH = 'resources/schemas/overstock/overstock-schema-2017-06-23.json'.freeze
S3_BUCKET = 'salsify-ce'.freeze
CONNECTOR_FOLDER = 'overstock'.freeze
JSON_HEADERS = {
'Accept' => 'application/json',
'Content-Type' => 'application/json'
}.freeze
def initialize(payload, channel)
super(payload, channel, connector_folder: CONNECTOR_FOLDER, bucket: S3_BUCKET)
end
def individual_product_splitter
PipelineStages::OverstockHierarchySplitter.new
end
def business_rule_validator
Base::Connector::PipelineStages::BusinessRulesValidator.new('resources/business_rules/overstock.yml', 'overstock')
end
def serializer
PipelineStages::OverstockProductSerializer.new(
JSON_SCHEMA_PATH
)
end
def product_validator
Base::Connector::PipelineStages::JsonSchemaValidator.new(
JSON_SCHEMA_PATH,
:per_product
)
end
def batcher
Base::Connector::PipelineStages::ChunkBatcher.new(
500
)
end
def batch_validator
Base::Connector::PipelineStages::JsonSchemaValidator.new(
JSON_SCHEMA_PATH,
:batch
)
end
def request_generator
Base::Connector::PipelineStages::HttpPostRequestGenerator.new(
Rails.configuration.overstock.endpoint,
static_headers: JSON_HEADERS,
auth_hook: Base::Connector::PipelineStages::OauthAuthorization.new(access_token),
proxy_url: Rails.configuration.overstock.proxy_url
)
end
def request_poster
Base::Connector::PipelineStages::HttpRequestPoster.new
end
def response_parser
PipelineStages::OverstockFeedbackParser.new
end
def expectation_setter
Base::Connector::PipelineStages::SimpleExpectationSetter.new(
:async,
DateTime.current + 1.day
)
end
private
def access_token
payload.decrypt_setting('Access Token', required: true)
end
end
end
end
# DSL API that defines a class under the covers much like rspec. A new instance of the class is
# created for each execution of the pipeline and any blocks are instance_eval'ed in the context
# of that instance
Base::Connector::BatchProcessor.register('Overstock Supplier API') do
JSON_SCHEMA_PATH = 'resources/schemas/overstock/overstock-schema-2017-06-23.json'.freeze
# Register any static configuration using non-block DSL calls
connector_folder 'overstock'
individual_products do
PipelineStages::OverstockHierarchySplitter.new
end
business_rule_validation do
Base::Connector::PipelineStages::BusinessRulesValidator.new('resources/business_rules/overstock.yml', 'overstock')
end
serialize do
PipelineStages::OverstockProductSerializer.new(
JSON_SCHEMA_PATH
)
end
product_validation do
Base::Connector::PipelineStages::JsonSchemaValidator.new(
JSON_SCHEMA_PATH,
:per_product
)
end
batch do
Base::Connector::PipelineStages::ChunkBatcher.new(
500
)
end
batch_validation do
Base::Connector::PipelineStages::JsonSchemaValidator.new(
JSON_SCHEMA_PATH,
:batch
)
)
generate_request do
Base::Connector::PipelineStages::HttpPostRequestGenerator.new(
Rails.configuration.overstock.endpoint,
static_headers: {
'Accept' => 'application/json',
'Content-Type' => 'application/json'
},
auth_hook: Base::Connector::PipelineStages::OauthAuthorization.new(access_token),
proxy_url: Rails.configuration.overstock.proxy_url
)
end
post_request do
Base::Connector::PipelineStages::HttpRequestPoster.new
end
parse_response do
PipelineStages::OverstockFeedbackParser.new
end
set_expectation do
Base::Connector::PipelineStages::SimpleExpectationSetter.new(
:async,
DateTime.current + 1.day
)
end
# The DSL is defining a class so we can define regular methods too
def access_token
# payload is accessible via an instance method
payload.decrypt_setting('Access Token', required: true)
end
end
module Overstock
module Connector
class BatchHandler < Base::Connector::BaseBatchHandler
PIPELINE_NAME = 'Overstock Supplier API'.freeze
JSON_SCHEMA_PATH = 'resources/schemas/overstock/overstock-schema-2017-06-23.json'.freeze
S3_BUCKET = 'salsify-ce'.freeze
CONNECTOR_FOLDER = 'overstock'.freeze
JSON_HEADERS = {
'Accept' => 'application/json',
'Content-Type' => 'application/json'
}.freeze
class BatchValidationError < StandardError; end
attr_reader :configuration
def initialize(payload, channel)
@configuration = Base::Connector::Configuration.new(S3_BUCKET, CONNECTOR_FOLDER)
super(Base::Connector::PayloadLoader.new(payload), @configuration, channel)
end
def create_factory(payload)
Base::Connector::BatchProcessorFactory.create(self, PIPELINE_NAME, configuration, payload) do
individual_products(
PipelineStages::OverstockHierarchySplitter.new
)
business_rule_validation(
Base::Connector::PipelineStages::BusinessRulesValidator.new('resources/business_rules/overstock.yml', 'overstock')
)
serialize(
PipelineStages::OverstockProductSerializer.new(
JSON_SCHEMA_PATH
)
)
product_validation(
Base::Connector::PipelineStages::JsonSchemaValidator.new(
JSON_SCHEMA_PATH,
:per_product
)
)
batch(
Base::Connector::PipelineStages::ChunkBatcher.new(
500
)
)
batch_validation(
Base::Connector::PipelineStages::JsonSchemaValidator.new(
JSON_SCHEMA_PATH,
:batch
)
)
generate_request(
Base::Connector::PipelineStages::HttpPostRequestGenerator.new(
Rails.configuration.overstock.endpoint,
static_headers: JSON_HEADERS,
auth_hook: Base::Connector::PipelineStages::OauthAuthorization.new(access_token),
proxy_url: Rails.configuration.overstock.proxy_url
)
)
post_request(
Base::Connector::PipelineStages::HttpRequestPoster.new
)
parse_response(
PipelineStages::OverstockFeedbackParser.new
)
set_expectation(
Base::Connector::PipelineStages::SimpleExpectationSetter.new(
:async,
DateTime.current + 1.day
)
)
end
end
def access_token
payload.decrypt_setting('Access Token', required: true)
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment