// lib/database-transaction.js
/* eslint global-require:0 */
/* eslint import/newline-after-import:0 */
import Model from './model';
import {tableNameForJoin} from './utils';
import Attributes from './attributes';
import DatabaseChangeRecord from './database-change-record';
require('promise.prototype.finally').shim();
require('promise.try').shim()
const {AttributeCollection, AttributeJoinedData} = Attributes;
/**
DatabaseTransaction exposes a convenient API for querying and modifying an RxDB
within a SQLite transaction.

Read methods (`find`, `findBy`, `findAll`, `modelify`, `count`, `findJSONBlob`)
are forwarded unchanged to the underlying database. Write methods issue their
SQL through the database's `_query` and record one DatabaseChangeRecord per
mutation; the recorded changes are handed to
`database.transactionDidCommitChanges` after `execute` has issued COMMIT.

You shouldn't need to instantiate this class directly. Instead, use
RxDatabase#inTransaction.
*/
export default class DatabaseTransaction {
  /**
  @param {Object} database - The database whose query / find methods this
  transaction delegates to.
  */
  constructor(database) {
    this.database = database;
    // Change records accumulated by persist / unpersist calls.
    this._changeRecords = [];
    // True between a successful BEGIN and the matching COMMIT.
    this._opened = false;
  }

  /**
  @borrows RxDatabase#find
  */
  find(...args) { return this.database.find(...args); }

  // The remaining read helpers are straight pass-throughs to the database.
  findBy(...args) { return this.database.findBy(...args); }

  findAll(...args) { return this.database.findAll(...args); }

  modelify(...args) { return this.database.modelify(...args); }

  count(...args) { return this.database.count(...args); }

  findJSONBlob(...args) { return this.database.findJSONBlob(...args); }

  /**
  Runs `fn` between `BEGIN IMMEDIATE TRANSACTION` and `COMMIT`.

  COMMIT is issued from a `finally` handler, so it is sent whether `fn`
  fulfills or rejects. If the BEGIN query itself fails, `_opened` is never set
  and no COMMIT is attempted. After COMMIT the accumulated change records are
  passed to `database.transactionDidCommitChanges`.

  @param {Function} fn - Called with this transaction once BEGIN has resolved.
  May return a promise.
  @returns {Promise} - A promise that settles like `fn`'s result, unless the
  COMMIT step itself rejects.
  @throws {Error} - Synchronously, if the transaction is currently open.
  */
  execute(fn) {
    if (this._opened) {
      throw new Error("DatabaseTransaction:execute was already called");
    }
    return this._query("BEGIN IMMEDIATE TRANSACTION").then(() => {
      this._opened = true;
      return fn(this);
    }).finally(() => {
      // BEGIN never succeeded, so there is nothing to commit.
      if (!this._opened) {
        return null;
      }
      this._opened = false;
      return this._query("COMMIT").then(() => {
        this.database.transactionDidCommitChanges(this._changeRecords);
      });
    });
  }

  // Mutating the Database

  /**
  Wraps `json` in a JSONBlob model with the given `id` and persists it.

  @param {String} id - Identifier of the blob.
  @param {Object} json - Payload stored on the blob.
  @returns {Promise} - The promise returned by `persistModel`.
  */
  persistJSONBlob(id, json) {
    // Required lazily rather than imported at the top of the file
    // (presumably to avoid a circular import - TODO confirm).
    const JSONBlob = require('./json-blob').default;
    return this.persistModel(new JSONBlob({id, json}));
  }

  /**
  Asynchronously writes `model` to the cache and records a change.

  @param {Model} model - A {Model} to write to the database.
  @returns {Promise} - The promise returned by `persistModels([model])`.
  @throws {Error} - Synchronously, if `model` is not a Model instance.
  */
  persistModel(model) {
    if (!model || !(model instanceof Model)) {
      throw new Error("DatabaseTransaction::persistModel - You must pass an instance of the Model class.");
    }
    return this.persistModels([model]);
  }

