Skip to content

Instantly share code, notes, and snippets.

@sscdotopen
Created March 7, 2024 12:14
Show Gist options
  • Save sscdotopen/a0aabb3ef6a42b4261fa3feb40bc87e3 to your computer and use it in GitHub Desktop.
Save sscdotopen/a0aabb3ef6a42b4261fa3feb40bc87e3 to your computer and use it in GitHub Desktop.
Differential Dataflow implementation of the incremental view maintenance example from our lecture
extern crate timely;
extern crate differential_dataflow;
use timely::dataflow::operators::probe::Handle;
use differential_dataflow::input::Input;
use differential_dataflow::operators::*;
/// Incrementally maintains the `HighPriceOrdersPerCustomer` view with Differential Dataflow.
///
/// Builds a dataflow that joins orders priced above 250 with customers by name, loads an
/// initial batch of customers and orders at time 0, and then applies one cancellation and
/// three new orders at time 1. Every change to the view is written to stderr by the
/// `inspect` operator; the epoch banners are written to stdout.
///
/// # Panics
///
/// Panics if the timely computation cannot be started from the command-line arguments,
/// instead of silently exiting as if nothing had gone wrong.
fn main() {
    timely::execute_from_args(std::env::args(), move |worker| {
        // The probe lets us observe how far the output of the dataflow has progressed.
        let mut probe = Handle::new();

        let (mut customers_input, mut orders_input) = worker.dataflow(|scope| {
            let (customers_input, customers) = scope.new_collection();
            let (orders_input, orders) = scope.new_collection();

            /*
            CREATE VIEW HighPriceOrdersPerCustomer AS
            SELECT Customers.Name, COUNT(*) AS NumOrders
            FROM Customers
            JOIN Orders ON Customers.Name = Orders.Name
            WHERE Orders.Price > 250
            GROUP BY Customers.Name
            */
            // Both collections are keyed by customer name. The join emits the name once per
            // matching (order, customer) pair, so the reported change for a name is the
            // change in that customer's number of high-priced orders.
            let high_priced_orders_per_customer = orders
                .filter(|(_name, (_category, price))| *price > 250)
                .join_map(&customers, |name: &String, _, _| name.clone())
                .inspect(|(record, time, change)| {
                    eprintln!(
                        "\t Customer: {:?}, time: {:?}, change in order count: {:?}",
                        record,
                        time,
                        change
                    )
                });

            high_priced_orders_per_customer.probe_with(&mut probe);

            (customers_input, orders_input)
        });

        // Records are (name, (address, number)) for customers ...
        let initial_customers = [
            ("Bob".to_string(), ("99 High St.".to_string(), 415000)),
            ("Aliya".to_string(), ("125 Baker St.".to_string(), 415202)),
            ("Ji".to_string(), ("76 Square St.".to_string(), 415123)),
        ];
        // ... and (name, (category, price)) for orders.
        let initial_orders = [
            ("Bob".to_string(), ("Clothing".to_string(), 1200)),
            ("Bob".to_string(), ("Clothing".to_string(), 500)),
            ("Aliya".to_string(), ("Furniture".to_string(), 300)),
        ];

        // Time 0: load the initial state of both relations.
        customers_input.advance_to(0);
        orders_input.advance_to(0);
        for customer in initial_customers {
            customers_input.insert(customer);
        }
        for order in initial_orders {
            orders_input.insert(order);
        }

        // The customers never change again in this example. Closing the input keeps it
        // from holding the dataflow's progress back at time 0 while only orders advance.
        customers_input.close();
        orders_input.advance_to(1);
        orders_input.flush();

        println!("\n\t -- time 0 -> 1 --------------------");
        // Run the worker until every update before the orders input's current time
        // has been reflected at the probe.
        worker.step_while(|| probe.less_than(orders_input.time()));

        // Time 1: cancel one high-priced order and add three orders, only one of which
        // passes the price filter.
        let canceled_orders = [
            ("Bob".to_string(), ("Clothing".to_string(), 500)),
        ];
        let new_orders = [
            ("Bob".to_string(), ("Clothing".to_string(), 100)),
            ("Ji".to_string(), ("Furniture".to_string(), 1000)),
            ("Aliya".to_string(), ("Clothing".to_string(), 50)),
        ];
        for order in canceled_orders {
            orders_input.remove(order);
        }
        for order in new_orders {
            orders_input.insert(order);
        }
        orders_input.advance_to(2);
        orders_input.flush();

        println!("\n\t -- time 1 -> 2 --------------------");
        worker.step_while(|| probe.less_than(orders_input.time()));
    })
    // Surface start-up failures (e.g. malformed worker arguments) instead of discarding them.
    .expect("failed to start the timely computation");
}
@sscdotopen
Copy link
Author

Running this program produces the following output:

	 -- time 0 -> 1 --------------------
	 Customer: "Aliya", time: 0, change in order count: 1
	 Customer: "Bob", time: 0, change in order count: 2

	 -- time 1 -> 2 --------------------
	 Customer: "Bob", time: 1, change in order count: -1
	 Customer: "Ji", time: 1, change in order count: 1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment