hazelcast入门教程

这是从初学者的角度来看一系列有关如何使用Hazelcast的文章的延续。 如果您还没有阅读最后两个,我鼓励阅读它们:

  • Hazelcast入门指南第1部分
  • Hazelcast入门指南第2部分

原始人来了

在上一篇文章中,我提到将ILock与IList和ISet一起使用,因为它们不是线程安全的。 令我惊讶的是,我没有涵盖Hazelcast的基本部分,即分布式原语。 他们解决了以分布式方式同步资源使用的问题。 那些执行大量线程编程的人将立即识别它们。 对于那些不熟悉线程编程的人,我将解释每个原语的作用并举一个例子。

长寿

这是一个分布式原子长。 这意味着所有操作都一次发生。 例如,可以在一个操作中添加一个数字并检索结果值。 可以获取值,然后添加值。 对于在此原语上执行的每项操作都是如此。 可以想象,它是线程安全的,但不能做到这一点,而且是线程安全的。

atomicLong.addAndGet(2 * atomicLong.get());

上面的行创建了一个竞争条件,因为有三个操作,读取原子long的内容,乘以2并将其添加到实例中。 仅在保证一步操作的情况下,线程才安全地存在。 为此,IAtomicLong有一个名为alterAndGet的方法。 AlterAndGet带有IFunction对象。 这使多步操作成为一步。 IAtomicLong始终有一个同步备份,并且它是不可配置的。

IdGenerator

IAtomicLongs非常适合用来跟踪一个人有多少。 问题在于,由于该呼叫很可能是远程呼叫,因此在某些情况下,IAtomicLongs并不是理想的解决方案。 这些情况之一就是生成唯一的ID。 IdGenerator就是为此目的而制作的。 它的工作方式是每个成员要求生成一百万个ID。 一旦所有这些要求的数字都被使用,该部门将要求另外一百万。 因此,由于每个成员都有100万个ID,所以远程调用IdGenerator的机会是100万个。 这使得生成唯一ID的方法非常快捷。 如果发生任何重复,可能是因为成员没有加入。 如果成员在其段用尽之前发生故障,则ID中将存在间隙。 对于唯一的ID生成,缺少数字不是问题。 我确实认为成员没有挂接到集群是一个问题,但是如果发生这种情况,则还有更大的事情要担心。 如果群集重新启动,则ID将从零开始。 那是因为id不能持久存在。 这是一个内存数据库,一个机会。 为了解决这个问题,可以将IdGenerators设置为以特定数字开头,只要其他人没有声明它并且还没有生成ID。 替代方法是创建自己的ID生成器或使用java.util.UUID类。 这可能需要更多空间,但是每个项目都有其自己的要求可以满足。 IdGenerator始终具有一个同步备份,无法进行配置。

这是经典的同步方法。 它是分发的排他锁。 只需调用方法锁,线程便会等待或获得锁。 一旦建立了锁定,就可以执行关键部分。 工作完成后,将使用解锁方法。 这项技术的资深人士会将关键部分放在try finally块中,在try块外部建立锁定,并在finally部分建立解锁。 这对于在线程安全的结构上执行操作非常有用。 获取锁的进程拥有该锁,并且需要调用解锁才能使其他进程能够建立锁。 当一个人在网络上的多个位置都有线程时,这可能会出现问题。 Hazelcast想到了这个问题,并在成员退出时释放了锁定。 另一个功能是锁定方法的超时时间为300秒。 这样可以防止线程不足。 ILock具有一个同步备份,并且不可配置。

有经验的人的一些建议,使关键部分尽可能 ; 这有助于提高性能并防止死锁。 由于线程的执行顺序未知,因此死锁难以调试且难以测试。 漏洞一度表现出来,然后就没有。 由于锁放错了位置,因此可能会持续一周或更长时间。 然后必须确保它不会再次发生。 由于线程的执行未知,很难证明这一点。 等到一切完成时,老板会因为花费的时间而感到沮丧,并且不知道该错误是否已修复。

ICondition

是否曾经想等待事件发生,但又不想其他人也必须等待事件发生? 这正是线程编程中的条件。 在Java 1.5之前,这是通过synced-wait-notify技术完成的。 这可以通过锁定条件技术来执行。 和我一起旅行,我可以向大家展示这是如何工作的。 想象一下这样一种情况,即存在一个非线程安全列表,并且有生产者和使用者进行读写。 显然,有些关键部分需要保护。 那落入了锁。 建立锁定后,便可以开始关键工作。 唯一的问题是资源处于对线程无用的状态。 例如,消费者无法从空列表中提取条目。 生产者无法将条目放入完整列表。 这是条件进入的地方。生产者或消费者将进入while循环,以测试条件是否有利,然后调用condition.await()。 调用await之后,该线程将放弃其锁定,并让其他线程访问其关键部分。 等待中的线程将锁重新测试其条件,并可能等待更多时间或条件已满足并开始工作。 关键部分完成后,线程可以调用signal()或signalAll()来告诉其他线程唤醒并检查其状况。 条件是由而不是Hazelcast实例创建的。 另一件事是,如果要分发条件,则必须使用lock.newCondition(String name)方法。 IConditions具有一个同步备份,无法配置。

我无法说出使用这种技术会发生多少死锁。 有时,当线程正在等待并且一切正常时,就会发出信号。 另一方面是在线程等待时发送信号,进入等待状态并永远等待。 因此,我主张在等待时使用超时,以便线程可以每隔一段时间检查一次是否满足条件。 这样,如果信号丢失,则可能发生的最坏情况是等待时间短而不是永远等待。 我在示例中使用了超时技术。 复制并粘贴所需的代码。 我宁愿使用正在测试的技术,也不愿使用未经测试的代码入侵互联网。

ICountDownLatch

ICountDownLatch是一个同步工具,当其计数器变为零时触发。 这不是进行协调的常用方法,但是在需要时可用。 我认为示例部分提供了有关其工作原理的更好的解释。 锁存器归零后可以复位,因此可以再次使用。 如果拥有成员离开,则会发出所有等待闩锁到达零的线程的信号,就好像已达到零。 ICountDownLatch在另一个地方同步备份,无法配置。

等量线

是的,有经典信号量的分布式版本。 令我激动的是,因为我上次上操作系统课时,信号量需要一些硬件支持。 也许我只是和自己约会,哦,它仍然很酷(再次约会我自己)。 信号量通过限制可以访问资源的线程数来工作。 与锁不同,信号量没有所有权感,因此不同的线程可以释放对资源的声明。 与其余的原语不同,可以配置ISemaphore。 我在示例中配置一个。 它位于我项目的默认包中的hazelcast.xml中。

例子

这里是例子。 我对上一篇帖子发表了评论,要求我对代码进行缩进,以使其更具可读性。 由于我要发布的代码量很大,所以这次我将确保这样做。 将会看到我以前没有讨论过的几件事。 一种是IExecutorService。 这是ExecutorService的分布式版本。 一个人实际上可以发送工作,以由不同的成员完成。 另一件事是,所有定义的Runnable / Callable类都实现了Serializable。 这在分布式环境中是必需的,因为可以将对象发送给不同的成员。 最后一件事是HazelcastInstanceAware接口。 它允许类访问本地 Hazelcast实例。 然后,类可以获取所需资源的实例(例如ILists)。 事不宜迟,我们开始。

长寿

package hazelcastprimitives.iatomiclong;import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.core.IFunction;
import java.io.Serializable;/**** @author Daryl*/
public class IAtomicLongExample {public static class MultiplyByTwoAndSubtractOne implements IFunction, Serializable {@Overridepublic Long apply(Long t) {return (long)(2 * t - 1);}}public static final void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();final String NAME = "atomic";IAtomicLong aLong = instance.getAtomicLong(NAME);IAtomicLong bLong = instance.getAtomicLong(NAME);aLong.getAndSet(1L);System.out.println("bLong is now: " + bLong.getAndAdd(2));System.out.println("aLong is now: " + aLong.getAndAdd(0L));MultiplyByTwoAndSubtractOne alter = new MultiplyByTwoAndSubtractOne();aLong.alter(alter);System.out.println("bLong is now: " + bLong.getAndAdd(0L));bLong.alter(alter);System.out.println("aLong is now: " + aLong.getAndAdd(0L));System.exit(0);}
}

注意,即使MutilpyAndSubtractOne类也实现了Serializable。

IdGenerator

package hazelcastprimitives.idgenerator;import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IdGenerator;/**** @author Daryl*/
public class IdGeneratorExample {public static void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IdGenerator generator = instance.getIdGenerator("generator");for(int i = 0; i < 10; i++) {System.out.println("The generated value is " + generator.newId());}instance.shutdown();System.exit(0);}
}

此ILock示例也可以视为ICondition示例。 我必须使用一个条件,因为ListConsumer始终在ListProducer之前运行,所以我让ListConsumer等到IList有消耗的东西。

