317 lines
6.1 KiB
Go
317 lines
6.1 KiB
Go
package importer
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"sync"
|
|
|
|
"github.com/dutchcoders/slackarchive-import/config"
|
|
"github.com/dutchcoders/slackarchive-import/models"
|
|
"github.com/dutchcoders/slackarchive-import/utils"
|
|
|
|
"github.com/nlopes/slack"
|
|
|
|
logging "github.com/op/go-logging"
|
|
mgo "gopkg.in/mgo.v2"
|
|
elastic "gopkg.in/olivere/elastic.v5"
|
|
)
|
|
|
|
var log = logging.MustGetLogger("example")
|
|
|
|
func New(conf *config.Config) *Importer {
|
|
db := Database{}
|
|
if err := db.Open(conf.DSN); err != nil {
|
|
log.Error("Error opening database: %s", err.Error())
|
|
return nil
|
|
}
|
|
|
|
es, err := elastic.NewClient(elastic.SetURL(conf.ElasticSearch.Host), elastic.SetSniff(false))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
return &Importer{
|
|
db: &db,
|
|
es: es,
|
|
}
|
|
}
|
|
|
|
type Importer struct {
|
|
db *Database
|
|
es *elastic.Client
|
|
|
|
conf *config.Config
|
|
}
|
|
|
|
type TeamImporter struct {
|
|
*Importer
|
|
|
|
client *slack.Client
|
|
team *models.Team
|
|
channels map[string]models.Channel
|
|
}
|
|
|
|
func (i *Importer) importTeam(client *slack.Client, token string) (*models.Team, error) {
|
|
team, err := client.GetTeamInfo()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
t := models.Team{}
|
|
if err := i.db.Teams.FindId(team.ID).One(&t); err == nil {
|
|
log.Debug("Team already exists: %s", t.ID)
|
|
} else {
|
|
return nil, err
|
|
}
|
|
|
|
if err := utils.Merge(&t, *team); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = i.db.Teams.Insert(&t); err == nil {
|
|
} else if mgo.IsDup(err) {
|
|
log.Info(err.Error())
|
|
} else {
|
|
return nil, err
|
|
}
|
|
|
|
return &t, nil
|
|
}
|
|
|
|
func (i *TeamImporter) importUsers(path string) error {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer f.Close()
|
|
|
|
var users []slack.User
|
|
if err := json.NewDecoder(f).Decode(&users); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, user := range users {
|
|
u := models.User{}
|
|
if err := i.db.Users.FindId(user.ID).One(&u); err == nil {
|
|
log.Debug("User already exists: %s", u.ID)
|
|
continue
|
|
} else {
|
|
|
|
return err
|
|
}
|
|
|
|
if err := utils.Merge(&u, user); err != nil {
|
|
log.Error("Error merging user(%s): %s", user.ID, err.Error())
|
|
continue
|
|
}
|
|
|
|
u.Team = i.team.ID
|
|
if err = i.db.Users.Insert(&u); err == nil {
|
|
} else if mgo.IsDup(err) {
|
|
log.Info(err.Error())
|
|
} else {
|
|
log.Error("Error upserting user(%s): %s", user.ID, err.Error())
|
|
continue
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (i *TeamImporter) importChannels(path string) (map[string]models.Channel, error) {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
defer f.Close()
|
|
|
|
channelsMap := map[string]models.Channel{}
|
|
|
|
var channels []slack.Channel
|
|
if err := json.NewDecoder(f).Decode(&channels); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, channel := range channels {
|
|
c := models.Channel{}
|
|
|
|
if err := i.db.Channels.FindId(channel.ID).One(&c); err == nil {
|
|
channelsMap[c.Name] = c
|
|
|
|
log.Debug("Channel already exists: %s", c.ID)
|
|
// found
|
|
continue
|
|
} else {
|
|
// if not found
|
|
}
|
|
|
|
if err := utils.Merge(&c, channel); err != nil {
|
|
log.Error("Error merging channel(%s): %s", channel.ID, err.Error())
|
|
continue
|
|
}
|
|
|
|
c.Team = i.team.ID
|
|
|
|
if err = i.db.Channels.Insert(&c); err == nil {
|
|
} else if mgo.IsDup(err) {
|
|
log.Info(err.Error())
|
|
} else {
|
|
log.Error("Error inserting channel(%s): %s", channel.ID, err.Error())
|
|
continue
|
|
}
|
|
|
|
channelsMap[c.Name] = c
|
|
}
|
|
|
|
i.channels = channelsMap
|
|
|
|
return channelsMap, nil
|
|
}
|
|
|
|
func (ti *TeamImporter) importMessages(channelID string, path string) error {
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
mongoInserter := make(chan models.Message)
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
bulk := ti.db.Messages.Bulk()
|
|
defer bulk.Run()
|
|
|
|
for msg := range mongoInserter {
|
|
bulk.Insert(msg)
|
|
}
|
|
}()
|
|
|
|
elasticInserter := make(chan models.Message)
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
bulk := ti.es.Bulk()
|
|
|
|
count := 0
|
|
|
|
index := func() {
|
|
if bulk.NumberOfActions() == 0 {
|
|
} else if response, err := bulk.Do(context.Background()); err != nil {
|
|
log.Error("Error indexing: ", err.Error())
|
|
} else {
|
|
indexed := response.Indexed()
|
|
count += len(indexed)
|
|
|
|
log.Info("Bulk indexing: %d total %d.", len(indexed), count)
|
|
}
|
|
}
|
|
|
|
for msg := range elasticInserter {
|
|
bulk = bulk.Add(elastic.NewBulkIndexRequest().
|
|
Index(fmt.Sprintf("slackarchive")).
|
|
Type("message").
|
|
Id(msg.ID).
|
|
Doc(msg),
|
|
)
|
|
}
|
|
|
|
index()
|
|
}()
|
|
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer f.Close()
|
|
|
|
var messages []slack.Message
|
|
if err := json.NewDecoder(f).Decode(&messages); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, message := range messages {
|
|
m := models.Message{}
|
|
if err := utils.Merge(&m, message); err != nil {
|
|
log.Error("Error merging message: %s", err.Error())
|
|
continue
|
|
}
|
|
|
|
m.ID = fmt.Sprintf("%s-%s-%s", ti.team.ID, channelID, m.Timestamp)
|
|
m.Team = ti.team.ID
|
|
m.Channel = channelID
|
|
|
|
if err := ti.db.Messages.FindId(m.ID).One(&m); err == nil {
|
|
// log.Debug("Message already exists: %s", m.ID)
|
|
//fmt.Printf("%#v\n", m)
|
|
} else {
|
|
// fmt.Printf("%#v\n", m)
|
|
}
|
|
|
|
elasticInserter <- m
|
|
mongoInserter <- m
|
|
}
|
|
|
|
close(elasticInserter)
|
|
close(mongoInserter)
|
|
|
|
wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (i *Importer) Import(token string, importPath string) (*TeamImporter, error) {
|
|
client := slack.New(token)
|
|
|
|
team, err := i.importTeam(client, token)
|
|
if err != nil {
|
|
log.Error("GetTeamInfo: %s", err.Error())
|
|
return nil, err
|
|
}
|
|
|
|
ti := &TeamImporter{
|
|
Importer: i,
|
|
|
|
client: client,
|
|
team: team,
|
|
channels: map[string]models.Channel{},
|
|
}
|
|
|
|
importPath = path.Join(importPath, team.Domain)
|
|
|
|
if err := ti.importUsers(path.Join(importPath, "users.json")); err != nil {
|
|
log.Error("importUsers: %s", err.Error())
|
|
}
|
|
|
|
if _, err := ti.importChannels(path.Join(importPath, "channels.json")); err != nil {
|
|
log.Error("importChannel: %s", err.Error())
|
|
}
|
|
|
|
for _, channel := range ti.channels {
|
|
channelPath := path.Join(importPath, channel.Name)
|
|
filepath.Walk(channelPath, func(path string, info os.FileInfo, err error) error {
|
|
if info.IsDir() {
|
|
return nil
|
|
}
|
|
|
|
log.Info("Importing path: %s", path)
|
|
|
|
if err := ti.importMessages(channel.ID, path); err != nil {
|
|
log.Error("importMessages: %s", err.Error())
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
return ti, nil
|
|
}
|