From 5949c6f6c2bc4a0829376b89cac3d0626e7d11f6 Mon Sep 17 00:00:00 2001 From: Ludovic Le Frioux <llefriou@po461-pro.paris.inria.fr> Date: Mon, 28 Sep 2020 15:19:22 +0200 Subject: [PATCH] Update pseudo code after discussion with @fcoriat --- CObject | 21 ++++-- ClientId | 1 - Collection | 27 ++++--- CollectionId | 1 - ConsistencyLevel | 6 -- Global | 11 --- OpId | 1 - ServiceHandler | 2 +- Session | 75 +++++++++++++------ Transaction | 19 ++--- TxnId | 1 - CObjectId => utils/CObjectId | 1 + utils/ClientId | 2 + utils/CollectionId | 2 + utils/ConsistencyLevel | 7 ++ utils/Global | 2 + .../NotificationHandler | 1 + utils/OperationId | 2 + TransactionBody => utils/TransactionBody | 1 + utils/TxnId | 2 + 20 files changed, 110 insertions(+), 75 deletions(-) delete mode 100644 ClientId delete mode 100644 CollectionId delete mode 100644 ConsistencyLevel delete mode 100644 Global delete mode 100644 OpId delete mode 100644 TxnId rename CObjectId => utils/CObjectId (53%) create mode 100644 utils/ClientId create mode 100644 utils/CollectionId create mode 100644 utils/ConsistencyLevel create mode 100644 utils/Global rename NotificationHandler => utils/NotificationHandler (83%) create mode 100644 utils/OperationId rename TransactionBody => utils/TransactionBody (76%) create mode 100644 utils/TxnId diff --git a/CObject b/CObject index b76963f..6f43467 100644 --- a/CObject +++ b/CObject @@ -1,16 +1,21 @@ class CObject<T> { + // Concordant object id private id: CObjectId + // Is openned in read only mode private readOnly: Boolean + // The encapsulated CRDT private crdt: DeltaCRDT<T> + // Backup for the necapsulated CRDT (used in case of transaction abort) private txnBackup: DeltaCRDT<T>? CObject(oid: CObjectId, crdt: DeltaCRDT<T>, readOnly: Boolean) { this.id = oid this.crdt = crdt + this.readOnly = readOnly this.txnBackup = null } @@ -24,20 +29,26 @@ class CObject<T> { } fun update(args) { - if (NOT Global.DirtyCObjects.contains(this)) { - Global.DirtyCObjects.put(this.id, this) + if (Session.currentTransaction == null) throw Exception + if (this.readOnly) throw Exception + if (NOT ActiveSession.dirtyObjects.contains(this.id)) { + ActiveSession.dirtyObjects.put(this.id, this) this.txnBackup = this.crdt.copy() } - ts = Global.TxnEnv.tick() - crdt.update(args, ts) + // Should be integrrated to CRDTs + ts = ActiveSession.txnEnv.tick() + this.crdt.update(args, ts) } fun getter(args): val { + if (ActiveSession.currentTransaction == null) throw Exception return crdt.getter(args) } fun close() { - Global.Cache.remove(this.id) + if (NOT ActiveSession.currentTransaction == null) throw Exception + ActiveSession.cache.remove(this.id) + ActiveSession.handlers.remove(this.id) SERVICE.unsubscribe(this.id) } } diff --git a/ClientId b/ClientId deleted file mode 100644 index d73e248..0000000 --- a/ClientId +++ /dev/null @@ -1 +0,0 @@ -type ClientId = UUID diff --git a/Collection b/Collection index 69c99fa..c6939da 100644 --- a/Collection +++ b/Collection @@ -9,32 +9,35 @@ class Collection { // Static boolean used to ensure that only one collection is openned private static otherOpenned: Boolean = false - Collection(cid: CollectionId) { - if (otherOpenned == true) throw Exception + Collection(cid: CollectionId, readOnly: Boolean) { + if (Collection.otherOpenned == true) throw Exception Collection.otherOpenned = true this.id = cid + this.readOnly = readOnly } // c_open_read|write<T> fun open<T>(oid: String, readOnly: Boolean, handler: NotificationHandler): T { if (this.readOnly AND NOT readOnly) throw Exception + if (NOT Session.currentTransaction == null) throw Exception + oid = CObjectId(this.id, oid, T) - if (Global.Cache.contains(oid)) return Global.Cache.get(oid) - crdt, _ = SERVICE.getObject(oid, TxnEnv.getState()) - SERVICE.subscribe(oid) + if (ActiveSession.cache.contains(oid)) return ActiveSession.cache.get(oid) + + crdt, v = SERVICE.getObject(oid, ActiveSession.txnEnv.getState()) + ActiveSession.txnEnv.updateSate(v) obj = CObject<T>(oid, crdt, readOnly) - Global.Cache.put(oid, obj) - Global.Handlers.put(oid, handler) + ActiveSession.cache.put(oid, obj) + ActiveSession.handlers.put(oid, handler) return obj } // c_close_collection fun close() { - for oid in Global.Cache { - SERVICE.unsubscribe(oid) - } - Global.Cache.clear() - Global.Handlers.clear() + if (NOT Session.currentTransaction == null) throw Exception + SERVICE.unsubscribeAll() + ActiveSession.cache.clear() + ActiveSession.handlers.clear() Collection.otherOpenned = false } } diff --git a/CollectionId b/CollectionId deleted file mode 100644 index a332d25..0000000 --- a/CollectionId +++ /dev/null @@ -1 +0,0 @@ -type CollectionId = String diff --git a/ConsistencyLevel b/ConsistencyLevel deleted file mode 100644 index acd920f..0000000 --- a/ConsistencyLevel +++ /dev/null @@ -1,6 +0,0 @@ -enum ConsistencyLevel { - - RC, - Snapshot, - Strong -} diff --git a/Global b/Global deleted file mode 100644 index 7d3cff3..0000000 --- a/Global +++ /dev/null @@ -1,11 +0,0 @@ -// Transaction environment should inherit from crdtlib.utils.Environment -TxnEnv: TransactionEnvironment - -// Map for dirty objects -DirtyCObjects: Map<CObjectId, CObject> - -// Map for the cache -Cache: Map<CObjectId, CObject> - -// Map for c-object handlers management -Handlers: Map<CObjectId, NotificationHandler> diff --git a/OpId b/OpId deleted file mode 100644 index 9d43b68..0000000 --- a/OpId +++ /dev/null @@ -1 +0,0 @@ -type OpId = (TxnId, Int) diff --git a/ServiceHandler b/ServiceHandler index 120db22..932a384 100644 --- a/ServiceHandler +++ b/ServiceHandler @@ -8,7 +8,7 @@ class ServiceHandler: Thread { fun work() { while (true) { v, oid = readMessage() - if (Global.Handlers.contains(oid)) { + if (ActiveSession.handlers.contains(oid)) { // call the corresponding handler Handlers.get(oid)(v, oid) } diff --git a/Session b/Session index 57a8642..12a7506 100644 --- a/Session +++ b/Session @@ -1,64 +1,93 @@ class Session { - // Current consistency level - private consistency: ConsistencyLevel - // The client id private cliendId: CliendId + // Current consistency level + private consistency: ConsistencyLevel? + // The attached service handler private serviceHandler: ServiceHandler - Session(cid: ClientId) { + // Current running transaction + private currentTransaction: Transaction? + + // Transaction environment should inherit from crdtlib.utils.Environment + package txnEnv: TransactionEnvironment + + // Map for dirty Concordant objects + package dirtyObjects: Map<CObjectId, CObject> + + // Map for the cache of Concordant objects + package cache: Map<CObjectId, CObject> + + // Map for Concordant object handlers management + package handlers: Map<CObjectId, NotificationHandler> + + private Session(cid: ClientId) { this.clientId = cid + this.txnEnv = TransactionEnvironment(this.clientId) + this.dirtyObjects = mutableMapOf() + this.cache = mutableMapOf() + this.handlers = mutableMapOf() + this.serviceHandler = ServiceHandler() + this.serviceHandler.run() + this.currentTransaction = null } // c_begin_session - static fun open(dbName: String, credentials: String): Session { + static fun connect(dbName: String, credentials: String): Session { + if (NOT ActiveSession == null) throw Exception clientId = ClientId() - if (NOT SERVICE.open(dbName, credentials, clientId)) throw Exception - this.serviceHandler = ServiceHandler() - this.serviceHandler.run() - return Session(clientId) + if (NOT SERVICE.connect(dbName, credentials, clientId)) throw Exception + ActiveSession = Session(clientId) + retur ActiveSession } // c_pull_XX_view fun pull(type: ConsistencyLevel) { - if (consistency != null AND consistency > type) throw Exception + if (NOT this.currentTransaction == null) throw Exception + if (this.consistency != null AND this.consistency > type) throw Exception this.consistency = type - for oid in Cache { - crdt, v = SERVICE.getObject(oid) - obj = Global.Cache.get(oid) + for oid in this.cache { + crdt, v = SERVICE.getObject(oid, this.consistency) + obj = this.cache.get(oid) obj.crdt = crdt - Global.TxnEnv.updateSate(v) + this.txnEnv.updateSate(v) } } // c_pull_XX_view(v) fun pull(type: ConsistencyLevel, vv: VersionVector) { - if (view != null AND view.type > type) throw Exception - if (vv < global.TxnEnv.getState()) throw Exception - for oid in Cache { - crdt, v = SERVICE.getObject(oid, vv) - obj = Global.Cache.get(oid) + if (NOT this.currentTransaction == null) throw Exception + if (this.consistency != null AND this.consistency > type) throw Exception + if (vv < this.txnEnv.getState()) throw Exception + this.consistency = type + for oid in this.cache { + crdt, v = SERVICE.getObject(oid, vv, this.consistency) + obj = this.cache.get(oid) obj.crdt = crdt - Global.TxnEnv.updateSate(v) + this.txnEnv.updateSate(v) } } // c_open_collection_read|write fun openCollection(cid: CollectionId, readOnly: Boolean): Collection { - return Collection(cid) + if (NOT this.currentTransaction == null) throw Exception + return Collection(cid, readOnly) } // c_XX_txn - fun beginTransaction(body: () -> Unit) { - Transaction(body) + fun transaction(body: TransactionBody) { + if (NOT this.currentTransaction == null) throw Exception + this.currentTransaction = Transaction(body) } // c_end_session fun close() { + if (this.currentTransaction == null) this.currentTransaction.abort() SERVICE.close(clientId) this.serviceHandler.stop() + ActiveSession = null } } diff --git a/Transaction b/Transaction index a30bacf..ffe0d61 100644 --- a/Transaction +++ b/Transaction @@ -3,17 +3,12 @@ class Transaction { // Transaction id private transactionId: TxnId - // Static boolean used to ensure that only one collection is openned - private static otherOpenned: Boolean = false - // The version vector at which the transaction has started private begin: VersionVector Transaction(tid: TxnId, body: TransactionBody) { - if (Transaction.otherOpenned) throw Exception - Transaction.otherOpenned = true - this.begin = Global.TxnEnv.getState() - this.transactionId = Global.TxnEnv.tickTransaction() + this.begin = ActiveSession.txnEnv.getState() + this.transactionId = ActiveSession.txnEnv.tickTransaction() try { // call the body function @@ -28,22 +23,20 @@ class Transaction { // c_abort_txn fun abort() { - for obj in Global.DirtyCObjects { + for obj in ActiveSession.dirtyObjects { obj.txnAbort() } - Global.DirtyCObjects.clear() - Transaction.otherOpenned = false + ActiveSession.dirtyObjects.clear() } // c_commit_txn fun commit() { SERVICE.beginTransaction(this.transactionId) - for obj in Global.DirtyCObjects { + for obj in ActiveSession.dirtyObjects { SERVICE.pushObject(obj.crdt.getDelta(this.begin)) obj.txnCommit() } SERVICE.commitTransaction(this.transactionId) - Global.DirtyCObjects.clear() - Transaction.otherOpenned = false + ActiveSession.dirtyObjects.clear() } } diff --git a/TxnId b/TxnId deleted file mode 100644 index c64b3eb..0000000 --- a/TxnId +++ /dev/null @@ -1 +0,0 @@ -type TxnId = (Int, ClientId) diff --git a/CObjectId b/utils/CObjectId similarity index 53% rename from CObjectId rename to utils/CObjectId index e75632f..8bde73e 100644 --- a/CObjectId +++ b/utils/CObjectId @@ -1 +1,2 @@ +// Concordant object unique identifiers type CObjectId = (CollectionId, String, Type) diff --git a/utils/ClientId b/utils/ClientId new file mode 100644 index 0000000..a591f0b --- /dev/null +++ b/utils/ClientId @@ -0,0 +1,2 @@ +// Client unique identifiers are UUId, they can also be hierarchic +type ClientId = UUId diff --git a/utils/CollectionId b/utils/CollectionId new file mode 100644 index 0000000..330f9c2 --- /dev/null +++ b/utils/CollectionId @@ -0,0 +1,2 @@ +// Collection unique identifiers +type CollectionId = String diff --git a/utils/ConsistencyLevel b/utils/ConsistencyLevel new file mode 100644 index 0000000..7caa60f --- /dev/null +++ b/utils/ConsistencyLevel @@ -0,0 +1,7 @@ +// Different types of consistency level provided by the Concordant platform +enum ConsistencyLevel { + + RC, + Snapshot, + Strong +} diff --git a/utils/Global b/utils/Global new file mode 100644 index 0000000..475f16d --- /dev/null +++ b/utils/Global @@ -0,0 +1,2 @@ +// Global variable(s) +ActiveSession: Session? = null diff --git a/NotificationHandler b/utils/NotificationHandler similarity index 83% rename from NotificationHandler rename to utils/NotificationHandler index 3f95a44..d905e70 100644 --- a/NotificationHandler +++ b/utils/NotificationHandler @@ -1,2 +1,3 @@ // The notification handler (function) type accepted for c-objects +// Unit == void in Kotlin type NotificationHandler = (VersionVector, CObjectId) -> Unit diff --git a/utils/OperationId b/utils/OperationId new file mode 100644 index 0000000..6b954f5 --- /dev/null +++ b/utils/OperationId @@ -0,0 +1,2 @@ +// Operations unique identifiers +type OperationId = (TxnId, Int) diff --git a/TransactionBody b/utils/TransactionBody similarity index 76% rename from TransactionBody rename to utils/TransactionBody index cb39263..6c6e44b 100644 --- a/TransactionBody +++ b/utils/TransactionBody @@ -1,2 +1,3 @@ // The function type accepted for a transaction body +// Unit == void in Kotlin type TransactionBody = () -> Unit diff --git a/utils/TxnId b/utils/TxnId new file mode 100644 index 0000000..f262855 --- /dev/null +++ b/utils/TxnId @@ -0,0 +1,2 @@ +// Transaction unique identifiers +type TxnId = (Int, ClientId) -- GitLab