Created
March 27, 2014 21:51
-
-
Save amedina/9819834 to your computer and use it in GitHub Desktop.
QueryStatsBolt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.adsquerylistener.storm; | |
import java.util.Map; | |
import java.util.concurrent.TimeUnit; | |
import javax.annotation.concurrent.NotThreadSafe; | |
import com.google.common.annotations.VisibleForTesting; | |
import com.google.common.base.Optional; | |
import com.google.common.cache.Cache; | |
import com.google.common.cache.CacheBuilder; | |
import com.google.common.collect.ImmutableList; | |
import com.google.inject.Guice; | |
import com.google.inject.Inject; | |
import com.google.inject.Injector; | |
import com.google.inject.name.Named; | |
import net.lag.configgy.Config; | |
import org.slf4j.Logger; | |
import com.twitter.ads.adsquerylistener.DeployMode; | |
import com.twitter.ads.adsquerylistener.StatsConstants; | |
import com.twitter.ads.adsquerylistener.config.ConfigConstants; | |
import com.twitter.ads.adsquerylistener.humaneval.HumanEvalDispatcher; | |
import com.twitter.ads.adsquerylistener.topqueries.QueryStats; | |
import com.twitter.ads.adsquerylistener.topqueries.TrendingQueryDecider; | |
import com.twitter.ads.common.guice.Conf; | |
import com.twitter.ads.common.logging.LoggerFactory; | |
import com.twitter.ads.human_eval.HumanEvalException; | |
import backtype.storm.metric.api.AssignableMetric; | |
import backtype.storm.metric.api.CountMetric; | |
import backtype.storm.task.OutputCollector; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.topology.base.BaseRichBolt; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Tuple; | |
/** | |
* Handles logic to select queries to be sent for human eval. | |
*/ | |
@NotThreadSafe | |
public class QueryStatsBolt extends BaseRichBolt { | |
private static final Logger LOG = LoggerFactory.get(); | |
private static final Object WORKER_LOCK = new Object(); | |
private static HumanEvalDispatcher humanEvalDispatcher = null; | |
private static Injector injector; | |
private OutputCollector outputCollector; | |
private Cache<String, QueryStats> queryStatsCache; | |
private TrendingQueryDecider trendingQueryDecider; | |
private long halfLifeSeconds = 0L; | |
private String configString; | |
private DeployMode deployMode; | |
// Stats reporting-related members | |
private CountMetric numQueriesStats; | |
private AssignableMetric queryStatsCacheSizeStats; | |
/** | |
* Constructor to build a top queries bolt. | |
*/ | |
@Inject | |
public QueryStatsBolt( | |
@Conf(ConfigConstants.QUERY_CACHE_NUM_KEYS) int cacheNumKeys, | |
@Conf(ConfigConstants.QUERY_CACHE_EXPIRATION_MINS) int cacheExpirationMinutes, | |
@Conf(ConfigConstants.QUERY_CACHE_HALF_LIFE_SECS) long halfLifeSeconds, | |
final TrendingQueryDecider trendingQueryDecider, | |
DeployMode deployMode, | |
@Named(ConfigConstants.CONF_STRING) String configString) { | |
queryStatsCache = CacheBuilder.newBuilder() | |
.maximumSize(cacheNumKeys) | |
.expireAfterAccess(cacheExpirationMinutes, TimeUnit.MINUTES) | |
.build(); | |
this.configString = configString; | |
this.trendingQueryDecider = trendingQueryDecider; | |
this.halfLifeSeconds = halfLifeSeconds; | |
this.deployMode = deployMode; | |
} | |
/** | |
* Query stats bolt for testing purpose. | |
*/ | |
@VisibleForTesting | |
public QueryStatsBolt(int cacheNumKeys, int cacheExpiration, long halfLifeSecs, | |
TrendingQueryDecider trendingQueryDecider, | |
HumanEvalDispatcher dispatcher, OutputCollector collector) { | |
queryStatsCache = CacheBuilder.newBuilder() | |
.maximumSize(cacheNumKeys) | |
.expireAfterAccess(cacheExpiration, TimeUnit.MINUTES) | |
.build(); | |
this.trendingQueryDecider = trendingQueryDecider; | |
this.halfLifeSeconds = halfLifeSecs; | |
this.humanEvalDispatcher = dispatcher; | |
this.outputCollector = collector; | |
} | |
@VisibleForTesting | |
protected void initStats(TopologyContext topologyContext) { | |
this.numQueriesStats = (CountMetric) topologyContext.registerMetric( | |
StatsConstants.NUM_QUERIES, | |
new CountMetric(), | |
StatsConstants.SECONDS_TO_POLL); | |
this.queryStatsCacheSizeStats = (AssignableMetric) topologyContext.registerMetric( | |
StatsConstants.QUERY_BOLT_STATS_CACHE_SIZE, | |
new AssignableMetric(queryStatsCache.size()), | |
StatsConstants.SECONDS_TO_POLL); | |
} | |
@Override | |
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) { | |
this.outputCollector = collector; | |
initStats(topologyContext); | |
// We want to build objects and their dependency tree on the storm cluster. If we | |
// don't delay this guice injection, storm will require us to serialize every class | |
// that is being injected. | |
synchronized (WORKER_LOCK) { | |
if (null == injector) { | |
injector = Guice.createInjector( | |
deployMode.getGuiceModule(Config.fromString(configString))); | |
humanEvalDispatcher = injector.getInstance(HumanEvalDispatcher.class); | |
humanEvalDispatcher.initialize(topologyContext); | |
} | |
} | |
} | |
// Note this is not thread-safe: If 2 threads provide the same query, | |
// this code may behave incorrectly. | |
@Override | |
public void execute(Tuple tuple) { | |
String query = (String) tuple.getValue(0); | |
int auctionDepth = ((Integer) tuple.getValue(1)).intValue(); | |
long currentTimeInMillis = System.currentTimeMillis(); | |
// Increment stats for number of queries observed and query stats cache size. | |
numQueriesStats.incr(); | |
queryStatsCacheSizeStats.setValue(queryStatsCache.size()); | |
// Insert into cache if not present | |
QueryStats queryStats = queryStatsCache.getIfPresent(query); | |
if (null == queryStats) { | |
queryStats = new QueryStats(); | |
} | |
queryStats.updateStats(1, auctionDepth, currentTimeInMillis, halfLifeSeconds); | |
queryStatsCache.put(query, queryStats); | |
if (trendingQueryDecider.isTrendingQuery(queryStats)) { | |
try { | |
humanEvalDispatcher.dispatchQuery(query, queryStats.getRequests()); | |
} catch (HumanEvalException e) { | |
LOG.warn("Error when dispatching query."); | |
} | |
outputCollector.emit(ImmutableList.<Object>of(query)); | |
} | |
outputCollector.ack(tuple); | |
} | |
public Optional<QueryStats> getQueryStatsForQuery(String query) { | |
return Optional.fromNullable(queryStatsCache.getIfPresent(query)); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields(StormConstants.QUERY_FIELD)); | |
} | |
public long getQueryStatsCacheSize() { | |
return queryStatsCache.size(); | |
} | |
} |
Hey, this looks nice.
markdav, have you figured out how to implement the topology and the Guice module creation in prepare()?
Regards
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey, cool stuff! Do you have an example of the topology that calls this and builds the modules, or of the DeployMode class? I am trying to do something similar, creating the modules in prepare(), but I'm seeing issues when it is deployed to a cluster.