Created
November 18, 2013 18:02
-
-
Save amedina/7532402 to your computer and use it in GitHub Desktop.
Storm: Light xprollup RT
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt; | |
import java.util.List; | |
import org.apache.thrift.TDeserializer; | |
import org.apache.thrift.protocol.TBinaryProtocol; | |
import com.twitter.ads.logging.AdEngagementLogEntry; | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTEngKey; | |
import backtype.storm.spout.Scheme; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Values; | |
public class AdEngagementScheme implements Scheme { | |
/** | |
* Thrift deserializer for AdEngagementLogEntry | |
* @param bytes | |
* @return | |
*/ | |
public List<Object> deserialize(byte[] bytes) { | |
Values tuple = null; | |
try { | |
TDeserializer thriftDeserializer = new TDeserializer(new TBinaryProtocol.Factory()); | |
AdEngagementLogEntry engagement = new AdEngagementLogEntry(); | |
thriftDeserializer.deserialize(engagement, bytes); | |
ExpRollupRTEngKey engKey = ExpRollupRTUtil.createEngagementKey(engagement); | |
return new Values(engKey); | |
} catch (Exception e) { | |
System.err.println("Got exception while dealing with a message"); | |
} | |
return tuple; | |
} | |
public Fields getOutputFields() { | |
return new Fields("engKey"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt; | |
import java.util.List; | |
import org.apache.thrift.TDeserializer; | |
import org.apache.thrift.protocol.TBinaryProtocol; | |
import com.twitter.ads.eventstream.ImpressionCallbackEvent; | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTImpCallBackKey; | |
import backtype.storm.spout.Scheme; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Values; | |
public class AdImpressionScheme implements Scheme { | |
/** | |
* Thrift deserializer for ImpressionCallbackEvent | |
* @param bytes | |
* @return Tuple | |
*/ | |
public List<Object> deserialize(byte[] bytes) { | |
Values tuple = null; | |
try { | |
TDeserializer thriftDeserializer = new TDeserializer(new TBinaryProtocol.Factory()); | |
ImpressionCallbackEvent impression = new ImpressionCallbackEvent(); | |
thriftDeserializer.deserialize(impression, bytes); | |
ExpRollupRTImpCallBackKey impKey = | |
ExpRollupRTUtil.createImpressionCallbackKey(impression); | |
tuple = new Values(impKey); | |
} catch (Exception e) { | |
System.err.println("Got exception while dealing with a message"); | |
} | |
return tuple; | |
} | |
public Fields getOutputFields() { | |
return new Fields("impKey"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt; | |
import java.util.List; | |
import org.apache.thrift.TDeserializer; | |
import org.apache.thrift.protocol.TBinaryProtocol; | |
import com.twitter.ads.logging.AdShardRequestLogEntry; | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTReqKey; | |
import backtype.storm.spout.Scheme; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Values; | |
public class AdRequestScheme implements Scheme { | |
/** | |
* Thrift deserializer for AdShardRequestLogEntry | |
* @param bytes | |
* @return Tuple | |
*/ | |
public List<Object> deserialize(byte[] bytes) { | |
Values tuple = null; | |
try { | |
TDeserializer thriftDeserializer = new TDeserializer(new TBinaryProtocol.Factory()); | |
AdShardRequestLogEntry request = new AdShardRequestLogEntry(); | |
thriftDeserializer.deserialize(request, bytes); | |
ExpRollupRTReqKey reqKey = | |
ExpRollupRTUtil.createRequestKey(request.getAdRequestLogEntry()); | |
tuple = new Values(reqKey); | |
} catch (Exception e) { | |
System.err.println("Got exception while dealing with a message"); | |
} | |
return tuple; | |
} | |
public Fields getOutputFields() { | |
return new Fields("reqKey"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Library target: every *.java file in this directory plus the Storm, Thrift, MySQL and
# args dependencies those sources import.
java_library(name = 'exprolluprttopologylib',
  dependencies = [
    pants('3rdparty/storm'),
    pants('3rdparty/storm:storm-kafka'),
    pants('3rdparty/storm:storm-kestrel'),
    pants('3rdparty:thrift'),
    pants('3rdparty:util-thrift'),
    pants('3rdparty:finagle-mysql'),
    pants('src/thrift/com/twitter/ads/eventstream:eventstream'),
    pants('src/thrift/com/twitter/ads/adserver:adserver_log'),
    pants('src/thrift/com/twitter/ads/realtime/exprolluprt'),
    pants('src/java/com/twitter/common/args'),
  ],
  sources = globs("*.java"),
)

# Deployable binary for the topology.
# `main` must name the package the sources actually declare
# (com.twitter.ads.batch.experimental.amedina.storm.exprolluprt); the previous value pointed
# at com.twitter.storm.samples.exprolluprt, which none of these sources define.
jvm_binary(name = 'exprolluprttopology',
  main = 'com.twitter.ads.batch.experimental.amedina.storm.exprolluprt.ExpRollupRTTopology',
  dependencies = [
    pants(':exprolluprttopologylib')
  ],
  # Left out of the deploy jar - presumably supplied by the Storm runtime; confirm before
  # changing this list.
  deploy_excludes = [
    exclude('storm', 'storm'),
    exclude('log4j', 'log4j'),
    exclude('org.slf4j', 'slf4j-api'),
    exclude('org.slf4j', 'slf4j-jdk14'),
    exclude('org.slf4j', 'slf4j-log4j12'),
  ],
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt; | |
import java.net.UnknownHostException; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import java.util.Arrays; | |
import java.util.Date; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Map.Entry; | |
import java.util.Random; | |
import java.util.TimeZone; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.logging.Level; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
// Enable this when deserializing | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTEngKey; | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTImpCallBackKey; | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTReqKey; | |
import com.twitter.common.args.Arg; | |
import com.twitter.common.args.ArgScanner; | |
import com.twitter.common.args.CmdLine; | |
import com.twitter.finagle.exp.mysql.Client; | |
import com.twitter.finagle.stats.NullStatsReceiver; | |
import backtype.storm.Config; | |
import backtype.storm.StormSubmitter; | |
import backtype.storm.generated.AlreadyAliveException; | |
import backtype.storm.generated.InvalidTopologyException; | |
import backtype.storm.spout.KestrelThriftSpout; | |
import backtype.storm.spout.SchemeAsMultiScheme; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.topology.BasicOutputCollector; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.topology.TopologyBuilder; | |
import backtype.storm.topology.base.BaseBasicBolt; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Tuple; | |
import storm.kafka.KafkaConfig; | |
import storm.kafka.KafkaSpout; | |
import storm.kafka.SpoutConfig; | |
public final class ExpRollupRTTopology { | |
// Can't be instantiated | |
private ExpRollupRTTopology() { | |
} | |
private static int writeBatch = 100; | |
private static int writePeriod = 15 * 60 * 1000; | |
@CmdLine(name = "topology_name", help = "Experiment Rollup Real-time") | |
public static final Arg<String> TOPOLOGY_NAME = Arg.create("light_exprollup_rt"); | |
//Arg.create(System.getProperty("user.name") + "_expRollupRT_STORM_GG"); | |
@CmdLine(name = "max_spout_pending", help = "Max number of unacked tuples from the spout") | |
public static final Arg<Integer> MAX_SPOUT_PENDING = Arg.create(10); | |
@CmdLine(name = "spout0_parallelism", help = "Parallelism hint for spout0") | |
public static final Arg<Integer> SPOUT0_PARALLELISM = Arg.create(10); | |
@CmdLine(name = "stdout_parallelism", help = "Parallelism hint for stdout bolt") | |
public static final Arg<Integer> STDOUT_PARALLELISM = Arg.create(10); | |
@CmdLine(name = "number_workers", help = "Number of workers in the topology") | |
public static final Arg<Integer> NUMBER_WORKERS = Arg.create(3); | |
@CmdLine(name = "number_ackers", help = "Number of ackers in the topology") | |
public static final Arg<Integer> NUMBER_ACKERS = Arg.create(1); | |
@CmdLine(name = "team_name", help = "Team which owns this topology") | |
public static final Arg<String> TEAM_NAME = Arg.create("Ads"); | |
@CmdLine(name = "team_email", help = "Team Email which owns this topology") | |
public static final Arg<String> TEAM_EMAIL = Arg.create("amedina@twitter.com"); | |
@CmdLine(name = "project_name", help = "Topology part of which project/feature work") | |
public static final Arg<String> PROJECT_NAME = Arg.create("Experiments DashBoard RT"); | |
/**
 * Hard-coded endpoints for the topology's message sources.
 * NOTE(review): the KAFKA_* values are not referenced by the spout builders visible in this
 * file, which pass their own literals - confirm which set is authoritative.
 */
private static final class Constants { | |
/** Zookeeper for discovering kafka brokers */ | |
public static final String KAFKA_BROKER_ZK_HOSTPORT = "szookeeper.smf1.twitter.com:2181"; | |
/** Path to kafka brokers in zookeeper */ | |
public static final String KAFKA_BROKER_ZK_PATH = "/"; | |
/** Zookeeper root */ | |
public static final String KAFKA_CONSUMER_ZK_PATH = "/kafkaconsumer"; | |
/** Kestrel hosts handed to the KestrelThriftSpout that reads "ad_impressions_callback" */ | |
public static final List<String> KESTREL_HOST_LIST = | |
Arrays.asList( | |
"smf1-ajo-04-sr1.prod.twitter.com", | |
"smf1-ajt-03-sr1.prod.twitter.com", | |
"smf1-aju-03-sr1.prod.twitter.com", | |
"smf1-ajv-03-sr1.prod.twitter.com", | |
"smf1-ajw-04-sr1.prod.twitter.com", | |
"smf1-ajy-04-sr1.prod.twitter.com", | |
"smf1-ajz-04-sr1.prod.twitter.com", | |
"smf1-akp-04-sr1.prod.twitter.com", | |
"smf1-akq-04-sr1.prod.twitter.com", | |
"smf1-akr-04-sr1.prod.twitter.com", | |
"smf1-aks-04-sr1.prod.twitter.com", | |
"smf1-akt-04-sr1.prod.twitter.com", | |
"smf1-akv-04-sr1.prod.twitter.com", | |
"smf1-akw-04-sr1.prod.twitter.com", | |
"smf1-akx-03-sr1.prod.twitter.com", | |
"smf1-aky-04-sr1.prod.twitter.com", | |
"smf1-akz-04-sr1.prod.twitter.com", | |
"smf1-ala-03-sr1.prod.twitter.com", | |
"smf1-ala-04-sr1.prod.twitter.com", | |
"smf1-alb-03-sr1.prod.twitter.com", | |
"smf1-alc-03-sr1.prod.twitter.com", | |
"smf1-alc-04-sr1.prod.twitter.com", | |
"smf1-ald-03-sr1.prod.twitter.com", | |
"smf1-ald-04-sr1.prod.twitter.com", | |
"smf1-ale-01-sr1.prod.twitter.com", | |
"smf1-alf-30-sr1.prod.twitter.com", | |
"smf1-alg-03-sr1.prod.twitter.com", | |
"smf1-alg-04-sr1.prod.twitter.com", | |
"smf1-alh-03-sr1.prod.twitter.com", | |
"smf1-ali-15-sr1.prod.twitter.com", | |
"smf1-ali-30-sr1.prod.twitter.com", | |
"smf1-alk-04-sr1.prod.twitter.com", | |
"smf1-all-03-sr1.prod.twitter.com", | |
"smf1-alm-04-sr1.prod.twitter.com", | |
"smf1-aln-01-sr1.prod.twitter.com", | |
"smf1-alp-03-sr1.prod.twitter.com", | |
"smf1-alp-04-sr1.prod.twitter.com", | |
"smf1-alq-03-sr1.prod.twitter.com", | |
"smf1-alq-04-sr1.prod.twitter.com", | |
"smf1-alr-03-sr1.prod.twitter.com", | |
"smf1-alr-04-sr1.prod.twitter.com", | |
"smf1-als-03-sr1.prod.twitter.com", | |
"smf1-alt-30-sr1.prod.twitter.com", | |
"smf1-alu-01-sr1.prod.twitter.com", | |
"smf1-alv-15-sr1.prod.twitter.com", | |
"smf1-alw-01-sr1.prod.twitter.com", | |
"smf1-alx-30-sr1.prod.twitter.com", | |
"smf1-aly-03-sr1.prod.twitter.com", | |
"smf1-alz-01-sr1.prod.twitter.com", | |
"smf1-alz-15-sr1.prod.twitter.com", | |
"smf1-anm-12-sr1.prod.twitter.com", | |
"smf1-ano-11-sr1.prod.twitter.com", | |
"smf1-anp-12-sr1.prod.twitter.com", | |
"smf1-anq-38-sr1.prod.twitter.com", | |
"smf1-ans-13-sr1.prod.twitter.com", | |
"smf1-aws-24-sr1.prod.twitter.com", | |
"smf1-awu-11-sr1.prod.twitter.com", | |
"smf1-awv-38-sr1.prod.twitter.com", | |
"smf1-aww-38-sr1.prod.twitter.com", | |
"smf1-axh-24-sr1.prod.twitter.com", | |
"smf1-axi-40-sr1.prod.twitter.com", | |
"smf1-axk-09-sr1.prod.twitter.com", | |
"smf1-axt-14-sr1.prod.twitter.com", | |
"smf1-axu-20-sr1.prod.twitter.com", | |
"smf1-axv-12-sr1.prod.twitter.com", | |
"smf1-axv-21-sr1.prod.twitter.com", | |
"smf1-axv-30-sr1.prod.twitter.com", | |
"smf1-axv-35-sr1.prod.twitter.com" | |
); | |
/** Port passed to the KestrelThriftSpout together with KESTREL_HOST_LIST */ | |
public static final int KESTREL_PORT = 2229; | |
// Constant holder only; never instantiated.
private Constants() { | |
} | |
} | |
private static class ProcessRequestsBolt extends BaseBasicBolt { | |
private static final Logger LOG = LoggerFactory.getLogger(ProcessRequestsBolt.class.getName()); | |
private Map<ExpRollupRTReqKey, Long> requestsMap | |
= new ConcurrentHashMap<ExpRollupRTReqKey, Long>(); | |
private Long startTime = System.currentTimeMillis(); | |
@Override | |
public void prepare(Map conf, TopologyContext context) { | |
super.prepare(conf, context); | |
} | |
@Override | |
public void execute(Tuple tuple, BasicOutputCollector collector) { | |
TimeZone utcZone = TimeZone.getTimeZone("UTC"); | |
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); | |
dateFormat.setTimeZone(utcZone); | |
ExpRollupRTReqKey reqKey = (ExpRollupRTReqKey) (tuple.getValue(0)); | |
Long count = requestsMap.containsKey(reqKey) ? requestsMap.get(reqKey) : 0L; | |
requestsMap.put(reqKey, count + 1); | |
Long currentTime = System.currentTimeMillis(); | |
String dateString = dateFormat.format(new Date()); | |
if (currentTime - startTime >= writePeriod) { | |
System.out.println("REQUESTS MAP SIZE: " + requestsMap.size()); | |
if (requestsMap.size() > 0) { | |
writeRequestsMapToDB(requestsMap, dateString); | |
requestsMap.clear(); | |
} | |
startTime = currentTime; | |
} | |
} | |
public void writeRequestsMapToDB(Map<ExpRollupRTReqKey, Long> requests, | |
String dateString) { | |
Random r = new Random(); | |
Client client = Client.apply("db-rins-rw-master-001.smf1.twitter.com:3306", | |
"dev.amedina", "iUAawAzhW219rDNbloVQd", "revenue_insight", Level.OFF, | |
new NullStatsReceiver()); | |
String multiInsertSQL = "INSERT IGNORE INTO exp_rollup_rt_requests (date, " | |
+ "client_id, geo, display_loc, exp_id, " | |
+ "count) VALUES "; | |
int mapSize = requests.size(); | |
int writeCounter = 0; | |
int totalCounter = 0; | |
StringBuilder valueStr = new StringBuilder(); | |
for (Entry<ExpRollupRTReqKey, Long> entry : requests.entrySet()) { | |
mapSize -= 1; | |
ExpRollupRTReqKey key = entry.getKey(); | |
Long reqCount = entry.getValue(); | |
List<Integer> expIds = key.expIds; | |
int clientId = key.clientId; | |
int displayLoc = key.displayLoc; | |
String geo = key.countryRegion; | |
for (Integer expId : expIds) { | |
if (writeCounter > 0) { | |
valueStr.append(","); | |
} | |
writeCounter += 1; | |
totalCounter += 1; | |
valueStr.append("('"); | |
valueStr.append(dateString + "'," + clientId + ",'"); | |
valueStr.append(geo + "'," + displayLoc); | |
valueStr.append("," + expId + "," + reqCount + ")"); | |
if (writeCounter % writeBatch == 0 || mapSize == 0) { | |
System.out.println("B: " + r.nextLong() + " " | |
+ multiInsertSQL + valueStr.toString()); | |
try { | |
client.query(multiInsertSQL + " " + valueStr); | |
} catch (Exception ex) { | |
ex.printStackTrace(); | |
} | |
writeCounter = 0; | |
valueStr = new StringBuilder(); | |
} | |
} | |
} | |
System.out.println("*** TOTAL REQUESTS INSERTION COUNTER: " + totalCounter); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { | |
//outputFieldsDeclarer.declare(new Fields("requestKey", "reqId", "clientId", "count")) | |
} | |
} | |
private static class ProcessImpresssionsBolt extends BaseBasicBolt { | |
private static final Logger | |
LOG = LoggerFactory.getLogger(ProcessImpresssionsBolt.class.getName()); | |
private Map<ExpRollupRTImpCallBackKey, Long> impressionsMap | |
= new HashMap<ExpRollupRTImpCallBackKey, Long>(); | |
private Long startTime = System.currentTimeMillis(); | |
@Override | |
public void prepare(Map conf, TopologyContext context) { | |
super.prepare(conf, context); | |
} | |
@Override | |
public void execute(Tuple tuple, BasicOutputCollector collector) { | |
ExpRollupRTImpCallBackKey impKey = (ExpRollupRTImpCallBackKey) (tuple.getValue(0)); | |
Long count = impressionsMap.containsKey(impKey) ? impressionsMap.get(impKey) : 0L; | |
impressionsMap.put(impKey, count + 1); | |
TimeZone utcZone = TimeZone.getTimeZone("UTC"); | |
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); | |
dateFormat.setTimeZone(utcZone); | |
String dateString = dateFormat.format(new Date()); | |
Long currentTime = System.currentTimeMillis(); | |
if (currentTime - startTime >= writePeriod) { | |
System.out.println("IMPRESSIONS MAP SIZE: " + impressionsMap.size()); | |
if (impressionsMap.size() > 0) { | |
writeImpressionsMapToDB(impressionsMap, dateString); | |
impressionsMap.clear(); | |
} | |
startTime = currentTime; | |
} | |
} | |
public void writeImpressionsMapToDB(Map<ExpRollupRTImpCallBackKey, Long> impressions, | |
String dateString) { | |
Client client = Client.apply("db-rins-rw-master-001.smf1.twitter.com:3306", | |
"dev.amedina", "iUAawAzhW219rDNbloVQd", "revenue_insight", Level.OFF, | |
new NullStatsReceiver()); | |
String multiInsertSQL = "INSERT IGNORE INTO exp_rollup_rt_impressions (date, " | |
+ "client_id, geo, display_loc, exp_id, " | |
+ "count) VALUES "; | |
int mapSize = impressions.size(); | |
int writeCounter = 0; | |
int totalCounter = 0; | |
StringBuilder valueStr = new StringBuilder(); | |
for (Entry<ExpRollupRTImpCallBackKey, Long> entry : impressions.entrySet()) { | |
mapSize -= 1; | |
ExpRollupRTImpCallBackKey key = entry.getKey(); | |
List<Integer> expIds = key.expIds; | |
int clientId = key.clientId; | |
int displayLoc = key.displayLoc; | |
String geo = key.countryRegion; | |
Long impCount = entry.getValue(); | |
for (Integer expId : expIds) { | |
if (writeCounter > 0) { | |
valueStr.append(","); | |
} | |
writeCounter += 1; | |
totalCounter += 1; | |
valueStr.append("('"); | |
valueStr.append(dateString + "'," + clientId + ",'"); | |
valueStr.append(geo + "'," + displayLoc); | |
valueStr.append("," + expId + "," + impCount + ")"); | |
if (writeCounter % writeBatch == 0 || mapSize == 0) { | |
System.out.println(multiInsertSQL + valueStr.toString()); | |
try { | |
client.query(multiInsertSQL + " " + valueStr); | |
} catch (Exception ex) { | |
ex.printStackTrace(); | |
} | |
writeCounter = 0; | |
valueStr = new StringBuilder(); | |
} | |
} | |
} | |
System.out.println("**** TOTAL IMPRESSIONS INSERTION COUNTER: " + totalCounter); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { | |
} | |
} | |
private static class ProcessEngagementsBolt extends BaseBasicBolt { | |
private Map<ExpRollupRTEngKey, Long> engagementsMap = new HashMap<ExpRollupRTEngKey, Long>(); | |
private Long startTime = System.currentTimeMillis(); | |
private static final Logger | |
LOG = LoggerFactory.getLogger(ProcessEngagementsBolt.class.getName()); | |
@Override | |
public void prepare(Map conf, TopologyContext context) { | |
super.prepare(conf, context); | |
} | |
@Override | |
public void execute(Tuple tuple, BasicOutputCollector collector) { | |
ExpRollupRTEngKey engKey = (ExpRollupRTEngKey) (tuple.getValue(0)); | |
Long count = engagementsMap.containsKey(engKey) ? engagementsMap.get(engKey) : 0L; | |
engagementsMap.put(engKey, count + 1); | |
TimeZone utcZone = TimeZone.getTimeZone("UTC"); | |
DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); | |
dateFormat.setTimeZone(utcZone); | |
String dateString = dateFormat.format(new Date()); | |
Long currentTime = System.currentTimeMillis(); | |
if (currentTime - startTime >= writePeriod) { | |
System.out.println("ENGAGEMENTS MAP SIZE: " + engagementsMap.size()); | |
if (engagementsMap.size() > 0) { | |
writeEngagementsMapToDB(engagementsMap, dateString); | |
engagementsMap.clear(); | |
} | |
startTime = currentTime; | |
} | |
} | |
public void writeEngagementsMapToDB(Map<ExpRollupRTEngKey, Long> engagements, | |
String dateString) { | |
Client client = Client.apply("db-rins-rw-master-001.smf1.twitter.com:3306", | |
"dev.amedina", "iUAawAzhW219rDNbloVQd", "revenue_insight", Level.OFF, | |
new NullStatsReceiver()); | |
String multiInsertSQL = "INSERT IGNORE INTO exp_rollup_rt_engagements (date, " | |
+ "client_id, engagement_type, count) VALUES "; | |
int mapSize = engagements.size(); | |
int writeCounter = 0; | |
int totalCounter = 0; | |
StringBuilder valueStr = new StringBuilder(); | |
for (Entry<ExpRollupRTEngKey, Long> entry : engagements.entrySet()) { | |
mapSize -= 1; | |
ExpRollupRTEngKey key = entry.getKey(); | |
int clientId = key.clientId; | |
int engType = key.engagementType; | |
Long engCount = entry.getValue(); | |
if (writeCounter > 0) { | |
valueStr.append(","); | |
} | |
writeCounter += 1; | |
totalCounter += 1; | |
valueStr.append("('"); | |
valueStr.append(dateString + "'," + clientId + ","); | |
valueStr.append(engType + "," + engCount + ")"); | |
if (writeCounter % writeBatch == 0 || mapSize == 0) { | |
System.out.println(multiInsertSQL + valueStr.toString()); | |
try { | |
client.query(multiInsertSQL + " " + valueStr); | |
} catch (Exception ex) { | |
ex.printStackTrace(); | |
} | |
writeCounter = 0; | |
valueStr = new StringBuilder(); | |
} | |
} | |
System.out.println("**** TOTAL ENGAGEMENTS INSERTION COUNTER: " + totalCounter); | |
} | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { | |
} | |
} | |
private static KafkaSpout buildAdRequestsSpout(String topologyName) { | |
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.ZkHosts( | |
"szookeeper.local.twitter.com:2181", "/twitter/kafka/brokers"), | |
"adshard_requests", | |
"/kafkaconsumer", | |
topologyName + "-adshard_requests"); | |
spoutConfig.scheme = new SchemeAsMultiScheme(new AdRequestScheme()); | |
spoutConfig.forceStartOffsetTime(-1); | |
spoutConfig.zkPort = 2181; | |
return new KafkaSpout(spoutConfig); | |
} | |
private static KestrelThriftSpout buildAdImpressionsSpout(String topologyName) { | |
KestrelThriftSpout spout = | |
new KestrelThriftSpout(Constants.KESTREL_HOST_LIST, Constants.KESTREL_PORT, | |
"ad_impressions_callback", new AdImpressionScheme()); | |
return spout; | |
} | |
private static KafkaSpout buildAdEngagementsSpout(String topologyName) { | |
SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.ZkHosts( | |
"szookeeper.local.twitter.com:2181", "/twitter/kafka/brokers"), | |
"ad_engagements", | |
"/kafkaconsumer", | |
topologyName + "-ad_engagements"); | |
spoutConfig.scheme = new SchemeAsMultiScheme(new AdEngagementScheme()); | |
spoutConfig.forceStartOffsetTime(-1); | |
spoutConfig.zkPort = 2181; | |
return new KafkaSpout(spoutConfig); | |
} | |
/** Submits toplogy */ | |
public static void main(String[] args) throws UnknownHostException, | |
AlreadyAliveException, InvalidTopologyException { | |
if (!new ArgScanner().parse(Arrays.asList(args))) { | |
throw new RuntimeException("Failed to parse arguments."); | |
} | |
TopologyBuilder builder = new TopologyBuilder(); | |
builder.setSpout("requests", | |
buildAdRequestsSpout(TOPOLOGY_NAME.get()), SPOUT0_PARALLELISM.get()); | |
builder.setBolt("processRequests", | |
new ProcessRequestsBolt(), STDOUT_PARALLELISM.get()) | |
.fieldsGrouping("requests", new Fields("reqKey")); | |
// builder.setSpout("impressions", | |
// buildAdImpressionsSpout(TOPOLOGY_NAME.get()), SPOUT0_PARALLELISM.get()); | |
// builder.setBolt("processImpressions", | |
// new ProcessImpresssionsBolt(), STDOUT_PARALLELISM.get()) | |
// .fieldsGrouping("impressions", new Fields("impKey")); | |
builder.setSpout("engagements", | |
buildAdEngagementsSpout(TOPOLOGY_NAME.get()), SPOUT0_PARALLELISM.get()); | |
builder.setBolt("processEngagements", | |
new ProcessEngagementsBolt(), STDOUT_PARALLELISM.get()) | |
.fieldsGrouping("engagements", new Fields("engKey")); | |
Config conf = new Config(); | |
conf.setDebug(false); | |
conf.setNumWorkers(NUMBER_WORKERS.get()); | |
conf.setMaxSpoutPending(MAX_SPOUT_PENDING.get()); | |
conf.setNumAckers(NUMBER_ACKERS.get()); | |
conf.setTeamEmail(TEAM_EMAIL.get()); | |
conf.setTeamName(TEAM_NAME.get()); | |
conf.setTopologyProjectName(PROJECT_NAME.get()); | |
conf.setTopologyCapTicket("CAP-1586"); | |
StormSubmitter.submitTopology(TOPOLOGY_NAME.get(), conf, builder.createTopology()); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.ads.batch.experimental.amedina.storm.exprolluprt; | |
import java.io.Serializable; | |
import com.twitter.ads.eventstream.ImpressionCallbackEvent; | |
import com.twitter.ads.logging.AdEngagementLogEntry; | |
import com.twitter.ads.logging.AdRequestLogEntry; | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTEngKey; | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTImpCallBackKey; | |
import com.twitter.ads.realtime.exprolluprt.ExpRollupRTReqKey; | |
public final class ExpRollupRTUtil implements Serializable { | |
private ExpRollupRTUtil() { | |
} | |
/** | |
* Create Request key from AdRequest LogEntry | |
* @param adReq : AdRequestLogEntry | |
* @return ExpRollupRTReqKey | |
*/ | |
public static ExpRollupRTReqKey createRequestKey(AdRequestLogEntry adReq) { | |
ExpRollupRTReqKey reqKey = new ExpRollupRTReqKey(); | |
reqKey.clientId = adReq.getClientInfo().getClientId(); | |
reqKey.expIds = adReq.getExperimentKey().getExperimentIds(); | |
reqKey.countryRegion = adReq.getClientInfo().getCountryCode(); | |
reqKey.displayLoc = adReq.getRequest().getDisplayLocation().getValue(); | |
return reqKey; | |
} | |
/** | |
* Create Impression Call Back key from Impression Call Back Event | |
* @param icbEvt : ImpressionCallbackEvent | |
* @return ExpRollupRTImpCallBackKey | |
*/ | |
public static ExpRollupRTImpCallBackKey | |
createImpressionCallbackKey(ImpressionCallbackEvent icbEvt) { | |
ExpRollupRTImpCallBackKey impKey = new ExpRollupRTImpCallBackKey(); | |
impKey.clientId = icbEvt.getImpressionData().getClientId(); | |
impKey.expIds = icbEvt.getImpressionData().getExperimentKey().getExperimentIds(); | |
impKey.countryRegion = icbEvt.getClientInfo().getCountryCode(); | |
impKey.displayLoc = icbEvt.getImpressionData().getDisplayLocation().getValue(); | |
return impKey; | |
} | |
/** | |
* Create Engagement Key from AdEngagementLog Entry | |
* @param adEng : AdEngagementLogEntry | |
* @return ExpRollupRTEngKey | |
*/ | |
public static ExpRollupRTEngKey createEngagementKey(AdEngagementLogEntry adEng) { | |
ExpRollupRTEngKey engKey = new ExpRollupRTEngKey(); | |
engKey.clientId = adEng.getClientInfo().getClientId(); | |
engKey.countryRegion = adEng.getClientInfo().getCountryCode(); | |
engKey.engagementType = adEng.getEngagement().getType().getValue(); | |
return engKey; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
at com.twitter.chill.storm.BlizzardKryoFactory.getKryo(BlizzardKryoFactory.java:40) ~[stormjar.jar:na] | |
at backtype.storm.serialization.SerializationFactory.getKryo(SerializationFactory.java:33) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.serialization.KryoValuesDeserializer.<init>(KryoValuesDeserializer.java:15) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.serialization.KryoTupleDeserializer.<init>(KryoTupleDeserializer.java:22) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.executor$mk_executor_data$fn__3420.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.util$assoc_apply_self.invoke(util.clj:734) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.executor$mk_executor_data.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:304) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831$iter__4836__4840$fn__4841.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.RT.seq(RT.java:473) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$seq.invoke(core.clj:133) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$dorun.invoke(core.clj:2725) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$doall.invoke(core.clj:2741) ~[org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.AFn.applyToHelper(AFn.java:185) [org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$apply.invoke(core.clj:601) ~[org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker$fn__4830$mk_worker__4886.doInvoke(worker.clj:318) [storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.RestFn.invoke(RestFn.java:512) [org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker$_main.invoke(worker.clj:428) [storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.AFn.applyToHelper(AFn.java:172) [org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker.main(Unknown Source) [storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1 | |
at java.util.ArrayList.rangeCheck(ArrayList.java:635) ~[na:1.7.0-u40-rel_20] | |
at java.util.ArrayList.get(ArrayList.java:411) ~[na:1.7.0-u40-rel_20] | |
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:773) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:646) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
... 29 common frames omitted | |
2013-11-11 05:39:57 . b.s.util [INFO] Halting process: ("Error on initialization") | |
2013-11-11 05:41:20 . b.s.d.worker [INFO] Launching worker for summingbird_QueryCountJob_pradhuman-75-1384144487 on smf1-atg-05-sr3.prod.twitter.com:31001 with id 58f8104b-5073-4ebb-aea6-062fb62fd0f5 and conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 4, "zmq.linger.millis" 5000, "topology.skip.missing.kryo.registrations" false, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 40000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 500, "drpc.servers" ["smf1-atx-22-sr4.prod.twitter.com"], "nimbus.monitor.freq.secs" 10, "java.library.path" "native:/usr/local/lib:/usr/lib64", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "../storm-local", "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "smf1-atx-22-sr4.prod.twitter.com", "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm-dev", "supervisor.enable" true, "storm.zookeeper.servers" ["newstormzookeeper.local.twitter.com"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" 1, "mesos.framework.name" "storm-dev-smf1", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m -Xloggc:./logs/gc-%ID%.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=15 -XX:GCLogFileSize=128M", "supervisor.heartbeat.frequency.secs" 15, "topology.error.throttle.interval.secs" 30, "mesos.master.failover.timeout.secs" 3600, 
"cluster.type" "dev", "zmq.hwm" 10000, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 15, "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 15, "topology.tasks" nil, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.metrics.consumer.register" [{"class" "com.twitter.storm.metric.WriteMetricsToCuckoo", "parallelism.hint" 1}], "topology.max.spout.pending" nil, "mesos.master.url" "zk://mesos:mesos@szookeeper.local.twitter.com:2181/home/mesos/multi/master", "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "mesos.executor.uri.default" "hftp://hadoop-rt-nn.smf1.twitter.com:50070/user/storm/mesos_releases/dev-smf1-1/mesos-storm-0.9.0-wip15-2.2.0.0.tgz", "nimbus.topology.validator" "storm.mesos.DisallowNonIsolatedTopologies", "supervisor.slots.ports" [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 75, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "mesos.allowed.hosts" ["smf1-ayp-03-sr4.prod.twitter.com" "smf1-ayr-37-sr1.prod.twitter.com" "smf1-ays-34-sr1.prod.twitter.com" "smf1-ayt-08-sr4.prod.twitter.com" "smf1-ayu-03-sr1.prod.twitter.com" "smf1-ayv-37-sr4.prod.twitter.com" "smf1-ayw-26-sr4.prod.twitter.com" "smf1-asz-10-sr3.prod.twitter.com" "smf1-asz-28-sr2.prod.twitter.com" "smf1-atd-16-sr3.prod.twitter.com" "smf1-atb-10-sr3.prod.twitter.com" "smf1-asx-10-sr2.prod.twitter.com" "smf1-asx-01-sr2.prod.twitter.com" "smf1-atd-38-sr2.prod.twitter.com" "smf1-asx-10-sr3.prod.twitter.com" "smf1-atg-05-sr3.prod.twitter.com" "smf1-atd-28-sr2.prod.twitter.com" "smf1-atg-22-sr2.prod.twitter.com" "smf1-atg-22-sr3.prod.twitter.com" "smf1-asz-13-sr3.prod.twitter.com" "smf1-bjj-03-sr1.prod.twitter.com" "smf1-bjj-11-sr2.prod.twitter.com" "smf1-bjj-03-sr2.prod.twitter.com" "smf1-bhk-14-sr4.prod.twitter.com" 
"smf1-bma-19-sr4.prod.twitter.com" "smf1-bji-34-sr4.prod.twitter.com" "smf1-bjl-26-sr4.prod.twitter.com" "smf1-bjj-29-sr4.prod.twitter.com" "smf1-bmc-08-sr4.prod.twitter.com" "smf1-bjh-19-sr4.prod.twitter.com" "smf1-bjk-08-sr4.prod.twitter.com"], "storm.scheduler" "backtype.storm.scheduler.IsolationScheduler", "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 3, "topology.acker.tasks" nil, "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 75, "storm.zookeeper.connection.timeout" 40000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "zmq.threads" 1, "storm.zookeeper.retry.times" 20, "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 75, "nimbus.file.copy.expiration.secs" 600, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8081, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "distributed", "topology.optimize" true, "topology.max.task.parallelism" nil} | |
2013-11-11 05:41:20 . c.t.l.ScribeHandler [WARN] Scribe server is archaic; changing to old protocol for future requests. | |
2013-11-11 05:41:20 . c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting | |
2013-11-11 05:41:20 . c.t.l.ScribeHandler [INFO] sent records: 0, per second: 0, dropped records: 0, reconnection failures: 0, reconnection skipped: 0 | |
2013-11-11 05:41:20 . b.s.zookeeper [INFO] Zookeeper state update: :connected:none | |
2013-11-11 05:41:20 . c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting | |
2013-11-11 05:41:21 . b.s.d.worker [ERROR] Error on initialization of server mk-worker | |
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1 | |
Serialization trace: | |
val$r (com.twitter.chill.KryoInstantiator$4) | |
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.twitter.chill.config.ConfiguredInstantiator.deserialize(ConfiguredInstantiator.java:141) ~[stormjar.jar:na] | |
at com.twitter.chill.config.ConfiguredInstantiator.<init>(ConfiguredInstantiator.java:66) ~[stormjar.jar:na] | |
at com.twitter.chill.storm.BlizzardKryoFactory.getKryo(BlizzardKryoFactory.java:40) ~[stormjar.jar:na] | |
at backtype.storm.serialization.SerializationFactory.getKryo(SerializationFactory.java:33) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.serialization.KryoValuesDeserializer.<init>(KryoValuesDeserializer.java:15) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.serialization.KryoTupleDeserializer.<init>(KryoTupleDeserializer.java:22) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.executor$mk_executor_data$fn__3420.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.util$assoc_apply_self.invoke(util.clj:734) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.executor$mk_executor_data.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:304) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831$iter__4836__4840$fn__4841.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.RT.seq(RT.java:473) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$seq.invoke(core.clj:133) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$dorun.invoke(core.clj:2725) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$doall.invoke(core.clj:2741) ~[org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.AFn.applyToHelper(AFn.java:185) [org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$apply.invoke(core.clj:601) ~[org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker$fn__4830$mk_worker__4886.doInvoke(worker.clj:318) [storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.RestFn.invoke(RestFn.java:512) [org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker$_main.invoke(worker.clj:428) [storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.AFn.applyToHelper(AFn.java:172) [org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker.main(Unknown Source) [storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1 | |
at java.util.ArrayList.rangeCheck(ArrayList.java:635) ~[na:1.7.0-u40-rel_20] | |
at java.util.ArrayList.get(ArrayList.java:411) ~[na:1.7.0-u40-rel_20] | |
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:773) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:646) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
... 29 common frames omitted | |
2013-11-11 05:41:21 . b.s.util [INFO] Halting process: ("Error on initialization") | |
2013-11-11 05:42:43 . b.s.d.worker [INFO] Launching worker for summingbird_QueryCountJob_pradhuman-75-1384144487 on smf1-atg-05-sr3.prod.twitter.com:31001 with id 1a3b7700-4368-4044-a82b-d77c785156e4 and conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 4, "zmq.linger.millis" 5000, "topology.skip.missing.kryo.registrations" false, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 40000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 500, "drpc.servers" ["smf1-atx-22-sr4.prod.twitter.com"], "nimbus.monitor.freq.secs" 10, "java.library.path" "native:/usr/local/lib:/usr/lib64", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "../storm-local", "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "smf1-atx-22-sr4.prod.twitter.com", "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm-dev", "supervisor.enable" true, "storm.zookeeper.servers" ["newstormzookeeper.local.twitter.com"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" 1, "mesos.framework.name" "storm-dev-smf1", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m -Xloggc:./logs/gc-%ID%.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=15 -XX:GCLogFileSize=128M", "supervisor.heartbeat.frequency.secs" 15, "topology.error.throttle.interval.secs" 30, "mesos.master.failover.timeout.secs" 3600, 
"cluster.type" "dev", "zmq.hwm" 10000, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 15, "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 15, "topology.tasks" nil, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.metrics.consumer.register" [{"class" "com.twitter.storm.metric.WriteMetricsToCuckoo", "parallelism.hint" 1}], "topology.max.spout.pending" nil, "mesos.master.url" "zk://mesos:mesos@szookeeper.local.twitter.com:2181/home/mesos/multi/master", "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "mesos.executor.uri.default" "hftp://hadoop-rt-nn.smf1.twitter.com:50070/user/storm/mesos_releases/dev-smf1-1/mesos-storm-0.9.0-wip15-2.2.0.0.tgz", "nimbus.topology.validator" "storm.mesos.DisallowNonIsolatedTopologies", "supervisor.slots.ports" [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 75, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "mesos.allowed.hosts" ["smf1-ayp-03-sr4.prod.twitter.com" "smf1-ayr-37-sr1.prod.twitter.com" "smf1-ays-34-sr1.prod.twitter.com" "smf1-ayt-08-sr4.prod.twitter.com" "smf1-ayu-03-sr1.prod.twitter.com" "smf1-ayv-37-sr4.prod.twitter.com" "smf1-ayw-26-sr4.prod.twitter.com" "smf1-asz-10-sr3.prod.twitter.com" "smf1-asz-28-sr2.prod.twitter.com" "smf1-atd-16-sr3.prod.twitter.com" "smf1-atb-10-sr3.prod.twitter.com" "smf1-asx-10-sr2.prod.twitter.com" "smf1-asx-01-sr2.prod.twitter.com" "smf1-atd-38-sr2.prod.twitter.com" "smf1-asx-10-sr3.prod.twitter.com" "smf1-atg-05-sr3.prod.twitter.com" "smf1-atd-28-sr2.prod.twitter.com" "smf1-atg-22-sr2.prod.twitter.com" "smf1-atg-22-sr3.prod.twitter.com" "smf1-asz-13-sr3.prod.twitter.com" "smf1-bjj-03-sr1.prod.twitter.com" "smf1-bjj-11-sr2.prod.twitter.com" "smf1-bjj-03-sr2.prod.twitter.com" "smf1-bhk-14-sr4.prod.twitter.com" 
"smf1-bma-19-sr4.prod.twitter.com" "smf1-bji-34-sr4.prod.twitter.com" "smf1-bjl-26-sr4.prod.twitter.com" "smf1-bjj-29-sr4.prod.twitter.com" "smf1-bmc-08-sr4.prod.twitter.com" "smf1-bjh-19-sr4.prod.twitter.com" "smf1-bjk-08-sr4.prod.twitter.com"], "storm.scheduler" "backtype.storm.scheduler.IsolationScheduler", "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 3, "topology.acker.tasks" nil, "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 75, "storm.zookeeper.connection.timeout" 40000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "zmq.threads" 1, "storm.zookeeper.retry.times" 20, "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 75, "nimbus.file.copy.expiration.secs" 600, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8081, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "distributed", "topology.optimize" true, "topology.max.task.parallelism" nil} | |
2013-11-11 05:42:44 . c.t.l.ScribeHandler [WARN] Scribe server is archaic; changing to old protocol for future requests. | |
2013-11-11 05:42:44 . c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting | |
2013-11-11 05:42:44 . c.t.l.ScribeHandler [INFO] sent records: 0, per second: 0, dropped records: 0, reconnection failures: 0, reconnection skipped: 0 | |
2013-11-11 05:42:44 . b.s.zookeeper [INFO] Zookeeper state update: :connected:none | |
2013-11-11 05:42:44 . c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting | |
2013-11-11 05:42:46 . b.s.d.worker [ERROR] Error on initialization of server mk-worker | |
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1 | |
Serialization trace: | |
val$r (com.twitter.chill.KryoInstantiator$4) | |
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.twitter.chill.config.ConfiguredInstantiator.deserialize(ConfiguredInstantiator.java:141) ~[stormjar.jar:na] | |
at com.twitter.chill.config.ConfiguredInstantiator.<init>(ConfiguredInstantiator.java:66) ~[stormjar.jar:na] | |
at com.twitter.chill.storm.BlizzardKryoFactory.getKryo(BlizzardKryoFactory.java:40) ~[stormjar.jar:na] | |
at backtype.storm.serialization.SerializationFactory.getKryo(SerializationFactory.java:33) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.serialization.KryoValuesDeserializer.<init>(KryoValuesDeserializer.java:15) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.serialization.KryoTupleDeserializer.<init>(KryoTupleDeserializer.java:22) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.executor$mk_executor_data$fn__3420.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.util$assoc_apply_self.invoke(util.clj:734) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.executor$mk_executor_data.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:304) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831$iter__4836__4840$fn__4841.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.RT.seq(RT.java:473) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$seq.invoke(core.clj:133) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$dorun.invoke(core.clj:2725) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$doall.invoke(core.clj:2741) ~[org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.AFn.applyToHelper(AFn.java:185) [org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$apply.invoke(core.clj:601) ~[org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker$fn__4830$mk_worker__4886.doInvoke(worker.clj:318) [storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.RestFn.invoke(RestFn.java:512) [org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker$_main.invoke(worker.clj:428) [storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.AFn.applyToHelper(AFn.java:172) [org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker.main(Unknown Source) [storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1 | |
at java.util.ArrayList.rangeCheck(ArrayList.java:635) ~[na:1.7.0-u40-rel_20] | |
at java.util.ArrayList.get(ArrayList.java:411) ~[na:1.7.0-u40-rel_20] | |
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:773) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:646) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
... 29 common frames omitted | |
2013-11-11 05:42:46 . b.s.util [INFO] Halting process: ("Error on initialization") | |
2013-11-11 05:44:09 . b.s.d.worker [INFO] Launching worker for summingbird_QueryCountJob_pradhuman-75-1384144487 on smf1-atg-05-sr3.prod.twitter.com:31001 with id 270eeb85-dc73-42f0-ac5f-b3438bfab530 and conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 4, "zmq.linger.millis" 5000, "topology.skip.missing.kryo.registrations" false, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 40000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 500, "drpc.servers" ["smf1-atx-22-sr4.prod.twitter.com"], "nimbus.monitor.freq.secs" 10, "java.library.path" "native:/usr/local/lib:/usr/lib64", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "../storm-local", "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "smf1-atx-22-sr4.prod.twitter.com", "storm.zookeeper.port" 2181, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm-dev", "supervisor.enable" true, "storm.zookeeper.servers" ["newstormzookeeper.local.twitter.com"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" 1, "mesos.framework.name" "storm-dev-smf1", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m -Xloggc:./logs/gc-%ID%.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=15 -XX:GCLogFileSize=128M", "supervisor.heartbeat.frequency.secs" 15, "topology.error.throttle.interval.secs" 30, "mesos.master.failover.timeout.secs" 3600, 
"cluster.type" "dev", "zmq.hwm" 10000, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 15, "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 15, "topology.tasks" nil, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "topology.metrics.consumer.register" [{"class" "com.twitter.storm.metric.WriteMetricsToCuckoo", "parallelism.hint" 1}], "topology.max.spout.pending" nil, "mesos.master.url" "zk://mesos:mesos@szookeeper.local.twitter.com:2181/home/mesos/multi/master", "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "mesos.executor.uri.default" "hftp://hadoop-rt-nn.smf1.twitter.com:50070/user/storm/mesos_releases/dev-smf1-1/mesos-storm-0.9.0-wip15-2.2.0.0.tgz", "nimbus.topology.validator" "storm.mesos.DisallowNonIsolatedTopologies", "supervisor.slots.ports" [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 75, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "mesos.allowed.hosts" ["smf1-ayp-03-sr4.prod.twitter.com" "smf1-ayr-37-sr1.prod.twitter.com" "smf1-ays-34-sr1.prod.twitter.com" "smf1-ayt-08-sr4.prod.twitter.com" "smf1-ayu-03-sr1.prod.twitter.com" "smf1-ayv-37-sr4.prod.twitter.com" "smf1-ayw-26-sr4.prod.twitter.com" "smf1-asz-10-sr3.prod.twitter.com" "smf1-asz-28-sr2.prod.twitter.com" "smf1-atd-16-sr3.prod.twitter.com" "smf1-atb-10-sr3.prod.twitter.com" "smf1-asx-10-sr2.prod.twitter.com" "smf1-asx-01-sr2.prod.twitter.com" "smf1-atd-38-sr2.prod.twitter.com" "smf1-asx-10-sr3.prod.twitter.com" "smf1-atg-05-sr3.prod.twitter.com" "smf1-atd-28-sr2.prod.twitter.com" "smf1-atg-22-sr2.prod.twitter.com" "smf1-atg-22-sr3.prod.twitter.com" "smf1-asz-13-sr3.prod.twitter.com" "smf1-bjj-03-sr1.prod.twitter.com" "smf1-bjj-11-sr2.prod.twitter.com" "smf1-bjj-03-sr2.prod.twitter.com" "smf1-bhk-14-sr4.prod.twitter.com" 
"smf1-bma-19-sr4.prod.twitter.com" "smf1-bji-34-sr4.prod.twitter.com" "smf1-bjl-26-sr4.prod.twitter.com" "smf1-bjj-29-sr4.prod.twitter.com" "smf1-bmc-08-sr4.prod.twitter.com" "smf1-bjh-19-sr4.prod.twitter.com" "smf1-bjk-08-sr4.prod.twitter.com"], "storm.scheduler" "backtype.storm.scheduler.IsolationScheduler", "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 3, "topology.acker.tasks" nil, "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "nimbus.task.timeout.secs" 75, "storm.zookeeper.connection.timeout" 40000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "zmq.threads" 1, "storm.zookeeper.retry.times" 20, "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 75, "nimbus.file.copy.expiration.secs" 600, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8081, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "distributed", "topology.optimize" true, "topology.max.task.parallelism" nil} | |
2013-11-11 05:44:10 . c.t.l.ScribeHandler [WARN] Scribe server is archaic; changing to old protocol for future requests. | |
2013-11-11 05:44:10 . c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting | |
2013-11-11 05:44:10 . c.t.l.ScribeHandler [INFO] sent records: 0, per second: 0, dropped records: 0, reconnection failures: 0, reconnection skipped: 0 | |
2013-11-11 05:44:10 . b.s.zookeeper [INFO] Zookeeper state update: :connected:none | |
2013-11-11 05:44:10 . c.n.c.f.i.CuratorFrameworkImpl [INFO] Starting | |
2013-11-11 05:44:11 . b.s.d.worker [ERROR] Error on initialization of server mk-worker | |
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1 | |
Serialization trace: | |
val$r (com.twitter.chill.KryoInstantiator$4) | |
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.twitter.chill.config.ConfiguredInstantiator.deserialize(ConfiguredInstantiator.java:141) ~[stormjar.jar:na] | |
at com.twitter.chill.config.ConfiguredInstantiator.<init>(ConfiguredInstantiator.java:66) ~[stormjar.jar:na] | |
at com.twitter.chill.storm.BlizzardKryoFactory.getKryo(BlizzardKryoFactory.java:40) ~[stormjar.jar:na] | |
at backtype.storm.serialization.SerializationFactory.getKryo(SerializationFactory.java:33) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.serialization.KryoValuesDeserializer.<init>(KryoValuesDeserializer.java:15) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.serialization.KryoTupleDeserializer.<init>(KryoTupleDeserializer.java:22) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.executor$mk_executor_data$fn__3420.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.util$assoc_apply_self.invoke(util.clj:734) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.executor$mk_executor_data.invoke(executor.clj:200) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.executor$mk_executor.invoke(executor.clj:304) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831$iter__4836__4840$fn__4841.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.LazySeq.sval(LazySeq.java:42) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.LazySeq.seq(LazySeq.java:60) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.RT.seq(RT.java:473) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$seq.invoke(core.clj:133) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$dorun.invoke(core.clj:2725) ~[org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$doall.invoke(core.clj:2741) ~[org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker$fn__4830$exec_fn__1194__auto____4831.invoke(worker.clj:349) ~[storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.AFn.applyToHelper(AFn.java:185) [org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na] | |
at clojure.core$apply.invoke(core.clj:601) ~[org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker$fn__4830$mk_worker__4886.doInvoke(worker.clj:318) [storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.RestFn.invoke(RestFn.java:512) [org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker$_main.invoke(worker.clj:428) [storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
at clojure.lang.AFn.applyToHelper(AFn.java:172) [org.clojure-clojure-1.4.0.jar:na] | |
at clojure.lang.AFn.applyTo(AFn.java:151) [org.clojure-clojure-1.4.0.jar:na] | |
at backtype.storm.daemon.worker.main(Unknown Source) [storm-storm-0.9.0-wip15-2.2.0.jar:na] | |
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1 | |
at java.util.ArrayList.rangeCheck(ArrayList.java:635) ~[na:1.7.0-u40-rel_20] | |
at java.util.ArrayList.get(ArrayList.java:411) ~[na:1.7.0-u40-rel_20] | |
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:773) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:646) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) ~[com.esotericsoftware.kryo-kryo-2.21.jar:na] | |
... 29 common frames omitted | |
2013-11-11 05:44:11 . b.s.util [INFO] Halting process: ("Error on initialization") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment