page contents

redis实现多对多消息传递

以下内容希望帮助到大家!

多对多消息发送与获取(即是群组)

通常有两种方法实现:

  第一种为消息推送。Redis内置有这种机制,publish往频道推送消息、subscribe订阅频道。这种方法有一个缺点就是必须保证接收者时刻在线(即是此时程序不能停下来,一直保持监控状态,假若断线后就会出现客户端丢失信息)

  第二种为消息拉取。所谓消息拉取,就是客户端自主去获取存储在服务器中的数据。Redis内部没有实现消息拉取这种机制。因此我们需要自己手动编写代码去实现这个功能。

模块要求:

  1、用户能够自行创建群组,并成为群主

  2、群主可以拉人进来作为群组成员、并且可以踢人

  3、用户可以直接退出群组

  4、可以发送消息,每一位成员都可以拉取消息

  5、群组的消息最大容纳量为5000条

  6、成员可以拉取新消息,并提示有多少新消息

  7、成员可以分页获取之前已读的旧消息

功能就写这几个吧,有需要或者想练习的同学可以增加其他功能,例如禁言、匿名消息发送、文件发送等等。

  Redis实现思路:

   1、群组的消息以及群组的成员组成采用有序集合进行存储。群组消息有序集合的member存储用户发送的json数据消息,score存储唯一值,将采用原子操作incr获取string中的自增长值进行存储;群组成员有序集合的member存储user,score存储非零数字(在这里这个score意义不大,我的例子代码中使用数字1为群主的score,其他的存储为2。当然这使用这个数据还可以扩展别的功能,例如群组中成员等级)可参考下面数据存储结构简图。

  2、用户所加入的群组也是采用有序集合进行存储。其中,member存储群组ID,score存储用户已经获取该群组的最大消息分值(对应群组消息的score值)

  3、用户创建群组的时候,通过原子操作incr从而获取一个唯一ID

  4、用户在群中发送消息时,也是通过原子操作incr获取一个唯一自增长有序ID

  5、在执行incr时,为防止并发导致竞争关系,因此需要进行加锁操作

  6、创建群组方法简要思路,任何一个用户都可以创建群组聊天,在创建的同时,可以选择时是否添加群组成员(参数通过数组的形式)。创建过程将会为这个群组建立一个群组成员有序集合(群组信息有序集合暂时不创建),接着将群主添加进去,再将群ID添加用户所参加的群组有序集合中。

数据存储结构图:

v2-743c00c2b1643f666edbb2a97bfbe6d9_720w.jpgv2-97e1e0ad57517d0681bef0a104103259_720w.jpg

PHP的代码实现:

#ManyPullMessage.class.php

1 <?php
  2 class ManyPullMessage
  3 {
  4     private $redis='';  #存储redis对象
  5     /**
  6     * @desc 构造函数
  7     * 
  8     * @param $host string | redis主机
  9     * @param $port int    | 端口
 10     */
 11     public function __construct($host,$port=6379)
 12     {
 13         $this->redis=new Redis();
 14         $this->redis->connect($host,$port);
 15     } 
 16 
 17     /**
 18     * @desc 用于创建群组的方法,在创建的同时还可以拉人进群组
 19     * 
 20     * @param $user   string   | 用户名,创建群组的主人
 21     * @param $addUser array   | 其他用户构成的数组
 22     *
 23     * @param $lockName string | 锁的名字,用于获取群组ID的时候用
 24     * @return int 返回群组ID
 25     */
 26     public function createGroupChat($user, $addUser=array(), $lockName='chatIdLock')
 27     {
 28         $identifier=$this->getLock($lockName);  #获取锁
 29         if($identifier)
 30         {
 31             $id=$this->redis->incr('groupChatID');       #获取群组ID
 32             $this->releaseLock($lockName,$identifier);   #释放锁
 33         }
 34         else
 35             return false;
 36         $messageCount=$this->redis->set('countMessage_'.$id, 0);  #初始化这个群组消息计数器
 37         #开启非事务型流水线,一次性将所有redis命令传给redis,减少与redis的连接
 38         $pipe=$this->redis->pipeline();   
 39         $this->redis->zadd('groupChat_'.$id, 1, $user);  #创建群组成员有序集合,并添加群主
 40         #将这个群组添加到user所参加的群组有序集合中
 41         $this->redis->zadd('hasGroupChat_'.$user, 0, $id);  
 42         foreach ($addUser as $v)    #创建群组的同时需要添加的用户成员
 43         {
 44             $this->redis->zadd('groupChat_'.$id, 2, $v);
 45             $this->redis->zadd('hasGroupChat_'.$v, 0, $id);
 46         }
 47         $pipe->exec();
 48         return $id;    #返回群组ID
 49     }
 50 
 51     /**
 52     * @desc 群主主动拉人进群
 53     *
 54     * @param $user       string | 群主名
 55     * @param $groupChatID   int | 群组ID
 56     * @param $addMembers array  | 需要拉进群的用户
 57     *
 58     * @return bool
 59     */
 60     public function addMembers($user, $groupChatID, $addMembers=array())
 61     {
 62         $groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user);  #将groupChatName的群主取出来
 63         if($groupMasterScore==1)     #判断user是否是群主
 64         {
 65             $pipe=$this->redis->pipeline(); #开启非事务流水线
 66             foreach ($addMembers as $v) 
 67             {
 68                 $this->redis->zadd('groupChat_'.$groupChatID, 2, $v);                 #添加进群
 69                 $this->redis->zadd('hasGroupChat_'.$v, 0, $groupChatID); #添加群名到用户的有序集合中
 70             }
 71             $pipe->exec();
 72             return true;
 73         }
 74         return false;
 75     }
 76 
 77     /**
 78     * @desc 群主删除成员
 79     *
 80     * @param $user       string | 群主名
 81     * @param $groupChatID   int | 群组ID
 82     * @param $delMembers  array | 需要删除的成员名字
 83     *
 84     * @return bool
 85     */
 86     public function delMembers($user, $groupChatID, $delMembers=array())
 87     {
 88         $groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); 
 89         if($groupMasterScore==1)     #判断user是否是群主
 90         {
 91             $pipe=$this->redis->pipeline(); #开启非事务流水线
 92             foreach ($delMembers as $v) 
 93             {
 94                 $this->redis->zrem('groupChat_'.$groupChatID, $v);                 
 95                 $this->redis->zrem('hasGroupChat_'.$v, $groupChatID); 
 96             }
 97             $pipe->exec();
 98             return true;
 99         }
100         return false;
101     }
102 
103     /**
104     * @desc 退出群组
105     *
106     * @param $user string     | 用户名
107     * @param $groupChatID int | 群组名
108     */
109     public function quitGroupChat($user, $groupChatID)
110     {
111         $this->redis->zrem('groupChat_'.$groupChatID, $user);
112         $this->redis->zrem('hasGroupChat_'.$user, $groupChatID);
113         return true;
114     }
115 
116     /**
117     * @desc 发送消息
118     *
119     * @param $user string        | 用户名
120     * @param $groupChatID int    | 群组ID
121     * @param $messageArr array   | 包含发送消息的数组
122     * @param $preLockName string | 群消息锁前缀,群消息锁全名为countLock_群ID
123     *
124     * @return bool
125     */
126     public function sendMessage($user, $groupChatID, $messageArr, $preLockName='countLock_')
127     {
128         $memberScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); #成员score
129         if($memberScore)
130         {
131             $identifier=$this->getLock($preLockName.$groupChatID);  #获取锁
132             if($identifier)     #判断获取锁是否成功
133             {
134                 $messageCount=$this->redis->incr('countMessage_'.$groupChatID);
135                 $this->releaseLock($preLockName.$groupChatID,$identifier);  #释放锁
136             }
137             else
138                 return false;
139             $json_message=json_encode($messageArr);
140             $this->redis->zadd('groupChatMessage_'.$groupChatID, $messageCount, $json_message);
141             $count=$this->redis->zcard('groupChatMessage_'.$groupChatID);   #查看信息量大小
142             if($count>5000) #判断数据量有没有达到5000条
143             {   #数据量超5000,则需要清除旧数据
144                 $start=5000-$count;
145                 $this->redis->zremrangebyrank('groupChatMessage_'.$groupChatID, $start, $count);
146             }
147             return true;
148         }
149         return false;
150     }
151 
152     /**
153     * @desc 获取新信息
154     *
155     * @param $user string | 用户名
156     *
157     * @return 成功则放回json数据数组,无新信息返回false
158     */
159     public function getNewMessage($user)
160     {
161         $arrID=$this->redis->zrange('hasGroupChat_'.$user, 0, -1, 'withscores');    #获取用户拥有的群组ID
162         $json_message=array();  #初始化
163         foreach ($arrID as $k => $v)    #遍历循环所有群组,查看是否有新消息
164         {
165             $messageCount=$this->redis->get('countMessage_'.$k);    #群组最大信息分值数
166             if($messageCount>$v)    #判断用户是否存在未读新消息
167             {
168                 $json_message[$k]['message']=$this->redis->zrangebyscore('groupChatMessage_'.$k, $v+1, $messageCount);
169                 $json_message[$k]['count']=count($json_message[$k]['message']);  #统计新消息数量
170                 $this->redis->zadd('hasGroupChat_'.$user, $messageCount, $k);    #更新已获取消息
171             }   
172         }
173         if($json_message)
174             return $json_message;
175         return false;
176     }
177 
178     /**
179     * @desc 分页获取群组信息
180     *
181     * @param $user    string  | 用户名 
182     * @param $groupChatID int | 群组ID
183     * @param $page        int | 第几页
184     * @param $size        int | 每页多少条数据
185     *
186     * @return 成功返回json数据,失败返回false
187     */
188     public function getPartMessage($user, $groupChatID, $page=1, $size=10)
189     {
190         $start=$page*$size-$size;   #开始截取数据位置
191         $stop=$page*$size-1;        #结束截取数据位置
192         $json_message=$this->redis->zrevrange('groupChatMessage_'.$groupChatID, $start, $stop);
193         if($json_message)
194             return $json_message;
195         return false;
196     }
197 
198 
199     /**
200     * @desc 加锁方法
201     *
202     * @param $lockName string | 锁的名字
203     * @param $timeout int | 锁的过期时间
204     *
205     * @return 成功返回identifier/失败返回false
206     */
207     public function getLock($lockName, $timeout=2)
208     {
209         $identifier=uniqid();       #获取唯一标识符
210         $timeout=ceil($timeout);    #确保是整数
211         $end=time()+$timeout;
212         while(time()<$end)          #循环获取锁
213         {
214             /*
215             #这里的set操作可以等同于下面那个if操作,并且可以减少一次与redis通讯
216             if($this->redis->set($lockName, $identifier array('nx', 'ex'=>$timeout)))
217                 return $identifier;
218             */
219             if($this->redis->setnx($lockName, $identifier))    #查看$lockName是否被上锁
220             {
221                 $this->redis->expire($lockName, $timeout);     #为$lockName设置过期时间
222                 return $identifier;                             #返回一维标识符
223             }
224             elseif ($this->redis->ttl($lockName)===-1) 
225             {
226                 $this->redis->expire($lockName, $timeout);     #检测是否有设置过期时间,没有则加上
227             }
228             usleep(0.001);         #停止0.001ms
229         }
230         return false;
231     }
232 
233     /**
234     * @desc 释放锁
235     *
236     * @param $lockName string   | 锁名
237     * @param $identifier string | 锁的唯一值
238     *
239     * @param bool
240     */
241     public function releaseLock($lockName,$identifier)
242     {
243         if($this->redis->get($lockName)==$identifier)   #判断是锁有没有被其他客户端修改
244         { 
245             $this->redis->multi();
246             $this->redis->del($lockName);   #释放锁
247             $this->redis->exec();
248             return true;
249         }
250         else
251         {
252             return false;   #其他客户端修改了锁,不能删除别人的锁
253         }
254     }
255 
256 
257 }
258 
259 ?>

测试:

  1、建立createGroupChat.php(测试创建群组功能)

  执行代码并创建568、569群组(群主为jack)

1 include './ManyPullMessage.class.php';
2 $object=new ManyPullMessage('192.168.95.11');
3 #创建群组
4 $user='jack';
5 $arr=array('jane1','jane2');
6 $a=$object->createGroupChat($user,$arr);
7 echo "<pre>";
8 print_r($a);
9 echo "</pre>";die;

v2-32bbf926b2ee06b71d12900550f6aed3_720w.jpgv2-85beaa1134ab91c0053069e18d09f949_720w.jpg

  2、建立addMembers.php(测试添加成员功能)

  执行代码并添加新成员

1 include './ManyPullMessage.class.php';
2 $object=new ManyPullMessage('192.168.95.11');
3 $b=$object->addMembers('jack','568',array('jane1','jane2','jane3','jane4'));
4 echo "<pre>";
5 print_r($b);
6 echo "</pre>";die;

v2-ba43cd573887e1de4fdf254237b26e48_720w.jpg

  3、建立delete.php(测试群主删除成员功能)

1 include './ManyPullMessage.class.php';
2 $object=new ManyPullMessage('192.168.95.11');
3 #群主删除成员
4 $c=$object->delMembers('jack', '568', array('jane1','jane4'));
5 echo "<pre>";
6 print_r($c);
7 echo "</pre>";die;

v2-17183718033ad9aee1d0f4fd9734229a_
                    </div>
                    <div class=

  • 发表于 2020-05-21 10:24
  • 阅读 ( 487 )

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
Pack
Pack

1135 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1478 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章