Mentions légales du service

Skip to content
Snippets Groups Projects
Commit e3a383ca authored by Yannick Li's avatar Yannick Li
Browse files

Merge branch '24-add-support-for-our-own-crdts' into 'dev'

Resolve "Add support for our own CRDTs"

See merge request !25
parents d87b862e 27dd8fd3
No related branches found
No related tags found
2 merge requests!48Release v1.1.4,!25Resolve "Add support for our own CRDTs"
Pipeline #192981 failed
......@@ -25,6 +25,8 @@ npx add-cors-to-couchdb
1.**Install Project dependencies**
Installation requieres a [GitLab token](https://docs.gitlab.com/ee/user/project/deploy_tokens/) to download the private C-CRDTLib npm package.
Go to project root directory and:
```bash
......
......@@ -25,38 +25,37 @@
},
"homepage": "https://gitlab.inria.fr/concordant/software/c-service/-/blob/master/README.md",
"devDependencies": {
"@babel/core": "^7.6.0",
"@babel/node": "^7.6.1",
"@babel/preset-env": "^7.6.0",
"@types/express": "^4.17.1",
"@babel/core": "^7.12.3",
"@babel/node": "^7.12.6",
"@babel/preset-env": "^7.12.1",
"@types/express": "^4.17.8",
"@types/jest": "^24.0.18",
"@types/lodash": "^4.14.138",
"@types/lodash": "^4.14.164",
"@types/pouchdb": "^6.4.0",
"@typescript-eslint/eslint-plugin": "^3.6.1",
"@typescript-eslint/parser": "^3.6.1",
"eslint": "^7.4.0",
"eslint-config-prettier": "^6.11.0",
"husky": "^4.2.5",
"@typescript-eslint/eslint-plugin": "^3.10.1",
"@typescript-eslint/parser": "^3.10.1",
"eslint": "^7.12.1",
"eslint-config-prettier": "^6.15.0",
"husky": "^4.3.0",
"jest": "^24.9.0",
"lint-staged": "^10.2.11",
"prettier": "^2.0.5",
"lint-staged": "^10.5.1",
"prettier": "^2.1.2",
"ts-jest": "^24.1.0",
"typescript": "^3.9.6"
"typescript": "^3.9.7"
},
"dependencies": {
"@types/uuid": "^3.4.5",
"apollo-server-express": "^2.9.6",
"@concordant/c-crdtlib": "0.0.1",
"@types/uuid": "^3.4.9",
"apollo-server-express": "^2.19.0",
"body-parser": "^1.19.0",
"delta-crdts": "^0.10.3",
"delta-crdts-msgpack-codec": "^0.2.0",
"express": "^4.17.1",
"graphql": "^14.5.8",
"graphql-tools": "^4.0.5",
"lodash": "^4.17.15",
"nano": "^8.1.0",
"pouchdb": "^7.1.1",
"graphql": "^14.7.0",
"graphql-tools": "^4.0.8",
"lodash": "^4.17.20",
"nano": "^8.2.3",
"pouchdb": "^7.2.2",
"pouchdb-adapter-asyncstorage": "^6.4.1",
"pouchdb-adapter-memory": "^7.1.1",
"pouchdb-adapter-memory": "^7.2.2",
"pouchdb-react-native": "^7.0.0-beta-1",
"sofa-api": "^0.5.1",
"uuid": "^3.3.3"
......
......@@ -21,31 +21,39 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import CRDT from "delta-crdts";
import CRDTCodec from "delta-crdts-msgpack-codec";
import { crdtlib } from "@concordant/c-crdtlib";
export default class CRDTWrapper<T> {
public static wrap(object: CRDT, type: string) {
return new CRDTWrapper(CRDTCodec.encode(object.state()), type);
public static wrap<T>(crdt: any): CRDTWrapper<T> {
return new CRDTWrapper(crdt.toJson());
}
// TODO: Need to improve this. Some browsers and Node handle buffers differently. this solution works, but is inefficient.
public static unwrap<T>(object: CRDTWrapper<T>, clientId: string): CRDT<T> {
const unwrapped = CRDT(object.type)(clientId);
unwrapped.apply(
CRDTCodec.decode(Buffer.from(Object.values(object.buffer)))
);
return { unwrapped, type: object.type };
public static unwrap<T>(wrapper: CRDTWrapper<T>): any {
const crdtType = JSON.parse(wrapper.crdtJson)["_type"];
switch (crdtType) {
case "PNCounter":
return crdtlib.crdt.PNCounter.Companion.fromJson(wrapper.crdtJson);
case "MVRegister":
return crdtlib.crdt.MVRegister.Companion.fromJson(wrapper.crdtJson);
case "LWWRegister":
return crdtlib.crdt.LWWRegister.Companion.fromJson(wrapper.crdtJson);
case "Ratchet":
return crdtlib.crdt.Ratchet.Companion.fromJson(wrapper.crdtJson);
case "RGA":
return crdtlib.crdt.RGA.Companion.fromJson(wrapper.crdtJson);
case "LWWMap":
return crdtlib.crdt.LWWMap.Companion.fromJson(wrapper.crdtJson);
case "MVMap":
return crdtlib.crdt.MVMap.Companion.fromJson(wrapper.crdtJson);
case "Map":
return crdtlib.crdt.Map.Companion.fromJson(wrapper.crdtJson);
default:
break;
}
throw new Error("Unknown CRDT type");
}
public static defaultWrappedCRDTFor<T>(
crdtName: string,
clientId: string
): CRDTWrapper<T> {
return CRDTWrapper.wrap(CRDT(crdtName)(clientId), crdtName);
}
constructor(public buffer: Buffer, public type: string) {
this.buffer = buffer;
constructor(public crdtJson: string) {
this.crdtJson = crdtJson;
}
}
......@@ -115,7 +115,7 @@ const resolvers = {
return connection
.get<CRDTWrapper<any>>(r.id)
.then((obj: Document<CRDTWrapper<any>>) => {
const { unwrapped } = CRDTWrapper.unwrap(obj.current(), "");
const { unwrapped } = CRDTWrapper.unwrap(obj.current());
return { id: r.id, document: JSON.stringify(unwrapped.value()) };
})
.catch((error) => console.error("error", error));
......@@ -247,13 +247,13 @@ const hooks: DatabaseHooks = {
obj: Document<CRDTWrapper<any>>,
objs: Array<Document<CRDTWrapper<any>>>
) => {
const { unwrapped: objCRDT } = CRDTWrapper.unwrap(obj.current(), clientId);
const crdt = CRDTWrapper.unwrap(obj.current());
if (objs.length > 0) {
objs.forEach((o) => {
const { unwrapped } = CRDTWrapper.unwrap(o.current(), clientId);
objCRDT.apply(unwrapped.state());
const otherCrdt = CRDTWrapper.unwrap(o.current());
crdt.merge(otherCrdt);
});
return CRDTWrapper.wrap(objCRDT, "ormap");
return CRDTWrapper.wrap(crdt);
}
throw new Error("Unexpected call");
},
......
/**
* MIT License
*
* Copyright (c) 2020, Concordant and contributors
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import CRDT from "delta-crdts";
import CRDTCodec from "delta-crdts-msgpack-codec";
describe("Basic usage", () => {
it("Conflict resolution test", () => {
const type = "rga";
const Type = CRDT(type);
// create 2 replicas
const replicas = [Type("id1"), Type("id2")];
// create concurrent deltas
const deltas: any = [[], []];
replicas[0].push("a");
replicas[0].push("b");
replicas[1].push("c");
replicas[1].push("d");
const joinTwo = replicas[0].join(replicas[0].state(), replicas[1].state());
replicas[0].apply(joinTwo);
replicas[1].apply(joinTwo);
expect(replicas[0].value()).toEqual(replicas[1].value());
});
it("ORMap of ORMaps of registers", () => {
const ORMap = CRDT("ormap");
const MVreg = CRDT("mvreg");
const rs = ORMap("rows");
const reg = MVreg("reg");
reg.write("A");
rs.applySub("r1", "ormap", "applySub", "A", "mvreg", "write", "A");
const delta = rs.state();
const otherReplica = ORMap("other");
otherReplica.apply(CRDTCodec.decode(CRDTCodec.encode(delta)));
expect(rs.value()).toEqual(otherReplica.value());
});
});
......@@ -21,8 +21,7 @@
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
import CRDT from "delta-crdts";
import CRDTCodec from "delta-crdts-msgpack-codec";
import { crdtlib } from "@concordant/c-crdtlib";
import uuid = require("uuid");
import { Document } from "../../src/Database/DataTypes/Interfaces/Types";
import { PouchDB } from "../../src/Database/Implementation/Adapters/PouchDB/Adapter";
......@@ -35,6 +34,7 @@ import {
IBasicConnection,
} from "../../src/Database/Interfaces/Types";
import { promiseDelay } from "../../src/Utils/Utils";
import CRDTWrapper from "../../src/Utils/CRDTWrapper";
import {
dbName,
......@@ -45,28 +45,16 @@ import {
remoteDBurl,
} from "../testParams";
class CRDTWrapper {
public static wrap(object: CRDT, type: string) {
return new CRDTWrapper(CRDTCodec.encode(object.state()), type);
}
public static unwrap(object: CRDTWrapper, clientId: string) {
const unwrapped = CRDT(object.type)(clientId);
unwrapped.apply(CRDTCodec.decode(object.buffer));
return unwrapped;
}
private buffer: any;
constructor(buffer: any, private type: string) {
this.buffer = buffer;
}
}
describe("Basic usage", () => {
let TEST_KEY: string;
let connection1: IBasicConnection;
let connection2: IBasicConnection;
const environment1 = new crdtlib.utils.SimpleEnvironment(
new crdtlib.utils.ClientUId("client1")
);
const environment2 = new crdtlib.utils.SimpleEnvironment(
new crdtlib.utils.ClientUId("client2")
);
const remoteDBs = [remoteDBurl];
beforeAll(() => {
......@@ -104,56 +92,53 @@ describe("Basic usage", () => {
it("Save and update CRDT object", () => {
TEST_KEY = uuid();
const defaultObject = CRDTWrapper.wrap(CRDT("ormap")("client1"), "ormap");
const defaultObject = CRDTWrapper.wrap(new crdtlib.crdt.PNCounter());
return connection2
.get<CRDTWrapper>(TEST_KEY, () => defaultObject)
.then(() => connection2.get<CRDTWrapper>(TEST_KEY))
.then((obj: Document<CRDTWrapper>) => {
const newCRDT = CRDTWrapper.unwrap(obj.current(), "client2");
newCRDT.applySub("r1", "ormap", "applySub", "A", "mvreg", "write", "A");
return obj.update(CRDTWrapper.wrap(newCRDT, "ormap")).save();
.get<CRDTWrapper<any>>(TEST_KEY, () => defaultObject)
.then(() => connection2.get<CRDTWrapper<any>>(TEST_KEY))
.then((obj: Document<CRDTWrapper<any>>) => {
const newCRDT = CRDTWrapper.unwrap(obj.current());
newCRDT.increment(42, environment1.tick());
return obj.update(CRDTWrapper.wrap(newCRDT)).save();
})
.then(() => connection2.get<CRDT>(TEST_KEY))
.then((obj: Document<CRDTWrapper>) => {
const newCRDT = CRDTWrapper.unwrap(obj.current(), "client2");
expect(newCRDT.value().r1).toBeDefined();
.then(() => connection2.get<CRDTWrapper<any>>(TEST_KEY))
.then((obj: Document<CRDTWrapper<any>>) => {
const newCRDT = CRDTWrapper.unwrap(obj.current());
expect(newCRDT.get()).toBe(42);
})
.catch((error) => fail(error));
});
// Based on RemoteConflicts.test.ts
it("conflict handler triggered on get", (done) => {
it("Conflict handler triggered on get", (done) => {
TEST_KEY = uuid();
const client1DefaultObject = CRDT("ormap")("client1");
const client1DefaultObject = new crdtlib.crdt.PNCounter();
// default object is created by client2
const client2DefaultObjectWrapped = CRDTWrapper.wrap(
CRDT("ormap")("client2"),
"ormap"
new crdtlib.crdt.PNCounter()
);
let remoteObj: Document<CRDTWrapper>;
let remoteObj: Document<CRDTWrapper<any>>;
let onlyAfter = false;
// hooks are handled by client2
// need to create hooks for different connections to set different clientIds.
const hooks: DatabaseHooks = {
conflictHandler: (
obj: Document<CRDTWrapper>,
objs: Array<Document<CRDTWrapper>>
obj: Document<CRDTWrapper<any>>,
objs: Array<Document<CRDTWrapper<any>>>
) => {
if (!onlyAfter) {
fail("Unexpected conflict trigger");
}
const objCRDT = CRDTWrapper.unwrap(obj.current(), "client2");
const objCRDT = CRDTWrapper.unwrap(obj.current());
if (objs.length > 0) {
objs.forEach((o) => {
const other = CRDTWrapper.unwrap(o.current(), "client2");
// console.log("Merging", objCRDT.value(), other.value());
objCRDT.apply(other.state());
const other = CRDTWrapper.unwrap(o.current());
objCRDT.merge(other);
});
// console.log("Resulting state", objCRDT.value());
return CRDTWrapper.wrap(objCRDT, "ormap");
return CRDTWrapper.wrap(objCRDT);
}
throw new Error("Unexpected call");
},
......@@ -161,52 +146,35 @@ describe("Basic usage", () => {
connection2.registerHooks(hooks);
const sub = connection1.subscribe<CRDTWrapper>(TEST_KEY, {
const sub = connection1.subscribe<CRDTWrapper<any>>(TEST_KEY, {
change: (key, newObj) => {
connection1.cancel(sub);
onlyAfter = true;
const spreadSheetMap1 = CRDTWrapper.unwrap(newObj.current(), "client1");
spreadSheetMap1.applySub(
"r1",
"ormap",
"applySub",
"A",
"mvreg",
"write",
"A"
);
const crdt = CRDTWrapper.unwrap(newObj.current());
crdt.increment(40, environment2.tick());
newObj
.update(CRDTWrapper.wrap(spreadSheetMap1, "ormap"))
.update(CRDTWrapper.wrap(crdt))
.save()
.then(() => {
client1DefaultObject.applySub(
"r2",
"ormap",
"applySub",
"A",
"mvreg",
"write",
"A"
);
client1DefaultObject.increment(2, environment1.tick());
return remoteObj
.update(CRDTWrapper.wrap(client1DefaultObject, "ormap"))
.update(CRDTWrapper.wrap(client1DefaultObject))
.save()
.catch((err) => fail(err));
})
.then(() => promiseDelay(null, 200))
.then(() => connection2.get<CRDT>(TEST_KEY))
.then(() => connection2.get<CRDTWrapper<any>>(TEST_KEY))
.then((obj) => {
const unwrapped = CRDTWrapper.unwrap(obj.current(), "client1");
expect(unwrapped.value().r1).toBeDefined();
expect(unwrapped.value().r2).toBeDefined();
const unwrapped = CRDTWrapper.unwrap(obj.current());
expect(unwrapped.get()).toBe(42);
})
.then(() => done());
},
});
return connection2
.get<CRDTWrapper>(TEST_KEY, () => client2DefaultObjectWrapped)
.then((obj: Document<CRDTWrapper>) => (remoteObj = obj))
.get<CRDTWrapper<any>>(TEST_KEY, () => client2DefaultObjectWrapped)
.then((obj: Document<CRDTWrapper<any>>) => (remoteObj = obj))
.catch((error) => fail(error));
});
});
......@@ -215,6 +183,12 @@ describe("Test offline support with CRDTs", () => {
const TEST_KEY = uuid();
let connection1: IBasicConnection;
let connection2: IBasicConnection;
const environment1 = new crdtlib.utils.SimpleEnvironment(
new crdtlib.utils.ClientUId("client1")
);
const environment2 = new crdtlib.utils.SimpleEnvironment(
new crdtlib.utils.ClientUId("client2")
);
const remoteDBs = [remoteDBurl];
let originalTimeout: number;
......@@ -264,34 +238,31 @@ describe("Test offline support with CRDTs", () => {
jasmine.DEFAULT_TIMEOUT_INTERVAL = originalTimeout;
});
it("go offline and receive pending updates on reconnect", (done) => {
it("Go offline and receive pending updates on reconnect", (done) => {
jasmine.DEFAULT_TIMEOUT_INTERVAL = 10000;
const client1DefaultObject = CRDT("ormap")("client1");
const client1DefaultObject = new crdtlib.crdt.PNCounter();
const client2DefaultObjectWrapped = CRDTWrapper.wrap(
CRDT("ormap")("client2"),
"ormap"
new crdtlib.crdt.PNCounter()
);
let remoteObj: Document<CRDTWrapper>;
let remoteObj: Document<CRDTWrapper<any>>;
let onlyAfter = false;
const hooks: DatabaseHooks = {
conflictHandler: (
obj: Document<CRDTWrapper>,
objs: Array<Document<CRDTWrapper>>
obj: Document<CRDTWrapper<any>>,
objs: Array<Document<CRDTWrapper<any>>>
) => {
if (!onlyAfter) {
fail("Unexpected conflict trigger");
}
const objCRDT = CRDTWrapper.unwrap(obj.current(), "client2");
const objCRDT = CRDTWrapper.unwrap(obj.current());
if (objs.length > 0) {
objs.forEach((o) => {
const other = CRDTWrapper.unwrap(o.current(), "client2");
// console.log("Merging", objCRDT.value(), other.value());
objCRDT.apply(other.state());
const other = CRDTWrapper.unwrap(o.current());
objCRDT.merge(other);
});
// console.log("Resulting state", objCRDT.value());
return CRDTWrapper.wrap(objCRDT, "ormap");
return CRDTWrapper.wrap(objCRDT);
}
throw new Error("Unexpected call");
},
......@@ -299,53 +270,36 @@ describe("Test offline support with CRDTs", () => {
connection2.registerHooks(hooks);
const sub = connection1.subscribe<CRDTWrapper>(TEST_KEY, {
const sub = connection1.subscribe<CRDTWrapper<any>>(TEST_KEY, {
change: (key, newObj) => {
connection1.cancel(sub);
onlyAfter = true;
const spreadSheetMap1 = CRDTWrapper.unwrap(newObj.current(), "client1");
spreadSheetMap1.applySub(
"r1",
"ormap",
"applySub",
"A",
"mvreg",
"write",
"A"
);
const crdt = CRDTWrapper.unwrap(newObj.current());
crdt.increment(40, environment2.tick());
newObj
.update(CRDTWrapper.wrap(spreadSheetMap1, "ormap"))
.update(CRDTWrapper.wrap(crdt))
.save()
.then(() => {
client1DefaultObject.applySub(
"r2",
"ormap",
"applySub",
"A",
"mvreg",
"write",
"A"
);
client1DefaultObject.increment(2, environment1.tick());
return remoteObj
.update(CRDTWrapper.wrap(client1DefaultObject, "ormap"))
.update(CRDTWrapper.wrap(client1DefaultObject))
.save()
.catch((err) => fail(err));
})
.then(() => promiseDelay(null, 200))
// .then(() => connection2.goOnline())
.then(() => connection2.get<CRDT>(TEST_KEY))
.then(() => connection2.get<CRDTWrapper<any>>(TEST_KEY))
.then((obj) => {
const unwrapped = CRDTWrapper.unwrap(obj.current(), "client1");
expect(unwrapped.value().r1).toBeDefined();
expect(unwrapped.value().r2).toBeDefined();
const unwrapped = CRDTWrapper.unwrap(obj.current());
expect(unwrapped.get()).toBe(42);
})
.then(() => done());
},
});
return connection2
.get<CRDTWrapper>(TEST_KEY, () => client2DefaultObjectWrapped)
.then((obj: Document<CRDTWrapper>) => (remoteObj = obj))
.get<CRDTWrapper<any>>(TEST_KEY, () => client2DefaultObjectWrapped)
.then((obj: Document<CRDTWrapper<any>>) => (remoteObj = obj))
.then(() => connection2.goOffline())
.catch((error) => fail(error));
});
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment