0%

高并发保证数据安全

高并发下数据一致性问题

引言

首先写一段Java代码,我们通过线程池开六个线程,跑十个任务,每次取哈希表里的值并加1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(6);
Map<Integer, Integer> map = new ConcurrentHashMap<>();
map.put(1, 1);
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0;i < 10; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
int currentNumber = map.get(1);
System.out.println(Thread.currentThread().getName() + ":" + currentNumber);
currentNumber++;
map.put(1, currentNumber);
countDownLatch.countDown();
}
};
service.execute(runnable);
}
System.out.println("等待子线程执行完毕……");
countDownLatch.await();
System.out.println("执行完毕");
service.shutdown();
}

结果是显而易见的,肯定会有很多脏数据存在,最后的值不出意外不会为10。

image-20210802232141684

就是因为每个线程都有自己的私有数据,所以才会产生脏数据,所以用加锁使其串行化,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(6);
ReentrantLock lock = new ReentrantLock(false);
Map<Integer, Integer> map = new ConcurrentHashMap<>();
map.put(1, 1);
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0;i < 10; i++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
lock.lock();
try {
int currentNumber = map.get(1);
System.out.println(Thread.currentThread().getName() + ":" + currentNumber);
currentNumber++;
map.put(1, currentNumber);
countDownLatch.countDown();
} finally {
lock.unlock();
}
}
};
service.execute(runnable);
}
System.out.println("等待子线程执行完毕……");
countDownLatch.await();
System.out.println("执行完毕,最终结果:"+map.get(1));
service.shutdown();
}

这样结果就可以并行显示了。

image-20210802235935343

存在的问题

首先可知线程还有几种状态,我们刚刚加锁便是使它阻塞,所以是同步阻塞状态,不会产生脏数据,但是性能会下降很多,随便一个流量洪峰可能就down了,然后被辞退。

解决方案

所以我们要改写同步阻塞状态,首先升级变为同步非阻塞状态,在线上环境中肯定是多台服务器分布式搭建的,有自己的jetty集群,有我们依赖的其他服务集群,还有我们中间件集群,随之而来的就是各种rpc调用。这么多集群,每台机器都是独立的,如果采取刚刚的同步阻塞状态,那就是集体被沉默了。

所以我们挑一个leader,让他拿一把锁,其他人拿不到就不等了下次再来碰碰运气,这就是分布式锁。

以redis为例,一个KV数据库,就用最简单的K-V模型,谁第一个把key写进数据库中,谁就拿到了这把锁,用完了我再把锁删了,留给下个人用。

1
2
3
4
5
> setnx lock:code true
OK
we can do something in Java
> del lock:code
(integer) 1

但是如果我们在拿到锁时候,程序出现例如NullException导致中断,就会导致锁无法释放,所以我们要将锁设置一个过期时间,防止没有删除。

1
2
3
4
5
6
> setnx lock:code true
OK
> expire lock:code 2 (注:时间为秒)
we can do something in Java
> del lock:code
(integer) 1

但是我们发现setnx指令和expire指令是两段指令分别执行,并不是原子化处理,拿到锁之后且第二条指令未执行的情况下又会返回第一种情况,后来redis开源作者写了个新指令。

1
2
3
4
> set lock:code true ex 3 nx 
OK
we can do something in Java
> del lock:code

尽管实现了原子化,但是分布式锁还是有很多问题,比如临界区代码未执行完,锁被释放;保证锁的可重入性等等,反正不管怎么样算是实现了我们的同步非阻塞。

解决方案的解决方案

虽然锁的问题解决了。

在非阻塞的情况下,有十个任务进来,相当于十个线程,但是只有一个线程获得了分布式锁,其他九个线程没有拿到,就要用循环轮询,但是抢占式的线程,会出现有些线程一直被插队,导致饿死。

所以引入异步解决方案。

本地可以引入RocketMQ,作为异步消息队列,使用maven引入到工程内部。

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>

配置好对应的ip、端口以及topic name,实现自己的Producer和Consumer,下面简单实现一个例子。

初始化对应的nameAdder和topicName,利用消息队列中的生产者消费者实现逻辑即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 生产者
*/
public class MqProducer {

private static final ObjectMapper mapper = new ObjectMapper();

private final DefaultMQProducer producer;

//通过注解注入yml中的信息
@Value("${mq.nameserver.addr}")
private String nameAddr;

@Value("${mq.topicname}")
private String topicName;

@PostConstruct
public void init() throws MQClientException {
// 初始化
producer = new DefaultMQProducer("vvs_group");
producer.setNamesrvAddr(nameAddr);
producer.start();
}

public boolean asyncDoSomething (Object... params) {
Message message = new Message(topicName, "doSomething", mapper.writeValueAsString(params));
try {
producer.send(message);
} catch (Throwable e) {
e.printStackTrace();
return false;
}
return true;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 消费者
*/
public Class MqConsumer {
private final DefaultMQPushConsumer consumer;

@Value("${mq.nameserver.addr}")
private String nameAddr;

@Value("${mq.topicname}")
private String topicName;

@PostConstruct
public void init() {
comsumer = new DefaultMQPushConsumer("vvs_consumer_group");
// 订阅所有的消息
consumer.setNamesrvAddr(nameAddr);
consumer.subscribe(topicName, "*");
// 注册一个消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
/**
* 做当前的业务逻辑
*/
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动consumer
consumer.start();
}
}

在consumer启动后发现,DefaultMQPushConsumer并没有继承一些线程相关的类,那他是如何通过守护线程的形式来监听到消息的呢?

经过一通翻阅源码找到

  • 首先是进入到DefaultMQPushConsumerImpl中,判断当前的服务状态,如果状态为CREATE_JUST且注册成功,则调用MQClientFactory.start();
  • 第二步是使用synchronized加锁,保证clientFactoy只调用一次,通过NettyRemotingClient通知NettyRemotingServer。
  • 而在NettyRemotingServer构造函数中中new了一个Timer守护线程。

image-20211206000841818

image-20211206000858142

总结

多线程其实在IO密集的场景下使用更好,但要注意脏数据问题,在性能和稳定上做好权衡,同时在一些流量大的接口,可以改为异步非阻塞。

参考资料

RocketMQ remoting模块分析

RocketMQ整体架构

《Redis深度历险》