package hazelcastprimitives.ilock;import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.ICondition;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.IList;
import com.hazelcast.core.ILock;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;/**** @author Daryl*/
public class ILockExample {static final String LIST_NAME = "to be locked";static final String LOCK_NAME = "to lock with";static final String CONDITION_NAME = "to signal with";/*** @param args the command line arguments*/public static void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IExecutorService service = instance.getExecutorService("service");ListConsumer consumer = new ListConsumer();ListProducer producer = new ListProducer();try {service.submit(producer);service.submit(consumer);Thread.sleep(10000);} catch(InterruptedException ie){System.out.println("Got interrupted");} finally {instance.shutdown();}}public static class ListConsumer implements Runnable, Serializable, HazelcastInstanceAware {private transient HazelcastInstance instance;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICondition condition = lock.newCondition(CONDITION_NAME);IList list = instance.getList(LIST_NAME);lock.lock();try {while(list.isEmpty()) {condition.await(2, TimeUnit.SECONDS);}while(!list.isEmpty()) {System.out.println("value is " + list.get(0));list.remove(0);}} catch(InterruptedException ie) {System.out.println("Consumer got interrupted");} finally {lock.unlock();}System.out.println("Consumer leaving");}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}}public static class ListProducer implements Runnable, Serializable, HazelcastInstanceAware {private transient HazelcastInstance instance;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICondition condition = lock.newCondition(CONDITION_NAME);IList list = instance.getList(LIST_NAME);lock.lock();try {for(int i = 1; i <= 10; i++){list.add(i);}condition.signalAll();} finally {lock.unlock();}System.out.println("Producer leaving");}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}}
}

ICondition

这是真正的ICondition示例。 注意SpunProducer和SpunConsumer如何共享相同的ICondition并相互发出信号。 注意我正在使用超时来防止死锁。

package hazelcastprimitives.icondition;import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.ICondition;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.IList;
import com.hazelcast.core.ILock;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;/**** @author Daryl*/
public class IConditionExample {static final String LOCK_NAME = "lock";static final String CONDITION_NAME = "condition";static final String SERVICE_NAME = "spinderella";static final String LIST_NAME = "list";public static final void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IExecutorService service  = instance.getExecutorService(SERVICE_NAME);service.execute(new SpunConsumer());service.execute(new SpunProducer());try {Thread.sleep(10000);} catch(InterruptedException ie) {System.out.println("Hey we got out sooner than I expected");} finally {instance.shutdown();System.exit(0);}}public static class SpunProducer implements Serializable, Runnable, HazelcastInstanceAware {private transient HazelcastInstance instance;private long counter = 0;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICondition condition = lock.newCondition(CONDITION_NAME);IList list = instance.getList(LIST_NAME);lock.lock();            try {if(list.isEmpty()) {populate(list);System.out.println("telling the consumers");condition.signalAll();}for(int i = 0; i < 2; i++) {while(!list.isEmpty()) {System.out.println("Waiting for the list to be empty");System.out.println("list size: " + list.size() );condition.await(2, TimeUnit.SECONDS);}  populate(list);System.out.println("Telling the consumers");condition.signalAll();}} catch(InterruptedException ie) {System.out.println("We have a found an interuption");} finally {condition.signalAll();System.out.println("Producer exiting stage left");lock.unlock();}}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}private void populate(IList list) {System.out.println("Populating list");long currentCounter = counter;for(; counter < currentCounter + 10; counter++) {list.add(counter);}}}public static class SpunConsumer implements Serializable, Runnable, HazelcastInstanceAware {private transient HazelcastInstance instance;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICondition condition = lock.newCondition(CONDITION_NAME);IList list = instance.getList(LIST_NAME);lock.lock();            try {for(int i = 0; i < 3; i++) {while(list.isEmpty()) {System.out.println("Waiting for the list to be filled");condition.await(1, TimeUnit.SECONDS);}System.out.println("removing values");while(!list.isEmpty()){System.out.println("value is " + list.get(0));list.remove(0);}System.out.println("Signaling the producer");condition.signalAll();}} catch(InterruptedException ie) {System.out.println("We had an interrupt");} finally {System.out.println("Consumer exiting stage right");condition.signalAll();lock.unlock();}}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}}}

ICountDownLatch

package hazelcastprimitives.icountdownlatch;import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.ICountDownLatch;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.IList;
import com.hazelcast.core.ILock;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;/**** @author Daryl*/
public class ICountDownLatchExample {static final String LOCK_NAME = "lock";static final String LATCH_NAME = "condition";static final String SERVICE_NAME = "spinderella";static final String LIST_NAME = "list";public static final void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IExecutorService service  = instance.getExecutorService(SERVICE_NAME);service.execute(new SpunMaster());service.execute(new SpunSlave());try {Thread.sleep(10000);} catch(InterruptedException ie) {System.out.println("Hey we got out sooner than I expected");} finally {instance.shutdown();System.exit(0);}}public static class SpunMaster implements Serializable, Runnable, HazelcastInstanceAware {private transient HazelcastInstance instance;private long counter = 0;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICountDownLatch latch = instance.getCountDownLatch(LATCH_NAME);IList list = instance.getList(LIST_NAME);lock.lock();            try {latch.trySetCount(10);populate(list, latch);} finally {System.out.println("Master exiting stage left");lock.unlock();}}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}private void populate(IList list, ICountDownLatch latch) {System.out.println("Populating list");long currentCounter = counter;for(; counter < currentCounter + 10; counter++) {list.add(counter);latch.countDown();}}}public static class SpunSlave implements Serializable, Runnable, HazelcastInstanceAware {private transient HazelcastInstance instance;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICountDownLatch latch = instance.getCountDownLatch(LATCH_NAME);IList list = instance.getList(LIST_NAME);lock.lock();            try {if(latch.await(2, TimeUnit.SECONDS)) {while(!list.isEmpty()){System.out.println("value is " + list.get(0));list.remove(0);}}} catch(InterruptedException ie) {System.out.println("We had an interrupt");} finally {System.out.println("Slave exiting stage right");lock.unlock();}}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}}}

等量线

组态

这是ISemaphore配置:

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast
xsi:schemaLocation ="http://www.hazelcast.com/schema/config
http://www.hazelcast.com/schema/config/hazelcast-config-3.0.xsd "
xmlns ="http://www.hazelcast.com/schema/config "
xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance"><network><join><multicast enabled="true"/></join></network><semaphore name="to reduce access"><initial-permits>3</initial-permits></semaphore>
</hazelcast>

范例程式码

package hazelcastprimitives.isemaphore;import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.ISemaphore;
import com.hazelcast.core.IdGenerator;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;/**** @author Daryl*/
public class ISemaphoreExample {static final String SEMAPHORE_NAME = "to reduce access";static final String GENERATOR_NAME = "to use";/*** @param args the command line arguments*/public static void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IExecutorService service = instance.getExecutorService("service");List<Future> futures = new ArrayList(10);try {for(int i = 0; i < 10; i++) {futures.add(service.submit(new GeneratorUser(i)));}// so I wait til the last man.  No this may not be scalable.for(Future future: futures) {future.get();}} catch(InterruptedException ie){System.out.printf("Got interrupted.");} catch(ExecutionException ee) {System.out.printf("Cannot execute on Future. reason: %s\n", ee.toString());} finally {service.shutdown();instance.shutdown();}}static class GeneratorUser implements Callable, Serializable, HazelcastInstanceAware {private transient HazelcastInstance instance;private final int number;public GeneratorUser(int number) {this.number = number;}@Overridepublic Long call() {ISemaphore semaphore = instance.getSemaphore(SEMAPHORE_NAME);IdGenerator gen = instance.getIdGenerator(GENERATOR_NAME);long lastId = -1;try {semaphore.acquire();try {for(int i = 0; i < 10; i++){lastId = gen.newId();System.out.printf("current value of generator on %d is %d\n", number, lastId);Thread.sleep(1000);}} catch(InterruptedException ie) {System.out.printf("User %d was Interrupted\n", number);} finally {semaphore.release();}} catch(InterruptedException ie) {System.out.printf("User %d Got interrupted\n", number);}System.out.printf("User %d is leaving\n", number);return lastId;}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}}}

结论

在这篇文章中讨论了Hazelcast的原语。 大多数(如果不是全部的话)都围绕线程协调展开。 分享了原始和个人经历的解释。 在示例中,显示了不同类型的协调。 可以通过以下版本的http://darylmathisonblog.googlecode.com/svn/trunk/HazelcastPrimitives下载示例。

参考资料

  • 《榛树之书》:可在www.hazelcast.com找到
  • Hazelcast文档:在Hazelcast下载发现在发现www.hazelcast.org

翻译自: https://www.javacodegeeks.com/2014/10/beginners-guide-to-hazelcast-part-3.html

hazelcast入门教程