Java驱动实践
- Redis的客户端有主要有三种:JDBC-Redis, JRedis和Jedis,推荐Jedis的方式;
- Jedis API Online Help:http://www.jarvana.com/jarvana/view/redis/clients/jedis/2.0.0/jedis-2.0.0-javadoc.jar!/index.html;
- Jredis使用总结:
- pipeline:starts a pipeline,which is a very efficient way to send lots of command and read all the responses when you finish sending them;即pipeline适用于批处理,当有大量的操作需要一次性执行的时候,可以用管道;
- 分布式的id生成器:因为redis-server是单线程处理client端的请求的,所以可以使用jedis.incr(“id_key”)来生成序列;
- 分布式锁watch/multi:可以用来实现跨jvm的同步问题;
- 方法1:Jedis.setnx(key, value),推荐的方法;
- 方法2:事务multi;
- 方法3:事务+监听;
- redis分布式:jedis里面通过MD5,MURMUR Hash(默认)两种方式实现了分布式,也可以自己实现redis.clients.util.Hashing接口;
- Jedis中Pool的问题:It seems like server has closed the connection;
- 原因:redis-server关闭了此客户端的连接,server端设置了maxidletime(默认是5分钟),服务端会不断循环检测clinet的最后一次通信时间(lastinteraction),如果大于maxidletime,则关闭连接,并回收相关资源,client在向该连接中写数据后就会由于server端已经关闭而出现broken pipe的问题;
- 错误的配置:在spring初始化时获取一次实例化jedisCommands,而后每次的redis的调用时并未从pool中获取;
- 解决办法;
———————– Pipeline ———————–
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class Redis
{
public static void main(String[] args)
{
// 连接Redis 服务器;
Jedis jedis = new Jedis( “192.168.10.112”, 6379);
// 生成Pipeline;
Pipeline pipeline = jedis.pipelined();
// 管道操作,会把所有的操作发送给服务器端,然后一次性执行;
// ……
pipeline.incr( “key”);
// 获得所有的结果;
pipeline.sync();
}
}
———————– Pipeline ———————–
———————– Jedis.setnx(key, value) ———————–
import java.util.Random;
import org.apache.commons.pool.impl.GenericObjectPool.Config;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class RedisLock
{
// 加锁标志
public static final String LOCKED = “TRUE”;
public static final long ONE_MILLI_NANOS = 1000000L;
// 默认超时时间(ms )
public static final long DEFAULT_TIME_OUT = 3000;
public static JedisPool pool;
public static final Random r = new Random();
// 锁的超时时间(s),过期删除
public static final int EXPIRE = 5 * 60;
static
{
pool = new JedisPool( new Config(), “host”, 6379);
}
private Jedis jedis;
private String key;
// 锁状态标志
private boolean locked = false;
public RedisLock(String key)
{
this. key = key;
this. jedis = pool.getResource();
}
public boolean lock( long timeout)
{
long nano = System. nanoTime();
timeout *= ONE_MILLI_NANOS;
try
{
while ((System. nanoTime() – nano) < timeout)
{
if ( jedis.setnx( key, LOCKED) == 1)
{
jedis.expire( key, EXPIRE);
this. locked = true ;
return locked;
}
// 短暂休眠,nano避免出现活锁
Thread. sleep(3, r.nextInt(500));
}
}
catch (Exception e)
{
e.printStackTrace();
}
return false;
}
public boolean lock()
{
return lock( DEFAULT_TIME_OUT);
}
// 无论是否加锁成功,必须调用
public void unlock()
{
try
{
if ( locked)
{
jedis.del( key);
}
}
finally
{
pool.returnResource( jedis);
}
}
}
———————– Jedis.setnx(key, value) ———————–
———————– 事务multi ———————–
public boolean lock_2( long timeout)
{
long nano = System. nanoTime();
timeout *= ONE_MILLI_NANOS;
try
{
while ((System. nanoTime() – nano) < timeout)
{
Transaction t = jedis.multi();
// 开启事务,当server端收到 multi指令;
// 会将该client的命令放入一个队列,然后依次执行,直到收到 exec指令;
t.getSet( key, LOCKED);
t.expire( key, EXPIRE);
String ret = (String) t.exec().get(0);
if (ret == null || ret.equals(“UNLOCK”))
{
return true;
}
// 短暂休眠,nano避免出现活锁;
Thread. sleep(3, r.nextInt(500));
}
}
catch (Exception e)
{
}
return false;
}
———————– 事务multi ———————–
———————– 事务+监听 ———————–
public boolean lock_3( long timeout)
{
long nano = System. nanoTime();
timeout *= ONE_MILLI_NANOS;
try
{
while ((System. nanoTime() – nano) < timeout)
{
jedis.watch( key);
// 开启watch之后,如果key的值被修改,则事务失败, exec方法返回null;
String value = jedis.get( key);
if (value == null || value.equals(“UNLOCK”))
{
Transaction t = jedis.multi();
t.setex( key, EXPIRE, LOCKED);
if (t.exec() != null)
{
return true;
}
}
jedis.unwatch();
// 短暂休眠,nano避免出现活锁;
Thread. sleep(3, r.nextInt(500));
}
}
catch (Exception e)
{
}
return false;
}
———————– 事务+监听 ———————–
———————– redis分布式 ———————–
List<JedisShardInfo> hosts = new ArrayList<JedisShardInfo>();
// server1
JedisShardInfo host1 = new JedisShardInfo( “”, 6380, 2000);
// server2
JedisShardInfo host2 = new JedisShardInfo( “”, 6381, 2000);
hosts.add(host1);
hosts.add(host2);
ShardedJedis jedis = new ShardedJedis(hosts);
jedis.set(“key”, “”);
———————– redis分布式 ———————–
———————– 错误的配置 ———————–
<bean id=”jedisPoolConfig” class=”redis.clients.jedis.JedisPoolConfig”>
<property name=”maxActive” value=”20″ />
<property name=”maxIdle” value=”10″ />
<property name=”maxWait” value=”1000″ />
</bean>
<!– jedis shard信息配置 –>
<bean id=”jedis.shardInfo” class=”redis.clients.jedis.JedisShardInfo”>
<constructor-arg index=”0″ value=”*.*.*.*” />
<constructor-arg index=”1″ value=”6379″ />
</bean>
<!– jedis shard pool配置 –>
<bean id=”shardedJedisPool” class=”redis.clients.jedis.ShardedJedisPool”>
<constructor-arg index=”0″ ref=”jedisPoolConfig” />
<constructor-arg index=”1″>
<list>
<ref bean=”jedis.shardInfo” />
</list>
</constructor-arg>
</bean>
<bean id=”jedisCommands” factory-bean=”shardedJedisPool” factory-method=”getResource” />
———————– 错误的配置 ———————–
———————– 解决办法 ———————–
<!– POOL配置 –>
<bean id=”jedisPoolConfig” class=”redis.clients.jedis.JedisPoolConfig”>
<property name=”maxActive” value=”20″ />
<property name=”maxIdle” value=”10″ />
<property name=”maxWait” value=”1000″ />
<property name=”testOnBorrow” value=”true”/>
</bean>
<!– jedis shard信息配置 –>
<bean id=”jedis.shardInfo” class=”redis.clients.jedis.JedisShardInfo”>
<constructor-arg index=”0″ value=”*.*.*.*” />
<constructor-arg index=”1″ value=”6379″ />
</bean>
<!– jedis shard pool配置 –>
<bean id=”shardedJedisPool” class=”redis.clients.jedis.ShardedJedisPool”>
<constructor-arg index=”0″ ref=”jedisPoolConfig” />
<constructor-arg index=”1″>
<list>
<ref bean=”jedis.shardInfo” />
</list>
</constructor-arg>
</bean>
———————– 解决办法 ———————–