Gray's Ven

我的学习之路


  • 首页

  • 归档

  • 标签

MYSQL主从复制 Docker快速实现

发表于 2020-12-25 | 分类于 mysql |

前言

mysql 主从原理是通过binlog文件来传递主库的修改的

操作

  • 下载mysql镜像

    1
    docker pull mysql:5.7.18
  • 准备一份my.cnf,这个版本的镜像里的mysql配置文件在 /etc/mysql/mysql.conf.d/mysqld.cnf,可以从容器中使用docker cp复制出来修改, 复制出两个文件 mysqld.master.cnf,mysqld.slave.cnf

    1
    2
    3
    4
    5
    6
    # 文件 mysqld.master.cnf 添加内容
    .... 省略内容
    [mysqld]
    log-bin = mysql-bin # master一定要开启binlog
    server-id = 1 # 实例唯一的id,跟从库不一样就行
    .... 省略内容
    1
    2
    3
    4
    5
    6
    # 文件 mysqld.slave.cnf 添加内容
    .... 省略内容
    [mysqld]
    log-bin = mysql-bin # 开启binlog
    server-id = 2 # 实例唯一的id,跟主库不一样就行
    .... 省略内容
  • 创建容器

    1
    2
    3
    4
    # 启动master
    docker run -d -p 3306:3306 -v /Users/所在文件夹/mysqld.master.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf -e MYSQL_ROOT_PASSWORD=123456 --name mysql-master mysql:5.7.18
    # 启动从库 从库在宿舍既可以换个端口,也可以是用 -p 3306 让宿主机随机一个端口
    docker run -d -p 3307:3306 -v /Users/所在文件夹/mysqld.slave.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf -e MYSQL_ROOT_PASSWORD=123456 --name mysql-slave mysql:5.7.18
  • 进入mysql主从配置

    1
    2
    3
    4
    5
    6
    # 进入容器命令
    docker exec -it mysql-master bash
    docker exec -it mysql-slave bash

    # 进入mysql
    mysql -uroot -p123456
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    -- master登录进入容器内mysql,添加用户用于主从复制
    GRANT REPLICATION SLAVE ON *.* TO 'follow'@'%' identified by 'follow';
    -- 刷新权限
    FLUSH PRIVILEGES;
    -- 查看binlog的文件名和位置,用于从库配置连接时指定文件和位置时使用
    show master status;
    +------------------+----------+--------------+------------------+-------------------+
    | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
    +------------------+----------+--------------+------------------+-------------------+
    | mysql-bin.000003 | 439 | | | |
    +------------------+----------+--------------+------------------+-------------------+

    master的ip地址可以在master机器上使用ip addr,也可以使用docker inspect 容器id 查看

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    -- 从库 进入容器内,连接主库
    change master to
    master_host = '172.17.0.2',
    master_user = 'follow',
    master_log_file = 'mysql-bin.000003',
    master_log_pos = 439,
    master_port = 3306,
    master_password = 'follow';
    -- 启动
    start slave;
    -- 查看状态
    show slave status;
    -- 重点查看 Slave_IO_Running与Slave_SQL_Running 是否为YES

PageHelper的clearPage无效

发表于 2019-07-11 | 分类于 debug |

简介

PageHelper可以很大程度减少在使用MyBatis PageHelper插件的时候。但是使用的时候发现,执行发现不止第一个sql被分页了,后面的也会有sql被执行了分页,顾去阅读PageHelper的源码查看原因。

一、直接说无效的原因

  • 该项目中 MyBatis 的mapper接口传入的参数如下,所有的前端传过来的搜索条件对象都继承了这个类。

    1
    2
    3
    4
    5
    6
    public class BaseSearchVo {
    private Integer pageNum = 1;
    private Integer pageSize = 10;
    private String orderBy;
    // setter getter...
    }
  • PageHelper参数设置了 pagehelper.supportMethodsArguments = true

当该参数设置为true的时候,PageHelper会检查出入sql的参数有没有 pageNum 和 pageSize 两个参数,如果有,则按照两个参数进行分页,等效于执行了PageHelper.startPage方法。

二、PageHelper源码 分页基本原理

PageHelper 主要通过分页拦截器 PageInterceptor 实现,基本的代码结构如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class PageInterceptor implements Interceptor {
// properties...

// 每个sql都会通过拦截器
@Override
public Object intercept(Invocation invocation) throws Throwable {
try {
// 获取方法的参数
Object[] args = invocation.getArgs();
// 省略处理参数代码

//调用方法判断是否需要进行分页,如果不需要,直接返回结果,skip方法查看下一段代码
if (!dialect.skip(ms, parameter, rowBounds)) {
// 分页处理sql
} else {
// 直接执行sql
}
} finally {
// 每次sql执行完成都会执行 clearPage 的操作,所以PageHelper只会对静态方法PageHelper.startPage(1, 10)方法后的第一个sql拦截,然后分页。
dialect.afterAll();
}
}

判断是否需要分页的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public boolean skip(MappedStatement ms, Object parameterObject, RowBounds rowBounds) {
if(ms.getId().endsWith(MSUtils.COUNT)){
throw new RuntimeException("在系统中发现了多个分页插件,请检查系统配置!");
}
// 主要判断方法!!!如果返回null,说明不需要分页,具体getPage方法见下一段代码
Page page = pageParams.getPage(parameterObject, rowBounds);
if (page == null) {
return true;
} else {
//设置默认的 count 列
if(StringUtil.isEmpty(page.getCountColumn())){
page.setCountColumn(pageParams.getCountColumn());
}
autoDialect.initDelegateDialect(ms);
return false;
}
}

pageParams.getPage()方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# class -> com.github.pagehelper.page.PageParams
public Page getPage(Object parameterObject, RowBounds rowBounds) {
// 获取通过 PageHelper.startPage 设置的page对象,如果没有设置即为null
Page page = PageHelper.getLocalPage();
if (page == null) {
if (rowBounds != RowBounds.DEFAULT) {
// 省略与本次无关的代码
} else if(supportMethodsArguments){
try {
// 重点来了,如果设置 supportMethodsArguments 参数为 true, 会尝试从参数中获取page对象
// 具体方法见下一段代码
page = PageObjectUtil.getPageFromObject(parameterObject, false);
} catch (Exception e) {
return null;
}
}
if(page == null){
return null;
}
PageHelper.setLocalPage(page);
}
// 省略部分代码...
return page;
}

PageObjectUtil.getPageFromObject 方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 方法通过反射获取pageNum与pageSize的值
public static <T> Page<T> getPageFromObject(Object params, boolean required) {
int pageNum;
int pageSize;
MetaObject paramsObject = null;
if (params == null) {
throw new PageException("无法获取分页查询参数!");
}
// 省略部分代码...
try {
// 获取参数的方法!!!
Object _pageNum = getParamValue(paramsObject, "pageNum", required);
Object _pageSize = getParamValue(paramsObject, "pageSize", required);
if (_pageNum == null || _pageSize == null) {
if(hasOrderBy){
Page page = new Page();
page.setOrderBy(orderBy.toString());
page.setOrderByOnly(true);
return page;
}
return null;
}
pageNum = Integer.parseInt(String.valueOf(_pageNum));
pageSize = Integer.parseInt(String.valueOf(_pageSize));
} catch (NumberFormatException e) {
throw new PageException("分页参数不是合法的数字类型!");
}
// 如果获取参数没有异常,则构建一个新的page对象设置到本地,类似于进行了PageHelper.startPage操作
Page page = new Page(pageNum, pageSize);
// 省略部分代码...
return page;
}
/**
* 从对象中取参数
*/
protected static Object getParamValue(MetaObject paramsObject, String paramName, boolean required) {
Object value = null;
if (paramsObject.hasGetter(PARAMS.get(paramName))) {
value = paramsObject.getValue(PARAMS.get(paramName));
}
if (value != null && value.getClass().isArray()) {
Object[] values = (Object[]) value;
if (values.length == 0) {
value = null;
} else {
value = values[0];
}
}
if (required && value == null) {
throw new PageException("分页查询缺少必要的参数:" + PARAMS.get(paramName));
}
return value;
}

总结

此次问题的主要原因是本地业务对象中的参数 pageNum 和 pageSize 和PageHelper中的分页参数命名完全一样,被反射获取到了,同时显示的设定了 supportMethodsArguments = true // 默认为false,导致只要使用到了这个搜索对象的地方都会进行分页操作,即使没有调用PageHelper.startPage 方法。

单例模式的线程安全问题

发表于 2019-07-10 | 分类于 java |

简介

在Java程序中,单例模式是很常见的模式。通常需要延迟初始化来降低初始化程序时对象创建的开销,通常使用的方法是双重锁检查,但是不加volatile限制的双重锁检查是一个错误的用法,正确的做法是通过volatile或者基于类初始化的方法来解决线程安全问题。

一、基于 volatile 的解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class SafeVolatileInstance {

private volatile static SafeVolatileInstance instance;

public static SafeVolatileInstance getInstance() {
if (instance == null) {
synchronized (SafeVolatileInstance.class) {
if (instance == null) {
instance = new SafeVolatileInstance();
}
}
}
return instance;
}
}
  • instance = SafeVolatileInstance(); 这个操作按照如下三个步骤进行
  • 由于所有线程在执行程序的时候必须要遵守 intra-thread semantics , intra-thread semantics 保证重排序不会改变单线程中的执行结果,所以步骤2和3可能会发生重排序,当步骤3先执行后,如果此时有另一个线程B访问getInstance方法,第一个if中instance == null 就会返回false,但此时instance对象并未真正初始化成功,线程B将访问到一个未初始化的对象,这就是双重锁检查的问题所在
  • 当instance使用volatile修饰之后, 2和3之间的重排序在多线程的环境中会被禁止 ,所以不会出现上述的访问到未初始化的对象的问题。
    示例

    需要基于JDK5以上的版本,从JDK5开始使用的新的JSR-133内存模型规范增强了volatile的语义。

二、基于类初始化的解决方案

1
2
3
4
5
6
7
8
9
10
11
public class SafeHolderInstance {

private static class InstanceHolder {
static SafeHolderInstance instance = new SafeHolderInstance();
}

public static SafeHolderInstance getInstance() {
return InstanceHolder.instance;
}

}

这个方案的实质是:允许上述的2和3重排序,但不允许非构造线程“看到”这个重排序

初始化一个类,包括执行这个类的静态初始化和初始化静态字段。根据Java语言规范,类或接口T在首次发生如下任意一种情况时,会被立即初始化:

  • T 是一个类,而且一个T的实例被创建
  • T 是一个类,且T中声明的静态方法被调用
  • T 中声明的一个静态字段被赋值
  • T 中声明的一个静态字段被使用,而且这个字段不是一个常量字段
  • T 是一个顶级类(Top level class),而且一个断言语句嵌套在T内被执行

在如上代码中,首次执行getInstance方法时 InstanceHolder会被初始化

多个线程同时调用getInstance方法时,会导致多个线程同时尝试初始化 InstanceHolder 类,Java语言规范规定,对于每一个类或者接口C,都有一个唯一的初始化锁LC与之对应。JVM在初始化期间会获取这个初始化锁,并且每个线程至少获取一次锁来确保这个类已经被初始化过了。初始化锁的实现可以确保类初始化只会由一个线程完成,当初始化完成之后,其他线程看到的是一个初始化好的instance实例,保证不会出现访问到不正常的对象的情况。具体的类初始化的锁同步问题可以参考《Java 并发编程的艺术》。

总结

对比两种方法会发现基于类初始化的方案更加简洁。但是基于volatile的双重检查锁定的方案有一个额外的优势,除了可以对静态字段实现延迟初始化,也可以对实例字段实现延迟初始化。
所以,如果需要对实例字段使用线程安全的延迟初始化,使用基于volatile的延迟初始化方案;如果需要对静态字段使用线程安全的延迟初始化,使用基于类初始化的方案。

Java8 Stream 使用简介

发表于 2019-06-21 | 分类于 java |

简介

Java 8 的提供的Stream类可以很方便的操作集合,同时可以很好的应用Lambda表达式。使用Stream可以使代码变得简洁,可以很明显的表达出代码的意思,比如 Stream.of(1, 2, 3).filter(i -> i > 1).collect(Collectors.toList()),根据单词的意思,将这个Stream过滤,过滤条件是元素大于1,然后生成一个List。

常用集合转换

  • 定义一个会使用到的对象 User

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public class User {
    private Long id;
    private String name;
    public User() {}
    public User(Long id, String name) {
    this.id = id;
    this.name = name;
    }
    @Override
    public String toString() {
    return "User(" + name + ")";
    }
    // setter getter omitted
    }
  • List -> List
    通过: map、Collectors.toList()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // List<String> 转 List<Integer>
    List<String> stringList = Arrays.asList("1000", "2000", "3000");
    // 这里的 Integer::valueOf 是Lambda表达式的另一种写法,
    // 表示 i -> String.valueOf(i),下面这样的方法如 User::getId 表示 u -> u.getId()
    List<Integer> intList = stringList.stream()
    .map(Integer::valueOf)
    .collect(Collectors.toList());

    System.out.println(intList); // 输出 [1000, 2000, 3000]

    // 将一个List<User>的id提取出来生成 List<Long>
    List<User> users = Arrays.asList(new User(1004L, "Jerry"), new User(1001L, "Nick"), new User(1032L, "Gray"));
    List<Long> idList = users.stream()
    .map(User::getId)
    .collect(Collectors.toList());

    System.out.println(idList); // 输出 [1004, 1001, 1032]
  • List -> Map
    通过: map、Collectors.toMap()

    1
    2
    3
    4
    5
    6
    7
    // 将 List<User> 生成 Map<Long, User> key为user的id,value为user
    List<User> users = Arrays.asList(new User(1004L, "Jerry"), new User(1001L, "Nick"), new User(1032L, "Gray"));
    Map<Long, User> userMap = users.stream()
    .collect(Collectors.toMap(User::getId, Function.identity()));

    System.out.println(userMap);
    // 输出 {1032=User(Gray), 1001=User(Nick), 1004=User(Jerry)}
  • List<List> -> List
    方法: flatMap、Collectors.toMap()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
     // 将 字符串集合中的字符串按空格分开形成新的单词集合,最后生成一个单词集合
    List<String> list = Arrays.asList("Hello world", "Java welcome good good study");
    List<String> words = list.stream().map(s -> Arrays.asList(s.split(" ")))
    // peek只是输出一下元素在目前的情况
    .peek(i -> System.out.println("item -> " + i))
    .flatMap(Collection::stream)
    .collect(Collectors.toList());

    System.out.println("words -> " + words);
    // 输出如下:
    // item -> [Hello, world]
    // item -> [Java, welcome, good, good, study]
    // words -> [Hello, world, Java, welcome, good, good, study]

IntStream

  • 打印 [0, 100) 的数字

    1
    IntStream.range(0, 100).forEach(System.out::println);
  • 生成 [0, 100) 范围内的奇数集合

    1
    List<Integer> oddList = IntStream.range(0, 100).filter(i -> (i & 1 ) == 1).boxed().collect(Collectors.toList());

常用方法解释

  • map
    map方法相当于一个加工器,传入一个元素,返回一个元素,比如传入Integer类型,返回 String类型,这样Stream中的元素类型就变成String
  • filter
    过滤,传入当前Steam的元素,返回bool值,留下返回为true的元素
  • collect
    将Stream生成集合或者String,常用的操作 Collectors.toList()、Collectors.toMap()、Collectors.toSet()、 Collectors.joining(String delimiter)(将Stream中的字符串通过delimiter拼接)
  • anyMatch
    传入元素,返回bool值,当有一个true返回则整个方法返回true
  • parallel
    使用并行的方法操作Stream,注意并发的问题,会在不同的线程执行,如下:
    1
    2
    3
    4
    5
    6
    IntStream.range(0, 5).parallel().forEach(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
    // ForkJoinPool.commonPool-worker-1 -> 1
    // ForkJoinPool.commonPool-worker-3 -> 0
    // ForkJoinPool.commonPool-worker-1 -> 4
    // ForkJoinPool.commonPool-worker-3 -> 3
    // main -> 2

梯度下降法解决线性回归问题

发表于 2018-07-26 | 分类于 ml |

简介

梯度下降法是一种寻找目标函数最小值的优化算法

原理

假设一个函数 $f(x)$,在某点 $P_0(x_0)$ 处的值为 $f(x_0)$ ,导数为值 $\Delta{f}(x_0)$。由于要求函数最小值,导数表示$f(x)$的变化率,为正表示当 $x$ 增大 $f(x)$增大,反之减小。所以要寻找到下一个点 $x_1$,则

$x$沿着使$f(x)$减小的方向前进,其中 $\eta$ 表示步长,或者说学习率。当 $\Delta{f(x)} = f(xn) - f(x{n-1})$ 的绝对值小于某个指定的数值,如 $0.00001$时,表示已经沿着梯度下降的很吃力了,已经逼近极小值了。

示例

简单的代码示例

示例目标函数为 $y = (x - 2.5)^2 + 1$

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 # 函数的导数 df
df = lambda x: 2 * (x - 2.5)
f = lambda x: (x - 2.5) ** 2 + 1
eta = 0.1
epsilon = 1e-8
# 最大循环次数
max_iters=100000
# 下降的起点
cur_x = 0.0
cur_iter = 0
while cur_iter < max_iters:
prev_x = cur_x
cur_x -= eta * df(cur_x)
if abs(f(cur_x) - f(prev_x)) < epsilon:
break
cur_iter += 1
print(cur_x)

打印结果为: 2.499891109642585,根据函数知道$x$ 在 $2.5$ 处取得最小值,结果十分接近。

线性回归中的梯度下降法

在上面的文章中谈到,线性回归的方程为

的损失函数为

此时,每个点的梯度方向就是对每个维度求偏导,形成的向量

将数据集添加第一列全是1,形成向量 $X_b$

此时已经得到$\Delta{J}$

  • 我们选初始化一个随机的数据集
    1
    2
    3
    4
    x = np.linspace(-3, 10, 30)
    y = x * 2 + 3 + np.random.normal(0, 2, size=30)
    # 转换成矩阵
    X = x.reshape(-1, 1)

将x和y点散点图画出来就是

线性回归数据图


  • 目标函数和梯度函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    # 目标函数
    def J(theta, X_b, y):
    # 防止溢出
    try:
    return np.sum((y - X_b.dot(theta))**2) / len(X_b)
    except:
    return float('inf')
    # 求梯度
    def dJ(theta, X_b, y):
    length = len(theta)
    ret = np.empty(length)
    ret[0] = np.sum(X_b.dot(theta) - y) * 2 / length
    for i in range(1, length):
    ret[i] = (X_b.dot(theta) - y).dot(X_b[:, i])
    return ret * 2 / len(X_b)
  • 梯度下降函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    def gradient_descent( X_b, y, init_theta, eta, epsilon=1e-8, max_iters=1e5):
    count = 0
    theta = init_theta
    while count < max_iters:
    gradient = dJ(theta, X_b, y)
    pre = theta
    theta = theta - eta * gradient
    if abs(J(theta, X_b, y) - J(pre, X_b, y)) < epsilon:
    break
    count += 1
    return theta
  • 使用测试的数据进行操作

    1
    2
    3
    4
    5
    # 将原矩阵添加一列1,使得与theta的维度相同
    X_b = np.hstack([np.ones((len(X), 1)), X])
    init_theta = np.zeros(X_b.shape[1])
    eta = 0.01
    theta = gradient_descent(X_b, y, init_theta, eta)

求出点theta值为 $[2.26867578, 2.20822388]$,将原数据点和 $[2.26867578, 2.20822388]$构成点直线作图,获得

线性回归结果

  • 向量化的处理

    求梯度函数修改为

    1
    2
    def dJ(theta, X_b, y):
    return X_b.T.dot(X_b.dot(theta) - y ) * 2. / len(X_b)
  • StandardScaler 预处理数据

    由于boston房价数据各个特征的单位不同,数值大小差异较大,故必须进行归一化处理,使用 StandardScaler 处理,在调用fit方法前

    1
    2
    3
    4
    5
    from sklearn.preprocessing import StandardScaler
    scaler = StandardScaler()
    scaler.fit(X_train)
    X_train = scaler.transform(X_train)
    X_test = scaler.transform(X_test)
    • 随机梯度下降法 Stochastic Gradient Descent

    每次计算梯度都要用到数据集中所有的样本,如果样本量过大,会导致速度受到很大的影响,假设将方程

    中的 $X_b^i$ 随机为某个样本,那么m个相同的值相加再除以m,可以得到

    随机梯度下降的趋势如下,下降的方向并不是梯度的方向,但是总体的趋势是向最小值的方向前进

随机梯度下降

对于 $\eta$ 的取值,在刚开始时候较大,随着越来越靠近最小区域,$\eta$ 的值应该慢慢减小,故设定 $\eta$ 的获取函数

其中 $a, b$ 两个是算法的超参数,如果不去调节该值,可以使用经验值 $a = 5, b = 50$, $iters$ 是循环的次数,随着次数的增多,$\eta$ 越来越小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def dJ_sgd(theta, X_b_i, y_i):
return X_b_i.T.dot(X_b_i.dot(theta) - y_i ) * 2.

def gradient_descent_sgd(X_b, y, initial_theta, n_iters=1e5, epsilon=1e-8):
t0 = 5
t1 = 50
theta = initial_theta
for n in range(n_iters):
eta = t0 / (n + t1)
pre_theta = theta
# 随机查询一个样本
ind = np.random.randint(len(X_b))
gradient = dJ_sgd(theta, X_b[ind], y[ind])
theta -= eta * gradient
return theta

上式无法让所有的样本都对算法产生影响,修改梯度下降的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# n_iters 表示要将所有的数据循环几次
def gradient_descent_sgd(X_b, y, initial_theta, n_iters=5, epsilon=1e-8, t0=5, t1=50):
theta = initial_theta
length = len(X_b)
def learning_rate(n):
return t0 / (n + t1)
for cur_iter in range(n_iters):
random_indexes = np.random.permutation(length)
for i in range(length):
eta = learning_rate(cur_iter * length + i)
pre_theta = theta
gradient = dJ_sgd(theta, X_b[random_indexes[i]], y[random_indexes[i]])
theta -= eta * gradient
return theta

线性回归的原理及Python实现

发表于 2018-07-05 | 分类于 ml |

简介

线性回归可以解决回归问题,同时也是一种思想简单,实现容易的算法。线性回归算法前提是可以预见所要解决的问题基本呈一个线性分布,比如房屋的面积和房屋的价格。这里使用到的Anaconda中的Jupyter工具编写代码,会使用到 numpy, sklearn两个Python库。

阅读全文 »

一路向北

发表于 2018-07-04 |

觉得为时已晚,恰恰是最早的时候

Gray

Gray

notebook

7 日志
4 分类
19 标签
RSS
© 2020 Ryven Gray
如果那样
我们还~