Skip to content

Instantly share code, notes, and snippets.

@tonycox
Created August 1, 2017 15:59
Show Gist options
  • Save tonycox/d8997dd586efb993d7a06eb2a28fd76d to your computer and use it in GitHub Desktop.
Save tonycox/d8997dd586efb993d7a06eb2a28fd76d to your computer and use it in GitHub Desktop.
Ignite locking cache
import org.apache.ignite.IgniteSpringBean;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_PROTOTYPE;
/**
 * Spring configuration declaring the beans used to build an Ignite node that
 * discovers its peers through a static address list on the loopback interface.
 *
 * @author Anton Solovev
 * @since 8/1/2017.
 */
@Configuration
public class IgniteCfg {

    /** Address used both as the node's local host and as the discovery host. */
    private static final String LOCAL_HOST = "127.0.0.1";

    /** Port range appended to the host to form the discovery address. */
    private static final String DISCOVERY_PORT_RANGE = "47500..47509";

    /**
     * Produces a name for a grid instance.
     *
     * @return a freshly generated random UUID rendered as a string
     */
    @Bean
    @Scope(SCOPE_PROTOTYPE)
    public String gridName() {
        return UUID.randomUUID().toString();
    }

    /**
     * @return the host the node binds to and on which peers are looked up
     */
    @Bean
    public String localHost() {
        return LOCAL_HOST;
    }

    /**
     * Builds a TCP discovery SPI backed by {@link #ipFinder()}.
     *
     * @return a new discovery SPI instance
     */
    @Bean
    @Scope(SCOPE_PROTOTYPE)
    public DiscoverySpi discoverySpi() {
        TcpDiscoverySpi spi = new TcpDiscoverySpi();
        spi.setIpFinder(ipFinder());
        return spi;
    }

    /**
     * Builds an IP finder seeded with the addresses from {@link #discoverIps()}.
     *
     * @return a new static (VM) IP finder
     */
    @Bean
    @Scope(SCOPE_PROTOTYPE)
    public TcpDiscoveryIpFinder ipFinder() {
        TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder();
        finder.setAddresses(discoverIps());
        return finder;
    }

    /**
     * @return a single-element immutable list holding the local host followed by
     *         the discovery port range
     */
    @Bean
    public List<String> discoverIps() {
        return Collections.singletonList(localHost() + ":" + DISCOVERY_PORT_RANGE);
    }

    /**
     * Assembles the node configuration from the other beans of this class.
     *
     * <p>A plain instance is configured through its setters instead of
     * double-brace initialization, which would create an anonymous subclass of
     * {@link IgniteConfiguration} holding a hidden reference to this
     * configuration object.
     *
     * @return a new node configuration
     */
    @Bean
    @Scope(SCOPE_PROTOTYPE)
    public IgniteConfiguration igniteConfiguration() {
        IgniteConfiguration cfg = new IgniteConfiguration();
        // Called without arguments on purpose (kept from the original code):
        // no caches are declared here, they are created at runtime instead.
        cfg.setCacheConfiguration();
        cfg.setLocalHost(localHost());
        cfg.setGridName(gridName());
        cfg.setDiscoverySpi(discoverySpi());
        return cfg;
    }

    /**
     * Wraps {@link #igniteConfiguration()} into an {@link IgniteSpringBean}.
     *
     * @return a new Ignite Spring bean carrying the node configuration
     */
    @Bean
    @Scope(SCOPE_PROTOTYPE)
    public IgniteSpringBean igniteBean() {
        IgniteSpringBean ignite = new IgniteSpringBean();
        ignite.setConfiguration(igniteConfiguration());
        return ignite;
    }
}
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteSpringBean;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.stream.IntStream;
import static java.util.stream.Collectors.toList;
/**
* @author Anton Solovev
* @since 8/1/2017.
*/
@Slf4j
public class Test {
public static void main(String[] args) {
ApplicationContext appContext = new AnnotationConfigApplicationContext(IgniteCfg.class);
Ignite ignite = appContext.getBean(IgniteSpringBean.class);
int clusterSize = 3;
List<Process> processes = new ArrayList<>(clusterSize);
startServerNodes(processes, clusterSize);
waitStartServerNodes(ignite, clusterSize);
CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>("someCache");
cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
IgniteCache<Integer, Integer> cache = ignite.createCache(cacheCfg);
List<Integer> list = IntStream
.range(0, 1000)
.boxed()
.collect(toList());
list.forEach(i -> cache.put(i, i));
List<Integer> collect = IntStream.range(0, 1000)
.parallel()
.boxed()
.map(index -> new TupleLock(cache.lock(index), index))
.peek(tupleLock -> tupleLock.getLock().lock())
.map(tupleLock -> {
Integer val = ignite.compute().affinityCall("someCache", tupleLock.getIndex(), new IgniteCallable<Integer>() {
@IgniteInstanceResource
transient Ignite ignite;
@Override
public Integer call() throws Exception {
Integer result = ignite
.<Integer, Integer>cache("someCache")
.localPeek(tupleLock.getIndex());
return result * 2;
}
});
return new Tuple(tupleLock.getLock(), val);
})
.peek(tupleLock -> tupleLock.getLock().unlock())
.map(Tuple::getValue)
.sorted()
.collect(toList());
collect.forEach(System.out::println);
ignite.compute(ignite.cluster().forServers().forRemotes()).withAsync()
.broadcast(() -> System.exit(1));
ignite.close();
}
public static class IgniteStarter {
public static void main(String[] args) throws Exception {
ApplicationContext appContext = new AnnotationConfigApplicationContext(IgniteCfg.class);
appContext.getBean(IgniteSpringBean.class);
}
}
public static void waitStartServerNodes(Ignite clientNode, int clusterSize) {
IgniteCluster cluster = clientNode.cluster();
do {
try {
long AWAIT_TIME = 2_000;
Thread.sleep(AWAIT_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (cluster.forServers().nodes().size() <= clusterSize);
}
public static void startServerNodes(List<Process> processes, int count) {
try {
for (int gridNumber = 0; gridNumber < count && processes.size() < count; gridNumber++) {
processes.add(startJVM("node-" + (processes.size() + gridNumber), IgniteStarter.class));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static Process startJVM(String gridName, Class classForRun) throws IOException, InterruptedException {
List<String> params = new ArrayList<>();
params.add("java");
params.add("-cp");
params.add(System.getProperty("java.class.path"));
params.add(classForRun.getName());
ProcessBuilder builderExecute = new ProcessBuilder(params);
Process process = builderExecute.start();
InputStream stderr = process.getErrorStream();
InputStream stdout = process.getInputStream();
printOutputProcess(gridName, stdout, stderr);
return process;
}
private static void printOutputProcess(String processName, InputStream... inputStreams) {
for (InputStream stream : inputStreams) {
Thread logThread = new Thread(() -> {
String out;
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(stream))) {
while ((out = bufferedReader.readLine()) != null) {
log.info("[ {} ] {}", processName, out);
}
} catch (IOException e) {
log.error("Error output: ", e);
}
});
logThread.setDaemon(true);
logThread.start();
}
}
@Value
private static class Tuple {
private final Lock lock;
private final Integer value;
}
@Value
private static class TupleLock {
private final Lock lock;
private final Integer index;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment