Skip to content

Commit

Permalink
Merge pull request #9134 from RocketChat/wrap-importers-in-try
Browse files Browse the repository at this point in the history
[FIX] Importers not recovering when an error occurs
  • Loading branch information
rodrigok committed Dec 15, 2017
1 parent 7d04990 commit 5653ee6
Show file tree
Hide file tree
Showing 5 changed files with 496 additions and 463 deletions.
268 changes: 137 additions & 131 deletions packages/rocketchat-importer-csv/server/importer.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,169 +169,175 @@ export class CsvImporter extends Base {
const startedByUserId = Meteor.userId();
Meteor.defer(() => {
super.updateProgress(ProgressStep.IMPORTING_USERS);
//Import the users
for (const u of this.users.users) {
if (!u.do_import) {
continue;
}

Meteor.runAsUser(startedByUserId, () => {
let existantUser = RocketChat.models.Users.findOneByEmailAddress(u.email);

//If we couldn't find one by their email address, try to find an existing user by their username
if (!existantUser) {
existantUser = RocketChat.models.Users.findOneByUsername(u.username);
}

if (existantUser) {
//since we have an existing user, let's try a few things
u.rocketId = existantUser._id;
RocketChat.models.Users.update({ _id: u.rocketId }, { $addToSet: { importIds: u.id } });
} else {
const userId = Accounts.createUser({ email: u.email, password: Date.now() + u.name + u.email.toUpperCase() });
Meteor.runAsUser(userId, () => {
Meteor.call('setUsername', u.username, {joinDefaultChannelsSilenced: true});
RocketChat.models.Users.setName(userId, u.name);
RocketChat.models.Users.update({ _id: userId }, { $addToSet: { importIds: u.id } });
u.rocketId = userId;
});
try {
//Import the users
for (const u of this.users.users) {
if (!u.do_import) {
continue;
}

super.addCountCompleted(1);
});
}
this.collection.update({ _id: this.users._id }, { $set: { 'users': this.users.users }});
Meteor.runAsUser(startedByUserId, () => {
let existantUser = RocketChat.models.Users.findOneByEmailAddress(u.email);

//Import the channels
super.updateProgress(ProgressStep.IMPORTING_CHANNELS);
for (const c of this.channels.channels) {
if (!c.do_import) {
continue;
}
//If we couldn't find one by their email address, try to find an existing user by their username
if (!existantUser) {
existantUser = RocketChat.models.Users.findOneByUsername(u.username);
}

Meteor.runAsUser(startedByUserId, () => {
const existantRoom = RocketChat.models.Rooms.findOneByName(c.name);
//If the room exists or the name of it is 'general', then we don't need to create it again
if (existantRoom || c.name.toUpperCase() === 'GENERAL') {
c.rocketId = c.name.toUpperCase() === 'GENERAL' ? 'GENERAL' : existantRoom._id;
RocketChat.models.Rooms.update({ _id: c.rocketId }, { $addToSet: { importIds: c.id } });
} else {
//Find the rocketchatId of the user who created this channel
let creatorId = startedByUserId;
for (const u of this.users.users) {
if (u.username === c.creator && u.do_import) {
creatorId = u.rocketId;
}
if (existantUser) {
//since we have an existing user, let's try a few things
u.rocketId = existantUser._id;
RocketChat.models.Users.update({ _id: u.rocketId }, { $addToSet: { importIds: u.id } });
} else {
const userId = Accounts.createUser({ email: u.email, password: Date.now() + u.name + u.email.toUpperCase() });
Meteor.runAsUser(userId, () => {
Meteor.call('setUsername', u.username, {joinDefaultChannelsSilenced: true});
RocketChat.models.Users.setName(userId, u.name);
RocketChat.models.Users.update({ _id: userId }, { $addToSet: { importIds: u.id } });
u.rocketId = userId;
});
}

//Create the channel
Meteor.runAsUser(creatorId, () => {
const roomInfo = Meteor.call(c.isPrivate ? 'createPrivateGroup' : 'createChannel', c.name, c.members);
c.rocketId = roomInfo.rid;
});
super.addCountCompleted(1);
});
}
this.collection.update({ _id: this.users._id }, { $set: { 'users': this.users.users }});

RocketChat.models.Rooms.update({ _id: c.rocketId }, { $addToSet: { importIds: c.id } });
//Import the channels
super.updateProgress(ProgressStep.IMPORTING_CHANNELS);
for (const c of this.channels.channels) {
if (!c.do_import) {
continue;
}

super.addCountCompleted(1);
});
}
this.collection.update({ _id: this.channels._id }, { $set: { 'channels': this.channels.channels }});

//If no channels file, collect channel map from DB for message-only import
if (this.channels.channels.length === 0) {
for (const cname of this.messages.keys()) {
Meteor.runAsUser(startedByUserId, () => {
const existantRoom = RocketChat.models.Rooms.findOneByName(cname);
if (existantRoom || cname.toUpperCase() === 'GENERAL') {
this.channels.channels.push({
id: cname.replace('.', '_'),
name: cname,
rocketId: (cname.toUpperCase() === 'GENERAL' ? 'GENERAL' : existantRoom._id),
do_import: true
const existantRoom = RocketChat.models.Rooms.findOneByName(c.name);
//If the room exists or the name of it is 'general', then we don't need to create it again
if (existantRoom || c.name.toUpperCase() === 'GENERAL') {
c.rocketId = c.name.toUpperCase() === 'GENERAL' ? 'GENERAL' : existantRoom._id;
RocketChat.models.Rooms.update({ _id: c.rocketId }, { $addToSet: { importIds: c.id } });
} else {
//Find the rocketchatId of the user who created this channel
let creatorId = startedByUserId;
for (const u of this.users.users) {
if (u.username === c.creator && u.do_import) {
creatorId = u.rocketId;
}
}

//Create the channel
Meteor.runAsUser(creatorId, () => {
const roomInfo = Meteor.call(c.isPrivate ? 'createPrivateGroup' : 'createChannel', c.name, c.members);
c.rocketId = roomInfo.rid;
});

RocketChat.models.Rooms.update({ _id: c.rocketId }, { $addToSet: { importIds: c.id } });
}

super.addCountCompleted(1);
});
}
}
this.collection.update({ _id: this.channels._id }, { $set: { 'channels': this.channels.channels }});

//If no channels file, collect channel map from DB for message-only import
if (this.channels.channels.length === 0) {
for (const cname of this.messages.keys()) {
Meteor.runAsUser(startedByUserId, () => {
const existantRoom = RocketChat.models.Rooms.findOneByName(cname);
if (existantRoom || cname.toUpperCase() === 'GENERAL') {
this.channels.channels.push({
id: cname.replace('.', '_'),
name: cname,
rocketId: (cname.toUpperCase() === 'GENERAL' ? 'GENERAL' : existantRoom._id),
do_import: true
});
}
});
}
}

//If no users file, collect user map from DB for message-only import
if (this.users.users.length === 0) {
for (const [ch, messagesMap] of this.messages.entries()) {
const csvChannel = this.getChannelFromName(ch);
if (!csvChannel || !csvChannel.do_import) {
continue;
}
Meteor.runAsUser(startedByUserId, () => {
for (const msgs of messagesMap.values()) {
for (const msg of msgs.messages) {
if (!this.getUserFromUsername(msg.username)) {
const user = RocketChat.models.Users.findOneByUsername(msg.username);
if (user) {
this.users.users.push({
rocketId: user._id,
username: user.username
});
}
}
}
}
});
}
}

//If no users file, collect user map from DB for message-only import
if (this.users.users.length === 0) {
//Import the Messages
super.updateProgress(ProgressStep.IMPORTING_MESSAGES);
for (const [ch, messagesMap] of this.messages.entries()) {
const csvChannel = this.getChannelFromName(ch);
if (!csvChannel || !csvChannel.do_import) {
continue;
}

const room = RocketChat.models.Rooms.findOneById(csvChannel.rocketId, { fields: { usernames: 1, t: 1, name: 1 } });
Meteor.runAsUser(startedByUserId, () => {
for (const msgs of messagesMap.values()) {
const timestamps = {};
for (const [msgGroupData, msgs] of messagesMap.entries()) {
super.updateRecord({ 'messagesstatus': `${ ch }/${ msgGroupData }.${ msgs.messages.length }` });
for (const msg of msgs.messages) {
if (!this.getUserFromUsername(msg.username)) {
const user = RocketChat.models.Users.findOneByUsername(msg.username);
if (user) {
this.users.users.push({
rocketId: user._id,
username: user.username
});
if (isNaN(new Date(parseInt(msg.ts)))) {
this.logger.warn(`Timestamp on a message in ${ ch }/${ msgGroupData } is invalid`);
super.addCountCompleted(1);
continue;
}

const creator = this.getUserFromUsername(msg.username);
if (creator) {
let suffix = '';
if (timestamps[msg.ts] === undefined) {
timestamps[msg.ts] = 1;
} else {
suffix = `-${ timestamps[msg.ts] }`;
timestamps[msg.ts] += 1;
}
const msgObj = {
_id: `csv-${ csvChannel.id }-${ msg.ts }${ suffix }`,
ts: new Date(parseInt(msg.ts)),
msg: msg.text,
rid: room._id,
u: {
_id: creator._id,
username: creator.username
}
};

RocketChat.sendMessage(creator, msgObj, room, true);
}

super.addCountCompleted(1);
}
}
});
}
}


//Import the Messages
super.updateProgress(ProgressStep.IMPORTING_MESSAGES);
for (const [ch, messagesMap] of this.messages.entries()) {
const csvChannel = this.getChannelFromName(ch);
if (!csvChannel || !csvChannel.do_import) {
continue;
}

const room = RocketChat.models.Rooms.findOneById(csvChannel.rocketId, { fields: { usernames: 1, t: 1, name: 1 } });
Meteor.runAsUser(startedByUserId, () => {
const timestamps = {};
for (const [msgGroupData, msgs] of messagesMap.entries()) {
super.updateRecord({ 'messagesstatus': `${ ch }/${ msgGroupData }.${ msgs.messages.length }` });
for (const msg of msgs.messages) {
if (isNaN(new Date(parseInt(msg.ts)))) {
this.logger.warn(`Timestamp on a message in ${ ch }/${ msgGroupData } is invalid`);
super.addCountCompleted(1);
continue;
}

const creator = this.getUserFromUsername(msg.username);
if (creator) {
let suffix = '';
if (timestamps[msg.ts] === undefined) {
timestamps[msg.ts] = 1;
} else {
suffix = `-${ timestamps[msg.ts] }`;
timestamps[msg.ts] += 1;
}
const msgObj = {
_id: `csv-${ csvChannel.id }-${ msg.ts }${ suffix }`,
ts: new Date(parseInt(msg.ts)),
msg: msg.text,
rid: room._id,
u: {
_id: creator._id,
username: creator.username
}
};

RocketChat.sendMessage(creator, msgObj, room, true);
}

super.addCountCompleted(1);
}
}
});
super.updateProgress(ProgressStep.FINISHING);
super.updateProgress(ProgressStep.DONE);
} catch (e) {
this.logger.error(e);
super.updateProgress(ProgressStep.ERROR);
}

super.updateProgress(ProgressStep.FINISHING);
super.updateProgress(ProgressStep.DONE);
const timeTook = Date.now() - started;
this.logger.log(`CSV Import took ${ timeTook } milliseconds.`);
});
Expand Down
Loading

0 comments on commit 5653ee6

Please sign in to comment.