Created
August 1, 2017 15:59
-
-
Save tonycox/d8997dd586efb993d7a06eb2a28fd76d to your computer and use it in GitHub Desktop.
Ignite locking cache
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.ignite.IgniteSpringBean; | |
import org.apache.ignite.configuration.IgniteConfiguration; | |
import org.apache.ignite.spi.discovery.DiscoverySpi; | |
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; | |
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; | |
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.context.annotation.Scope; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.UUID; | |
import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_PROTOTYPE; | |
/** | |
* @author Anton Solovev | |
* @since 8/1/2017. | |
*/ | |
@Configuration | |
public class IgniteCfg { | |
@Bean | |
@Scope(SCOPE_PROTOTYPE) | |
public String gridName() { | |
return UUID.randomUUID().toString(); | |
} | |
@Bean | |
public String localHost() { | |
return "127.0.0.1"; | |
} | |
@Bean | |
@Scope(SCOPE_PROTOTYPE) | |
public DiscoverySpi discoverySpi() { | |
TcpDiscoverySpi spi = new TcpDiscoverySpi(); | |
spi.setIpFinder(ipFinder()); | |
return spi; | |
} | |
@Bean | |
@Scope(SCOPE_PROTOTYPE) | |
public TcpDiscoveryIpFinder ipFinder() { | |
TcpDiscoveryVmIpFinder tdVmIp = new TcpDiscoveryVmIpFinder(); | |
tdVmIp.setAddresses(discoverIps()); | |
return tdVmIp; | |
} | |
@Bean | |
public List<String> discoverIps() { | |
return Collections.singletonList(localHost() + ":47500..47509"); | |
} | |
@Bean | |
@Scope(SCOPE_PROTOTYPE) | |
public IgniteConfiguration igniteConfiguration() { | |
return new IgniteConfiguration() {{ | |
setCacheConfiguration(); | |
setLocalHost(localHost()); | |
setGridName(gridName()); | |
setDiscoverySpi(discoverySpi()); | |
}}; | |
} | |
@Bean | |
@Scope(SCOPE_PROTOTYPE) | |
public IgniteSpringBean igniteBean() { | |
IgniteSpringBean ignite = new IgniteSpringBean(); | |
ignite.setConfiguration(igniteConfiguration()); | |
return ignite; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import lombok.Value; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.ignite.Ignite; | |
import org.apache.ignite.IgniteCache; | |
import org.apache.ignite.IgniteCluster; | |
import org.apache.ignite.IgniteSpringBean; | |
import org.apache.ignite.cache.CacheAtomicityMode; | |
import org.apache.ignite.configuration.CacheConfiguration; | |
import org.apache.ignite.lang.IgniteCallable; | |
import org.apache.ignite.resources.IgniteInstanceResource; | |
import org.springframework.context.ApplicationContext; | |
import org.springframework.context.annotation.AnnotationConfigApplicationContext; | |
import java.io.BufferedReader; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.InputStreamReader; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.locks.Lock; | |
import java.util.stream.IntStream; | |
import static java.util.stream.Collectors.toList; | |
/** | |
* @author Anton Solovev | |
* @since 8/1/2017. | |
*/ | |
@Slf4j | |
public class Test { | |
public static void main(String[] args) { | |
ApplicationContext appContext = new AnnotationConfigApplicationContext(IgniteCfg.class); | |
Ignite ignite = appContext.getBean(IgniteSpringBean.class); | |
int clusterSize = 3; | |
List<Process> processes = new ArrayList<>(clusterSize); | |
startServerNodes(processes, clusterSize); | |
waitStartServerNodes(ignite, clusterSize); | |
CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>("someCache"); | |
cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); | |
IgniteCache<Integer, Integer> cache = ignite.createCache(cacheCfg); | |
List<Integer> list = IntStream | |
.range(0, 1000) | |
.boxed() | |
.collect(toList()); | |
list.forEach(i -> cache.put(i, i)); | |
List<Integer> collect = IntStream.range(0, 1000) | |
.parallel() | |
.boxed() | |
.map(index -> new TupleLock(cache.lock(index), index)) | |
.peek(tupleLock -> tupleLock.getLock().lock()) | |
.map(tupleLock -> { | |
Integer val = ignite.compute().affinityCall("someCache", tupleLock.getIndex(), new IgniteCallable<Integer>() { | |
@IgniteInstanceResource | |
transient Ignite ignite; | |
@Override | |
public Integer call() throws Exception { | |
Integer result = ignite | |
.<Integer, Integer>cache("someCache") | |
.localPeek(tupleLock.getIndex()); | |
return result * 2; | |
} | |
}); | |
return new Tuple(tupleLock.getLock(), val); | |
}) | |
.peek(tupleLock -> tupleLock.getLock().unlock()) | |
.map(Tuple::getValue) | |
.sorted() | |
.collect(toList()); | |
collect.forEach(System.out::println); | |
ignite.compute(ignite.cluster().forServers().forRemotes()).withAsync() | |
.broadcast(() -> System.exit(1)); | |
ignite.close(); | |
} | |
public static class IgniteStarter { | |
public static void main(String[] args) throws Exception { | |
ApplicationContext appContext = new AnnotationConfigApplicationContext(IgniteCfg.class); | |
appContext.getBean(IgniteSpringBean.class); | |
} | |
} | |
public static void waitStartServerNodes(Ignite clientNode, int clusterSize) { | |
IgniteCluster cluster = clientNode.cluster(); | |
do { | |
try { | |
long AWAIT_TIME = 2_000; | |
Thread.sleep(AWAIT_TIME); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} while (cluster.forServers().nodes().size() <= clusterSize); | |
} | |
public static void startServerNodes(List<Process> processes, int count) { | |
try { | |
for (int gridNumber = 0; gridNumber < count && processes.size() < count; gridNumber++) { | |
processes.add(startJVM("node-" + (processes.size() + gridNumber), IgniteStarter.class)); | |
} | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
private static Process startJVM(String gridName, Class classForRun) throws IOException, InterruptedException { | |
List<String> params = new ArrayList<>(); | |
params.add("java"); | |
params.add("-cp"); | |
params.add(System.getProperty("java.class.path")); | |
params.add(classForRun.getName()); | |
ProcessBuilder builderExecute = new ProcessBuilder(params); | |
Process process = builderExecute.start(); | |
InputStream stderr = process.getErrorStream(); | |
InputStream stdout = process.getInputStream(); | |
printOutputProcess(gridName, stdout, stderr); | |
return process; | |
} | |
private static void printOutputProcess(String processName, InputStream... inputStreams) { | |
for (InputStream stream : inputStreams) { | |
Thread logThread = new Thread(() -> { | |
String out; | |
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(stream))) { | |
while ((out = bufferedReader.readLine()) != null) { | |
log.info("[ {} ] {}", processName, out); | |
} | |
} catch (IOException e) { | |
log.error("Error output: ", e); | |
} | |
}); | |
logThread.setDaemon(true); | |
logThread.start(); | |
} | |
} | |
@Value | |
private static class Tuple { | |
private final Lock lock; | |
private final Integer value; | |
} | |
@Value | |
private static class TupleLock { | |
private final Lock lock; | |
private final Integer index; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment