Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go build -o OracleSync2MySQL -ldflags "-X main.Version=0.1.2"
3 changes: 3 additions & 0 deletions cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
var srcDb *sql.DB
var destDb *sql.DB
var oracleConnStr godror.ConnectionParams
var srcSchema string

func getConn() (connStr *connect.DbConnStr) {
connStr = new(connect.DbConnStr)
Expand All @@ -30,6 +31,7 @@ func getConn() (connStr *connect.DbConnStr) {
connStr.SrcPassword = viper.GetString("src.password")
connStr.SrcDatabase = viper.GetString("src.database")
connStr.SrcPort = viper.GetInt("src.port")
connStr.SrcSchema = viper.GetString("src.schema")
connStr.DestHost = viper.GetString("dest.host")
connStr.DestPort = viper.GetInt("dest.port")
connStr.DestUserName = viper.GetString("dest.username")
Expand All @@ -46,6 +48,7 @@ func PrepareSrc(connStr *connect.DbConnStr) {
srcPassword := connStr.SrcPassword
srcDatabase := connStr.SrcDatabase
srcPort := connStr.SrcPort
srcSchema = connStr.SrcSchema
//srcConn := fmt.Sprintf("oracle://%s:%s@%s:%d/%s?LOB FETCH=POST", srcUserName, srcPassword, srcHost, srcPort, srcDatabase)
//fmt.Println(srcConn)
var err error
Expand Down
19 changes: 1 addition & 18 deletions cmd/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import (
"github.com/liushuochen/gotable"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"io"
"os"
"path/filepath"
"strconv"
"time"
)
Expand All @@ -30,7 +27,7 @@ var compareDbCmd = &cobra.Command{
// 每页的分页记录数,仅全库迁移时有效
pageSize := viper.GetInt("pageSize")
// 从配置文件中获取需要排除的表
excludeTab := viper.GetStringSlice("exclude")
excludeTab = viper.GetStringSlice("exclude")
PrepareSrc(connStr)
PrepareDest(connStr)
var tableMap map[string][]string
Expand All @@ -40,20 +37,6 @@ var compareDbCmd = &cobra.Command{
} else { // 不指定-s选项,查询源库所有表名
tableMap = fetchTableMap(pageSize, excludeTab)
}
// 创建运行日志目录
logDir, _ := filepath.Abs(CreateDateDir(""))
f, err := os.OpenFile(logDir+"/"+"run.log", os.O_CREATE|os.O_APPEND|os.O_RDWR, os.ModePerm)
if err != nil {
log.Fatal(err)
}
defer func() {
if err := f.Close(); err != nil {
log.Fatal(err) // 或设置到函数返回值中
}
}()
// log信息重定向到平面文件
multiWriter := io.MultiWriter(os.Stdout, f)
log.SetOutput(multiWriter)
// 以下开始调用比对表行数的方法
start := time.Now()
// 用于控制协程goroutine运行时候的并发数,例如3个一批,3个一批的goroutine并发运行
Expand Down
37 changes: 2 additions & 35 deletions cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ import (
"fmt"
"github.com/liushuochen/gotable"
"github.com/spf13/viper"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -35,7 +32,7 @@ var createTableCmd = &cobra.Command{
// 每页的分页记录数,仅全库迁移时有效
pageSize := viper.GetInt("pageSize")
// 从配置文件中获取需要排除的表
excludeTab := viper.GetStringSlice("exclude")
excludeTab = viper.GetStringSlice("exclude")
PrepareSrc(connStr)
PrepareDest(connStr)
var tableMap map[string][]string
Expand All @@ -45,20 +42,6 @@ var createTableCmd = &cobra.Command{
} else { // 不指定-s选项,查询源库所有表名
tableMap = fetchTableMap(pageSize, excludeTab)
}
// 创建运行日志目录
logDir, _ := filepath.Abs(CreateDateDir(""))
f, err := os.OpenFile(logDir+"/"+"run.log", os.O_CREATE|os.O_APPEND|os.O_RDWR, os.ModePerm)
if err != nil {
log.Fatal(err)
}
defer func() {
if err := f.Close(); err != nil {
log.Fatal(err) // 或设置到函数返回值中
}
}()
// log信息重定向到平面文件
multiWriter := io.MultiWriter(os.Stdout, f)
log.SetOutput(multiWriter)
// 实例初始化,调用接口中创建目标表的方法
var db Database
start := time.Now()
Expand Down Expand Up @@ -89,27 +72,11 @@ var onlyDataCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
// 获取配置文件中的数据库连接字符串
connStr := getConn()
// 创建运行日志目录
logDir, _ := filepath.Abs(CreateDateDir(""))
// 输出调用文件以及方法位置
log.SetReportCaller(true)
f, err := os.OpenFile(logDir+"/"+"run.log", os.O_CREATE|os.O_APPEND|os.O_RDWR, os.ModePerm)
if err != nil {
log.Fatal(err)
}
defer func() {
if err := f.Close(); err != nil {
log.Fatal(err)
}
}()
// log信息重定向到平面文件
multiWriter := io.MultiWriter(os.Stdout, f)
log.SetOutput(multiWriter)
start := time.Now()
// map结构,表名以及该表用来迁移查询源库的语句
var tableMap map[string][]string
// 从配置文件中获取需要排除的表
excludeTab := viper.GetStringSlice("exclude")
excludeTab = viper.GetStringSlice("exclude")
log.Info("running SourceDB check connect")
// 生成源库数据库连接
PrepareSrc(connStr)
Expand Down
34 changes: 11 additions & 23 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
package cmd

import (
"OracleSync2MySQL/pkg/sirupsen/logrus"
"bytes"
"database/sql"
"fmt"
"github.com/mitchellh/go-homedir"
"io"
"math"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"

"github.com/sirupsen/logrus"
//"github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"OracleSync2MySQL/connect"
"github.com/liushuochen/gotable"
"github.com/spf13/viper"
)

var log = logrus.New()
var log, logDir = logrus.New()
var cfgFile string
var selFromYml bool
var metaData bool
var excludeTab []string

var wg sync.WaitGroup
var wg2 sync.WaitGroup
Expand All @@ -49,27 +49,11 @@ func startDataTransfer(connStr *connect.DbConnStr) {
exitChan := make(chan os.Signal)
signal.Notify(exitChan, os.Interrupt, os.Kill, syscall.SIGTERM)
go exitHandle(exitChan)
// 创建运行日志目录
logDir, _ := filepath.Abs(CreateDateDir(""))
// 输出调用文件以及方法位置
log.SetReportCaller(true)
f, err := os.OpenFile(logDir+"/"+"run.log", os.O_CREATE|os.O_APPEND|os.O_RDWR, os.ModePerm)
if err != nil {
log.Fatal(err)
}
defer func() {
if err := f.Close(); err != nil {
log.Fatal(err)
}
}()
// log信息重定向到平面文件
multiWriter := io.MultiWriter(os.Stdout, f)
log.SetOutput(multiWriter)
start := time.Now()
// map结构,表名以及该表用来迁移查询源库的语句
var tableMap map[string][]string
// 从配置文件中获取需要排除的表
excludeTab := viper.GetStringSlice("exclude")
excludeTab = viper.GetStringSlice("exclude")
log.Info("running SourceDB check connect")
// 生成源库数据库连接
PrepareSrc(connStr)
Expand Down Expand Up @@ -317,13 +301,17 @@ func prepareSqlStr(tableName string, pageSize int) (sqlList []string) {
return
}
// 根据当前表总数以及每页的页记录大小pageSize,自动计算需要多少页记录数,即总共循环多少次,如果表没有数据,后面判断下切片长度再做处理
sql2 := "/* goapp */" + "select ceil(count(*)/" + strconv.Itoa(pageSize) + ") as total_page_num from " + "\"" + tableName + "\""
sql2 := "/* goapp count */" + "select ceil(count(*)/" + strconv.Itoa(pageSize) + ") as total_page_num from " + "\"" + tableName + "\""
//以下是直接使用QueryRow
err = srcDb.QueryRow(sql2).Scan(&totalPageNum)
if err != nil {
log.Fatal(sql2, " exec failed ", err)
return
}
if totalPageNum == 1 {
sqlList = append(sqlList, fmt.Sprintf("SELECT %s FROM \"%s\"", colNameFull, tableName))
return sqlList
}
// 以下生成分页查询语句
for i := 0; i < totalPageNum; i++ { // 使用小于而不是小于等于,否则会多生成一条分页查询边界外的sql,即此sql查询源表没有数据,也会导致后面迁移数据有多个无用的goroutine
curStartPage := i + 1
Expand All @@ -345,7 +333,7 @@ func runMigration(logDir string, startPage int, tableName string, sqlStr string,
log.Info(fmt.Sprintf("%v Taskid[%d] Processing TableData %v ", time.Now().Format("2006-01-02 15:04:05.000000"), startPage, tableName))
start := time.Now()
// 直接查询,即查询全表或者分页查询(SELECT t.* FROM (SELECT id FROM test ORDER BY id LIMIT ?, ?) temp LEFT JOIN test t ON temp.id = t.id;)
sqlStr = "/* goapp */" + sqlStr
sqlStr = "/* goapp query */" + sqlStr
// 查询源库的sql
rows, err := srcDb.Query(sqlStr) //传入参数之后执行
defer rows.Close()
Expand Down
43 changes: 36 additions & 7 deletions cmd/tablemeta.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"bytes"
"database/sql"
"fmt"
"regexp"
Expand Down Expand Up @@ -204,7 +205,7 @@ func (tb *Table) IdxCreate(logDir string, tableName string, ch chan struct{}, id
log.Error(err)
}
LogOutput(logDir, "createSql", destIdxSql)
destIdxSql = "/* goapp */" + destIdxSql
destIdxSql = "/* goapp idx */" + destIdxSql
// 创建目标索引,主键、其余约束
if !metaData {
if _, err = destDb.Exec(destIdxSql); err != nil {
Expand All @@ -224,7 +225,20 @@ func (tb *Table) SeqCreate(logDir string) (ret []string) {
startTime := time.Now()
failedCount = 0
var dbRet, tableName string
rows, err := srcDb.Query("select table_name,trigger_body from user_triggers where upper(trigger_type) ='BEFORE EACH ROW'")
srcTableSql := "select table_name,trigger_body from user_triggers where upper(trigger_type) ='BEFORE EACH ROW' %s union select table_name,trigger_body from dba_triggers where upper(trigger_type) ='BEFORE EACH ROW' %s"
buffer := bytes.NewBufferString(" ")
if len(excludeTab) > 0 {
buffer.WriteString(" and table_name not in ( ")
for index, tabName := range excludeTab {
if index < len(excludeTab)-1 {
buffer.WriteString("'" + tabName + "'" + ",")
} else {
buffer.WriteString("'" + tabName + "'" + ")")
}
}
}
srcTableSql = fmt.Sprintf(srcTableSql, buffer.String(), buffer.String())
rows, err := srcDb.Query(srcTableSql)
if err != nil {
log.Error(err)
}
Expand All @@ -251,7 +265,7 @@ func (tb *Table) SeqCreate(logDir string) (ret []string) {
if len(match) == 2 {
autoColName := match[1]
// 创建目标数据库该表表的自增列索引
sqlAutoColIdx := "/* goapp */" + "create index ids_" + tableName + "_" + autoColName + "_" + strconv.Itoa(idx) + " on " + tableName + "(" + autoColName + ")"
sqlAutoColIdx := "/* goapp seq idx */" + "create index ids_" + tableName + "_" + autoColName + "_" + strconv.Itoa(idx) + " on " + tableName + "(" + autoColName + ")"
log.Info("[", idx, "] create auto_increment for table ", tableName)
LogOutput(logDir, "createSql", sqlAutoColIdx+";")
if !metaData {
Expand All @@ -263,7 +277,7 @@ func (tb *Table) SeqCreate(logDir string) (ret []string) {
}

// 更改目标数据库该表的列属性为自增列
sqlModifyAuto := "/* goapp */" + "alter table " + tableName + " modify " + autoColName + " bigint auto_increment"
sqlModifyAuto := "/* goapp seq auto */" + "alter table " + tableName + " modify " + autoColName + " bigint auto_increment"
LogOutput(logDir, "createSql", sqlModifyAuto+";")
if !metaData {
if _, err = destDb.Exec(sqlModifyAuto); err != nil {
Expand All @@ -286,7 +300,22 @@ func (tb *Table) FkCreate(logDir string) (ret []string) {
startTime := time.Now()
failedCount = 0
var tableName, sqlStr string
rows, err := srcDb.Query("SELECT B.TABLE_NAME,'ALTER TABLE ' || B.TABLE_NAME || ' ADD CONSTRAINT ' ||\n B.CONSTRAINT_NAME || ' FOREIGN KEY (' ||\n (SELECT listagg(A.COLUMN_NAME,',') within group(order by a.position)\n FROM USER_CONS_COLUMNS A\n WHERE A.CONSTRAINT_NAME = B.CONSTRAINT_NAME) || ') REFERENCES ' ||\n (SELECT B1.table_name FROM USER_CONSTRAINTS B1\n WHERE B1.CONSTRAINT_NAME = B.R_CONSTRAINT_NAME) || '(' ||\n (SELECT listagg(A.COLUMN_NAME,',') within group(order by a.position)\n FROM USER_CONS_COLUMNS A\n WHERE A.CONSTRAINT_NAME = B.R_CONSTRAINT_NAME) || ');'\nFROM USER_CONSTRAINTS B\nWHERE B.CONSTRAINT_TYPE = 'R' ")
srcTableSql := "SELECT B.TABLE_NAME,'ALTER TABLE ' || B.TABLE_NAME || ' ADD CONSTRAINT ' ||\n B.CONSTRAINT_NAME || ' FOREIGN KEY (' ||\n (SELECT listagg(A.COLUMN_NAME,',') within group(order by a.position)\n FROM USER_CONS_COLUMNS A\n WHERE A.CONSTRAINT_NAME = B.CONSTRAINT_NAME) || ') REFERENCES ' ||\n (SELECT B1.table_name FROM USER_CONSTRAINTS B1\n WHERE B1.CONSTRAINT_NAME = B.R_CONSTRAINT_NAME) || '(' ||\n (SELECT listagg(A.COLUMN_NAME,',') within group(order by a.position)\n FROM USER_CONS_COLUMNS A\n WHERE A.CONSTRAINT_NAME = B.R_CONSTRAINT_NAME) || ');'\nFROM USER_CONSTRAINTS B\nWHERE B.CONSTRAINT_TYPE = 'R' %s"
srcTableSql = srcTableSql + "union SELECT B.TABLE_NAME,'ALTER TABLE ' || B.TABLE_NAME || ' ADD CONSTRAINT ' ||\n B.CONSTRAINT_NAME || ' FOREIGN KEY (' ||\n (SELECT listagg(A.COLUMN_NAME,',') within group(order by a.position)\n FROM DBA_CONS_COLUMNS A\n WHERE A.CONSTRAINT_NAME = B.CONSTRAINT_NAME) || ') REFERENCES ' ||\n (SELECT B1.table_name FROM DBA_CONSTRAINTS B1\n WHERE B1.CONSTRAINT_NAME = B.R_CONSTRAINT_NAME) || '(' ||\n (SELECT listagg(A.COLUMN_NAME,',') within group(order by a.position)\n FROM DBA_CONS_COLUMNS A\n WHERE A.CONSTRAINT_NAME = B.R_CONSTRAINT_NAME) || ');'\nFROM DBA_CONSTRAINTS B\nWHERE B.CONSTRAINT_TYPE = 'R' %s"
buffer := bytes.NewBufferString(" ")
if len(excludeTab) > 0 {
buffer.WriteString(" and B.TABLE_NAME not in ( ")
for index, tabName := range excludeTab {
if index < len(excludeTab)-1 {
buffer.WriteString("'" + tabName + "'" + ",")
} else {
buffer.WriteString("'" + tabName + "'" + ")")
}
}
}
srcTableSql = fmt.Sprintf(srcTableSql, buffer.String(), buffer.String())
rows, err := srcDb.Query(srcTableSql)

if err != nil {
log.Error(err)
}
Expand All @@ -299,7 +328,7 @@ func (tb *Table) FkCreate(logDir string) (ret []string) {
log.Error(err)
}
log.Info("[", idx, "] create foreign key for table ", tableName)
sqlStr = "/* goapp */" + sqlStr
sqlStr = "/* goapp fk */" + sqlStr
LogOutput(logDir, "createSql", sqlStr)
if !metaData {
if _, err = destDb.Exec(sqlStr); err != nil {
Expand Down Expand Up @@ -343,7 +372,7 @@ func (tb *Table) NormalIdx(logDir string) (ret []string) {
}
log.Info("[", idx, "] create normal index for table ", tableName)
LogOutput(logDir, "createSql", createSql+";")
createSql = "/* goapp */" + createSql
createSql = "/* goapp normal idx */" + createSql
if !metaData {
if _, err = destDb.Exec(createSql); err != nil {
log.Error(createSql, " create normal index failed ", err)
Expand Down
1 change: 1 addition & 0 deletions connect/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type DbConnStr struct {
SrcPassword string
SrcDatabase string
SrcPort int
SrcSchema string
DestHost string
DestPort int
DestUserName string
Expand Down
1 change: 1 addition & 0 deletions example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ src:
database: orcl
username: admin
password: oracle
# schema: test
dest:
host: 192.168.1.37
port: 3306
Expand Down
Loading