  /**
  Asynchronously writes `models` to the cache and records a single change.
  Note: Models must be of the same class to be persisted in a batch operation.

  @param {Array} models - An {Array} of {Model} objects to write to the database.
  @returns {Promise} - A promise that:
    - resolves immediately (with undefined) when `models` is empty
    - otherwise resolves once the `beforeDatabaseChange` hooks and the write
      queries are complete; the `afterDatabaseChange` hooks are started but
      not awaited. The resolved value is the return value of
      `Array.prototype.push` on the internal change record list.
    - rejects if any database query fails
  @throws {Error} - Synchronously, if the first item is not a Model, if the
  items are not all of the same class, or if an id appears more than once.
  */
  persistModels(models = []) {
    if (models.length === 0) {
      return Promise.resolve();
    }
    // Validate before reading `models[0].constructor` so that a null or
    // non-Model first element yields the descriptive error, not a TypeError.
    if (!(models[0] instanceof Model)) {
      throw new Error(`DatabaseTransaction::persistModels - You must pass an array of items which descend from the Model class.`);
    }
    const klass = models[0].constructor;
    const clones = [];
    // Prototype-less dictionary: ids such as "constructor" or "toString" must
    // not be mistaken for duplicates because of Object.prototype members.
    const ids = Object.create(null);
    for (const model of models) {
      if (!model || (model.constructor !== klass)) {
        throw new Error(`DatabaseTransaction::persistModels - When you batch persist objects, they must be of the same type`);
      }
      if (ids[model.id]) {
        throw new Error(`DatabaseTransaction::persistModels - You must pass an array of models with different ids. ID ${model.id} is in the set multiple times.`);
      }
      // Note: It's important that we clone the objects since other code could
      // mutate them during the save process. We want to guarantee that the
      // models you send to persistModels are saved exactly as they were sent.
      clones.push(model.clone());
      ids[model.id] = true;
    }
    const metadata = {
      objectClass: clones[0].constructor.name,
      objectIds: Object.keys(ids),
      objects: clones,
      type: 'persist',
    };
    return this._runMutationHooks('beforeDatabaseChange', metadata).then((data) => {
      return this._writeModels(clones).then(() => {
        // Not awaited: _runMutationHooks logs and swallows hook failures itself.
        this._runMutationHooks('afterDatabaseChange', metadata, data);
        return this._changeRecords.push(new DatabaseChangeRecord(this.database, metadata));
      });
    });
  }

  /**
  Asynchronously removes `model` from the cache and records a change.

  @param {Model} model - A {Model} to remove from the database.
  @returns {Promise} - A promise that
    - resolves once the `beforeDatabaseChange` hooks and the delete queries
      are complete; the `afterDatabaseChange` hooks are started but not awaited
    - rejects if any database query fails
  */
  unpersistModel(model) {
    // Work on a clone so later mutation of `model` cannot affect the record.
    const clone = model.clone();
    const metadata = {
      objectClass: clone.constructor.name,
      objectIds: [clone.id],
      objects: [clone],
      type: 'unpersist',
    };
    return this._runMutationHooks('beforeDatabaseChange', metadata).then((data) => {
      return this._deleteModel(clone).then(() => {
        // Not awaited: _runMutationHooks logs and swallows hook failures itself.
        this._runMutationHooks('afterDatabaseChange', metadata, data);
        return this._changeRecords.push(new DatabaseChangeRecord(this.database, metadata));
      });
    });
  }

  // PRIVATE METHODS ////////////////////////////////////////////////////////

  // Arrow-function class field so it stays bound to this transaction when it
  // is handed to mutation hooks as a bare function.
  _query = (...args) => {
    return this.database._query(...args);
  }

  // Invokes `selectorName` on every hook returned by `database.mutationHooks()`,
  // passing the bound `_query`, the change `metadata`, and the entry of `data`
  // at the hook's index.
  //
  // Returns a promise that:
  // - resolves with the array of hook results, in hook order
  // - resolves with `[]` if any hook throws or rejects (the failure is logged
  //   with console.warn unless process.env.CI is set)
  _runMutationHooks(selectorName, metadata, data = []) {
    const hookPromises = this.database.mutationHooks().map((hook, idx) =>
      Promise.try(() => hook[selectorName](this._query, metadata, data[idx]))
    );
    return Promise.all(hookPromises).catch((e) => {
      if (!process.env.CI) {
        console.warn(`DatabaseTransaction Hook: ${selectorName} failed`, e);
      }
      return Promise.resolve([]);
    });
  }

