SpringBoot中消息中间件Kafka实现Websocket的集群

1、在实际项目中,由于数据量的增大及并发数的增多,我们不可能只用一台服务器,这个时候就需要用到Websocket的集群 。但是Websocket集群会遇到一些问题 。首先我们肯定会想到直接将session放到Redis等缓存服务器中,然后用的时候直接在Redis中获取 。但是Websocket的session比较特殊,它不能被序列化,因为session是有状态的,还有就是session是有时效性的,只要连接一断开,该session就会失效 。
2、解决集群的三种方法
2.1、通过相应的算法,将有关联的用户(即有可能发生聊天的对象)全部指定到同一台服务器上 。这样就不会存在聊天对象收不到消息的情况 。但是这种方法有局限性,就是用户只能和有关联的用户聊天,不能和其他未建立关联的用户聊天 。
2.2、使用Redis的消息订阅功能来实现集群 。大致思路如下图 。

SpringBoot中消息中间件Kafka实现Websocket的集群

文章插图
2.3、使用Kafka等消息中间件来实现集群 。这也是目前我选用的方式 。其实该方法和Redis的消息订阅大致思路差不多 。但是Redis我们只把它作为缓存使用,不想让Redis涉及太多的业务处理,因此就选用了Kafka 。
SpringBoot中消息中间件Kafka实现Websocket的集群

文章插图
2.3.3、在项目的pom文件中添加Kafka依赖(注:Kafka依赖的版本必须和服务器上安装的版本一致)
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>
SpringBoot中消息中间件Kafka实现Websocket的集群

文章插图
2.3.4、建立Kafka的生产者Bean
package com.yxl.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka producer (message sender) configuration.
 *
 * <p>Exposes a {@link KafkaTemplate} bean with String keys and String values,
 * used by the websocket endpoint to publish chat/close events to the cluster.
 *
 * @author yxl
 * @since 2018/11/14
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    /**
     * Raw producer client properties.
     *
     * @return mutable map of Kafka producer settings
     */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("bootstrap.servers", "kafka集群IP1:9092,kafka集群IP2:9092");
        // "all": the leader waits for the full in-sync replica set to acknowledge.
        // Slowest setting, but the most durable.
        properties.put("acks", "all");
        // 0 retries: avoids the duplicate-message risk that retries introduce.
        properties.put("retries", 0);
        // Per-partition batch buffer size, in bytes.
        properties.put("batch.size", 16384);
        // Small linger so batches fill slightly before being sent.
        properties.put("linger.ms", 1);
        // Total memory the producer may use for buffering, in bytes.
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    /**
     * Factory that creates String/String producers from {@link #producerConfigs()}.
     */
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * The template other beans use to send messages.
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
2.3.4、建立Kafka的消费者Bean以及消费者监听
package com.yxl.configuration;

import com.yxl.myListener.MyKafkaListener;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Kafka consumer configuration.
 *
 * <p>Uses the local IP address as the consumer {@code group.id} so that every
 * node in the cluster is its own consumer group and therefore receives EVERY
 * message published to the chat topics (broadcast semantics).
 *
 * @author yxl
 * @since 2018/11/14
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /**
     * Listener container factory used by {@code @KafkaListener} methods.
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    /**
     * Factory that creates String/String consumers from {@link #consumerConfigs()}.
     */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Raw consumer client properties.
     *
     * @return mutable map of Kafka consumer settings
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("bootstrap.servers", "kafka集群IP1:9092,kafka集群IP2:9092");
        // One group per node: the server IP makes the groupId unique per machine.
        properties.put("group.id", getIPAddress());
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    /**
     * Best-effort local IP lookup; falls back to a random UUID (dashes removed)
     * when the host address cannot be resolved or is blank.
     *
     * @return local host address, or a random UUID string as a unique fallback
     */
    public String getIPAddress() {
        try {
            InetAddress address = InetAddress.getLocalHost();
            if (address != null && StringUtils.isNotBlank(address.getHostAddress())) {
                return address.getHostAddress();
            }
        } catch (UnknownHostException e) {
            return UUID.randomUUID().toString().replace("-", "");
        }
        return UUID.randomUUID().toString().replace("-", "");
    }

    /**
     * Custom listener bean that bridges Kafka records to the websocket endpoint.
     */
    @Bean
    public MyKafkaListener listener() {
        return new MyKafkaListener();
    }
}
2.3.4、消费者监听
package com.yxl.myListener;

import com.yxl.websocket.ChatWebsocket;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * Kafka topic listeners that forward cluster-wide chat events to the local
 * websocket endpoint.
 *
 * @author yxl
 * @since 2018/11/14
 */
public class MyKafkaListener {

    Logger logger = Logger.getLogger(MyKafkaListener.class);

    /**
     * Fired for every chat message published to the "chatMessage" topic;
     * hands the payload to the websocket endpoint for local delivery.
     *
     * @param record consumed Kafka record whose value is the chat message JSON
     */
    @KafkaListener(topics = {"chatMessage"})
    public void listen(ConsumerRecord<String, String> record) {
        logger.info("chatMessage发送聊天消息监听:" + record.value().toString());
        // A fresh instance is acceptable here: ChatWebsocket keeps its
        // session registry in a static map shared by all instances.
        ChatWebsocket chatWebsocket = new ChatWebsocket();
        chatWebsocket.kafkaReceiveMsg(record.value().toString());
    }

    /**
     * Fired when a connection-close event is published to "closeWebsocket";
     * removes the corresponding session from the local registry.
     *
     * <p>Made public for consistency with {@link #listen} — Spring listener
     * handler methods are conventionally public.
     *
     * @param record consumed Kafka record whose value is the close-event JSON
     */
    @KafkaListener(topics = {"closeWebsocket"})
    public void closeListener(ConsumerRecord<String, String> record) {
        logger.info("closeWebsocket关闭websocket连接监听:" + record.value().toString());
        ChatWebsocket chatWebsocket = new ChatWebsocket();
        chatWebsocket.kafkaCloseWebsocket(record.value().toString());
    }
}
2.3.6、集群java代码
package com.kk.server.chat.websocket;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Clustered websocket chat endpoint.
 *
 * <p>Each node keeps only its own clients' sessions (static map below).
 * Outbound messages are published to Kafka; every node consumes them and
 * delivers to whichever recipient is connected locally.
 *
 * <p>Fixes over the original paste: a stray brace closed the class after
 * {@code onClose}, {@code getBasicRemote} was missing call parentheses, and
 * {@code onClose} published the key {@code "username"} while
 * {@code kafkaCloseWebsocket} read {@code "userId"} (which made the remove a
 * no-op / NPE) — the key is now {@code "userId"} on both ends.
 *
 * Created by yxl on 2018-11-17.
 */
@ServerEndpoint("/chat/{userId}")
@Component
public class ChatWebsocket {

    private Logger logger = Logger.getLogger(ChatWebsocket.class);

    // NOTE(review): never assigned in this class — presumably populated by an
    // ApplicationContextAware holder elsewhere (see the linked article); confirm.
    private static ApplicationContext applicationContext;

    // Lazily resolved from the context because @ServerEndpoint instances are
    // created by the websocket container, not by Spring.
    private KafkaTemplate kafkaTemplate;

    // Current number of online connections. Unused here; if it is ever
    // updated it must be made thread-safe (e.g. AtomicInteger).
    private static int onlineCount = 0;

    // userId -> session for clients connected to THIS node (doctor side).
    // ConcurrentHashMap: onOpen/onClose/listeners run on different threads.
    private static Map<String, Session> drWebSocketSet = new ConcurrentHashMap<>();

