I'm still very new to Kafka, eventsourcing, stream processing, etc. I'm in the middle of building my first production system with this stuff and am writing this at the request of a few folks on Twitter. So if you do have experience, please do me and anyone else reading this a favor by pointing out things I get wrong :)
- The Log — http://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
- Turning the database inside out — http://www.confluent.io/blog/2015/03/04/turning-the-database-inside-out-with-apache-samza/
- Why local state is a fundamental primitive in stream processing — http://radar.oreilly.com/2014/07/why-local-state-is-a-fundamental-primitive-in-stream-processing.html
- Samza
- Various functional systems I've been exposed to over the past year or so: React, Flux, RxJS, etc.
I'm the technical co-founder at RelateRocket. We're a fairly early stage startup based in SF building an algorithmic social proof product. We hook into marketing automation and CRM tools to auto-produce custom personalized pages. And we're planning on doing this on a massive scale, e.g. a company would create and email out 100,000 pages with custom relatable content.
So the systems I'm building need to a) handle this sort of scale and b) slice and dice the data dozens of ways for internal analytics, customer-facing analytics, alerting, and for pushing data back to the various other tools our customers are using.
So while studying this problem, I discovered event sourcing. I'd read The Log blog post a few years ago when it came out and was very attracted to the idea but didn't really connect the dots about how to turn that into a working system until discovering event sourcing.
Event sourcing has two very attractive properties. A) You don't throw data away. You just record everything you can think of and then figure out how to make use of it. This is very attractive for an analytics-heavy product, and in a startup where you're really not sure upfront which data is useful and which isn't. B) Separating reads and writes. In school and in the few backend systems I've written (I've mostly done frontend work), I never enjoyed designing database schemas. They always felt hacky and ungainly. I can now see that most of the ugliness came from the unnatural coupling of the read and write schemas. Writing events feels very natural. You just declare what happened: a user was updated, by this person, and this is what changed. And that's the end of it. You don't have to awkwardly mutate a user object and, perhaps if you're feeling ambitious, also write to an audit log table recording who made the change. And for reads, as I'll get to in a bit, you have unlimited freedom to mold the raw event data into whatever form makes sense for your application. Which is very easy and rather fun, actually.
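To make that concrete, here's a sketch of what a single event record might look like. The field names (`event_type`, `entity_id`, `actor_id`, and so on) are the ones used in the code later in this post, but the exact schema is up to you:

```javascript
// A hypothetical "userUpdated" event record, for illustration only.
var userUpdatedEvent = {
  event_type: 'userUpdated',    // what happened
  entity_id: 'user-42',         // who it happened to
  actor_id: 'user-7',           // who did it
  timestamp: 1430000000000,     // when it happened
  event: {                      // what changed
    name: 'Ada Lovelace',
    email: 'ada@example.com'
  }
};
```

Note the event says nothing about how it will be read later; the read-side schemas get derived from streams of records like this.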
Somewhere around when I discovered event sourcing, I also discovered Kafka. Which I won't write much about as there's tons of info on the internet but Kafka is a beautiful piece of software. Highly performant, durable, replayable pub-sub. The perfect tool for so many data tasks.
So event sourcing is super cool, but how do you turn your low-level raw events into usable, queryable objects? Stream processing is the usual answer (there's also batch processing with, say, Hadoop, but that's so three years ago).
Basically, as new events flow through your system, you "process" them into some sort of higher-level form. E.g. a `userCreated` event is the start of a new user. A `userUpdated` event flows by and is grabbed to update an existing user. A `userLoggedIn` event happens and we increment the `times_logged_in` field on the user.
For React.js peeps reading out there, this should sound exactly like the Flux architecture.
There's a variety of stream processing tools out there, e.g. Spark and Samza. I've chosen (for now anyway) to forgo those and instead do stream processing with Node.js. Those tools both sound great and we'll probably use them someday, but they don't seem necessary given that we're still small-data, not big-data, and as the sole developer, I really need to limit the number of tools I'm using to keep the complexity of the product within bounds. Since I'm already using Node.js extensively, it seems appropriate to keep on using it.
Samza, at its heart, is actually very simple. You expose a function (ignoring that Java makes you wrap functions in ugly classes) that's subscribed to a Kafka topic that Samza calls whenever there's a new message. You do something to the message and then generally re-emit the processed message onto a new topic.
So, to my earlier example: a `userCreated` event comes in, you process it into the user schema, and then publish that new object to the `user` topic. Another system that's responsible for responding to user information queries would then listen to that topic and use changes there to update its store.
This is how it'd look in Node.js:
```javascript
var kafka = require('kafka-node');
var HighLevelConsumer = kafka.HighLevelConsumer;
var HighLevelProducer = kafka.HighLevelProducer;
var KeyedMessage = kafka.KeyedMessage;
var Immutable = require('immutable');
var fromJS = Immutable.fromJS;
var _ = require('underscore');

// Set up our Kafka producer.
var client = new kafka.Client();
var producer = new HighLevelProducer(client);

// Create the user topic once the producer is ready.
producer.on('ready', function() {
  producer.createTopics(['user'], false, function(err) {
    if (err) { console.log(err); }
  });
});

// In-memory aggregate state: user id -> immutable user map.
var users = Immutable.Map();

// Publish the current state of a user to the "user" topic, keyed by id.
var emit = function(user) {
  if (!user) { return; }
  var message = new KeyedMessage(user.get('id'), JSON.stringify(user));
  producer.send([{topic: 'user', messages: message}], function(err) {
    if (err) { console.log(err); }
  });
};

// Listen to new events.
var consumer = new HighLevelConsumer(client, [{topic: 'events'}], {
  groupId: 'user-aggregator'
});

consumer.on('message', function(message) {
  var event = JSON.parse(message.value);
  var e, logins, failedLogins;
  switch (event.event_type) {
    case 'userCreated':
      users = users.set(event.entity_id, fromJS({
        id: event.entity_id,
        name: event.event.name,
        email: event.event.email,
        roles: event.event.roles,
        organization_id: event.event.organization_id,
        created_at: event.timestamp,
        updated_at: event.timestamp,
        logins: [],
        logins_failed: []
      }));
      return emit(users.get(event.entity_id));
    case 'userUpdated':
      users = users.mergeDeepIn([event.entity_id], fromJS({
        name: event.event.name,
        email: event.event.email,
        roles: event.event.roles,
        updated_at: event.timestamp
      }));
      return emit(users.get(event.entity_id));
    case 'userLoggedIn':
      e = _.extend(event.event, {timestamp: event.timestamp});
      logins = users.getIn([event.actor_id, 'logins']).push(e);
      users = users.setIn([event.actor_id, 'logins'], logins);
      return emit(users.get(event.actor_id));
    case 'userFailedLogin':
      e = _.extend(event.event, {timestamp: event.timestamp});
      failedLogins = users.getIn([event.entity_id, 'logins_failed']).push(e);
      users = users.setIn([event.entity_id, 'logins_failed'], failedLogins);
      return emit(users.get(event.entity_id));
  }
});
```
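On the other side, the query-serving system mentioned earlier is just another consumer. Here's a minimal sketch of that read side, assuming the `user` topic from the example above; the in-memory object store and the function names (`applyUserMessage`, `getUser`) are mine for illustration, and a real service would project into Postgres, Elasticsearch, etc.:

```javascript
// A trivial queryable store: latest version of each user, keyed by id.
// The projection logic is kept as pure functions so it's easy to test.
function applyUserMessage(store, message) {
  var user = JSON.parse(message.value);
  store[user.id] = user;  // last write wins; the topic is keyed by user id
  return store;
}

function getUser(store, id) {
  return store[id];
}

// Wiring (requires a running Kafka broker):
// var kafka = require('kafka-node');
// var client = new kafka.Client();
// var consumer = new kafka.HighLevelConsumer(client, [{topic: 'user'}], {
//   groupId: 'user-query-service'
// });
// var store = {};
// consumer.on('message', function(message) {
//   applyUserMessage(store, message);
// });
```

Because every message on the `user` topic is a full snapshot of that user, the read side never needs to understand the raw events at all.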
So what are we using stream processing for?

- Aggregating objects from raw events (the user aggregator above is one example).
- Real-time updates — it's easy to connect socket.io or some other websocket/push tech to the stream of new objects and push them to the dashboard. We have an activity stream page working this way and will be adding more soon.
- Campaigns — we let you create campaigns, arbitrary groupings of pages you're sending out. I'll soon be writing a campaign aggregator that watches for new page analytics related to a campaign and groups them together to drive campaign-specific analytics dashboards.
- Integration with other tools — each marketing/sales tool has its own way of viewing the world. To drive integrations, I'll write a stream processor for each one that translates our events into updates understandable by that tool.
- Enrichments — we record the IP address of everyone who visits one of our custom pages. A natural thing to do is look up geo information for that IP to "enrich" the event.
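That last one is just another small stream processor: consume a raw visit event, attach derived data, and re-emit. A sketch of the enrichment step, where `lookupGeo` is a hypothetical stand-in for a real geo-IP lookup (e.g. a MaxMind database), and the event fields are assumptions:

```javascript
// Hypothetical stand-in for a real geo-IP lookup service or database.
function lookupGeo(ip) {
  var table = {'203.0.113.5': {country: 'US', city: 'San Francisco'}};
  return table[ip] || null;
}

// Pure enrichment step: returns a new event with a `geo` field attached,
// leaving the original event untouched.
function enrichWithGeo(event) {
  var enriched = {};
  for (var key in event) { enriched[key] = event[key]; }
  enriched.geo = lookupGeo(event.ip);
  return enriched;
}

// Wiring would mirror the aggregator above: consume raw page-visit events
// from one topic, call enrichWithGeo, and publish to an "enriched" topic.
```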
@KyleAMatthews Nice writeup! It's awesome you're writing about your work and sharing it out. :)
This is interesting stuff, and I think your points about getting up and running fast are really good framing. I had a few thoughts for you which might not be relevant for what you're trying to do, but I figured they're worth sharing and you can take them for what they're worth.
In my experience, there are a lot of tough problems around failure modes. Those might not be super important for what you're doing here, and `console.log(err)` might do the trick just fine. But by identifying this you can maybe more explicitly identify where you're taking on risk as you build it out, and can circle back to it if those seem like worthwhile improvements later.

Another area like that is the state in the in-memory `users` map. If the process crashes, recovering that state quickly might be important, rather than recomputing it or losing it (which is one of the key properties of Samza: using Kafka to store snapshots of the local state). Also, another benefit of established stream-processing frameworks is levers for tuning performance, with things like how you increase parallelism as the volume of data grows. It's not clear to me how well JS/Node would support that (say, if the single process couldn't keep up), and distributing computation across a cluster is one of the main values of frameworks like Samza or Storm.

These points I'm bringing up might not apply to what you're building now, I just thought it would be helpful to share another perspective. All that being said, I think this is awesome writing and work, much respect for sharing and I'm excited to hear more. These kinds of analytics problems are fun stuff, and writing about these topics and making it easier for everyone to experiment with them is the best way for us all to make more awesome products. :)