  // Fires the queries required to write models to the DB
  //
  // IMPORTANT: This method assumes that all the models you
  // provide are of the same class, and have different ids!
  //
  // Returns a promise that:
  // - resolves when all write queries are complete
  // - rejects if any query fails
  _writeModels(models) {
    // Nothing to write; also avoids dereferencing `models[0]` below.
    if (models.length === 0) {
      return Promise.resolve([]);
    }

    // Avoid trying to write too many objects a time - sqlite can only handle
    // value sets `(?,?)...` of less than SQLITE_MAX_COMPOUND_SELECT (500),
    // and we don't know ahead of time whether we'll hit that or not.
    if (models.length > 50) {
      return Promise.all([
        this._writeModels(models.slice(0, 50)),
        this._writeModels(models.slice(50)),
      ]);
    }

    const promises = [];
    const klass = models[0].constructor;
    const attributes = Object.keys(klass.attributes).map(key => klass.attributes[key]);
    const columnAttributes = attributes.filter((attr) =>
      attr.queryable && attr.columnSQL && attr.jsonKey !== 'id'
    );

    // Compute the columns in the model table and a question mark string
    const columns = ['id', 'data'];
    const columnMarks = ['?', '?'];
    columnAttributes.forEach((attr) => {
      columns.push(attr.jsonKey);
      columnMarks.push('?');
    });
    const columnsSQL = columns.join(',');
    const marksSet = `(${columnMarks.join(',')})`;

    // Prepare a batch insert VALUES (?,?,?), (?,?,?)... by assembling
    // an array of the values and a corresponding question mark set
    const values = [];
    const marks = [];
    const ids = [];
    const modelsJSONs = [];
    for (const model of models) {
      const json = model.toJSON({joined: false});
      modelsJSONs.push(json);
      ids.push(model.id);
      values.push(model.id, JSON.stringify(json, this.database.models.JSONReplacer));
      columnAttributes.forEach((attr) => {
        values.push(json[attr.jsonKey]);
      });
      marks.push(marksSet);
    }
    const marksSQL = marks.join(',');
    promises.push(this._query(`REPLACE INTO \`${klass.name}\` (${columnsSQL}) VALUES ${marksSQL}`, values));

    // For each join table property, find all the items in the join table for this
    // model and delete them. Insert each new value back into the table.
    const collectionAttributes = attributes.filter((attr) =>
      attr.queryable && attr instanceof AttributeCollection
    );
    // Ids are bound as parameters (at most 50 per call, see the batching above)
    // instead of being spliced into the SQL text, so quotes in an id are safe.
    const idMarksSQL = ids.map(() => '?').join(',');
    collectionAttributes.forEach((attr) => {
      const joinTable = tableNameForJoin(klass, attr.itemClass);
      promises.push(this._query(`DELETE FROM \`${joinTable}\` WHERE \`id\` IN (${idMarksSQL})`, ids));

      const joinMarks = [];
      const joinedValues = [];
      const joinMarkUnit = `(${["?", "?"].concat(attr.joinQueryableBy.map(() => '?')).join(',')})`;
      const joinQueryableByJSONKeys = attr.joinQueryableBy.map(joinedModelKey =>
        klass.attributes[joinedModelKey].jsonKey
      );
      const joinColumns = ['id', 'value'].concat(joinQueryableByJSONKeys);

      // https://www.sqlite.org/limits.html: SQLITE_MAX_VARIABLE_NUMBER
      // Each INSERT binds at most 600 values; always allow at least one row.
      const valuesPerRow = joinColumns.length;
      const rowsPerInsert = Math.max(1, Math.floor(600 / valuesPerRow));

      models.forEach((model, idx) => {
        const joinedModels = model[attr.modelKey] || [];
        for (const joined of joinedModels) {
          if (!attr.joinOnField) {
            throw new Error(`Queryable collection attribute ${attr.modelKey} must specify a joinOnField`);
          }
          const joinValue = joined[attr.joinOnField];
          joinMarks.push(joinMarkUnit);
          joinedValues.push(model.id, joinValue);
          for (const joinedJsonKey of joinQueryableByJSONKeys) {
            joinedValues.push(modelsJSONs[idx][joinedJsonKey]);
          }
        }
      });

      // Write the join rows in pages of `rowsPerInsert` rows. `joinedValues`
      // holds exactly `valuesPerRow` entries per entry of `joinMarks`.
      for (let row = 0; row < joinMarks.length; row += rowsPerInsert) {
        const pageMarks = joinMarks.slice(row, row + rowsPerInsert);
        const pageValues = joinedValues.slice(row * valuesPerRow, (row + rowsPerInsert) * valuesPerRow);
        promises.push(this._query(`INSERT OR IGNORE INTO \`${joinTable}\` (\`${joinColumns.join('`,`')}\`) VALUES ${pageMarks.join(',')}`, pageValues));
      }
    });

    // For each joined data property stored in another table...
    const joinedDataAttributes = attributes.filter(attr =>
      attr instanceof AttributeJoinedData
    );
    joinedDataAttributes.forEach((attr) => {
      for (const model of models) {
        // Only write the joined value when it is present on the model.
        if (model[attr.modelKey] !== undefined) {
          promises.push(this._query(`REPLACE INTO \`${attr.modelTable}\` (\`id\`, \`value\`) VALUES (?, ?)`, [model.id, model[attr.modelKey]]));
        }
      }
    });

    return Promise.all(promises);
  }

  // Fires the queries required to delete a model from the DB
  //
  // Returns a promise that:
  // - resolves when all deletion queries are complete
  // - rejects if any query fails
  _deleteModel(model) {
    const promises = [];
    const klass = model.constructor;
    const attributes = Object.keys(klass.attributes).map(key => klass.attributes[key]);

    // Delete the primary record
    promises.push(this._query(`DELETE FROM \`${klass.name}\` WHERE \`id\` = ?`, [model.id]));

    // For each join table property, delete all the rows in the join table
    // that belong to this model.
    const collectionAttributes = attributes.filter(attr =>
      attr.queryable && attr instanceof AttributeCollection
    );
    collectionAttributes.forEach((attr) => {
      const joinTable = tableNameForJoin(klass, attr.itemClass);
      promises.push(this._query(`DELETE FROM \`${joinTable}\` WHERE \`id\` = ?`, [model.id]));
    });

    // For each joined data property, delete this model's row from its table.
    const joinedDataAttributes = attributes.filter(attr =>
      attr instanceof AttributeJoinedData
    );
    joinedDataAttributes.forEach((attr) => {
      promises.push(this._query(`DELETE FROM \`${attr.modelTable}\` WHERE \`id\` = ?`, [model.id]));
    });

    return Promise.all(promises);
  }
}