    /**
     * Called when a connection is established; registers the session locally.
     *
     * @param userId  user identifier from the endpoint path
     * @param session the newly opened websocket session
     */
    @OnOpen
    public void onOpen(@PathParam("userId") String userId, Session session) {
        if (kafkaTemplate == null) {
            kafkaTemplate = applicationContext.getBean(KafkaTemplate.class); // lazy Kafka bean lookup
        }
        drWebSocketSet.put(userId, session);
    }

    /**
     * Called for every client message: answers heartbeats directly,
     * otherwise fans the message out through Kafka.
     *
     * @param message raw client payload ("ping" or chat JSON)
     * @param session originating session
     */
    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        if ("ping".equals(message)) {
            session.getBasicRemote().sendText("pong"); // heartbeat reply
        } else {
            sendMessage(message, session); // distribute via Kafka
        }
    }

    /**
     * Publishes a chat message to the "chatMessage" topic.
     *
     * @param message chat JSON containing sender_id / receiver_id
     * @param session originating session (currently unused)
     */
    public void sendMessage(String message, Session session) throws IOException {
        if (StringUtils.isNotBlank(message)) {
            JSONObject jsonObject = JSONObject.parseObject(message);
            String sender_id = jsonObject.getString("sender_id");     // sender ID
            String receiver_id = jsonObject.getString("receiver_id"); // receiver ID
            // TODO optimization: if receiver_id is connected to THIS node,
            // send directly via its local session and skip Kafka entirely.
            kafkaTemplate.send("chatMessage", message);
        }
    }

    /**
     * Called when a connection closes; broadcasts a close event so every
     * node can drop the user's session from its registry.
     */
    @OnClose
    public void onClose(Session session) {
        Map<String, String> pathParameters = session.getPathParameters();
        String userId = pathParameters.get("userId"); // userId from the endpoint path
        Map<String, String> map = new HashMap<>();
        // Key must match what kafkaCloseWebsocket() reads ("userId").
        map.put("userId", userId);
        kafkaTemplate.send("closeWebsocket", JSON.toJSONString(map));
    }

    /**
     * Removes a client from the given registry map, if present.
     *
     * @param map      registry of currently connected clients
     * @param username key of the client to drop
     */
    private void close(Map<String, Session> map, String username) {
        if (StringUtils.isNotBlank(username)) {
            logger.info("关闭websocket链接,关闭客户端username:" + username);
            if (map.get(username) != null) {
                map.remove(username);
            }
        }
    }

    /**
     * Kafka-delivered chat message: if the receiver is connected to this
     * node, push the message down their websocket.
     *
     * @param message chat JSON containing receiver_id
     */
    public void kafkaReceiveMsg(String message) {
        JSONObject jsonObject = JSONObject.parseObject(message);
        String receiver_id = jsonObject.getString("receiver_id"); // receiver ID
        Session receiver = drWebSocketSet.get(receiver_id);
        if (receiver != null) {
            try {
                receiver.getBasicRemote().sendText(message); // deliver locally
            } catch (IOException e) {
                // Callers (Kafka listeners) declare no checked exceptions, so log here.
                logger.error("webscoket发生错误!" + e.getMessage(), e);
            }
        }
    }

    /**
     * Kafka-delivered close event: drop the user's session from this node.
     *
     * @param closeMessage JSON containing userId
     */
    public void kafkaCloseWebsocket(String closeMessage) {
        JSONObject jsonObject = JSONObject.parseObject(closeMessage);
        String userId = jsonObject.getString("userId");
        drWebSocketSet.remove(userId);
    }

    /**
     * Called when the container reports a websocket error.
     */
    @OnError
    public void onError(Session session, Throwable error) {
        logger.info("webscoket发生错误!关闭websocket链接");
        //onClose(session);
        error.printStackTrace();
        logger.info("webscoket发生错误!" + error.getMessage());
    }
}
注:Websocket的@ServerEndpoint类中不能直接注入相应的Bean实例,这个时候可以看我的另一篇博客 。
【SpringBoot中消息中间件Kafka实现Websocket的集群】
SpringBoot中消息中间件Kafka实现Websocket的集群

文章插图