diff --git a/Cache b/Cache deleted file mode 100644 index 2e8fcb40c94098bd6ba0a4efb5a963d0619aa6c4..0000000000000000000000000000000000000000 --- a/Cache +++ /dev/null @@ -1,12 +0,0 @@ -class Cache { - - private cobjects: Map<CObjectId, CObject> - - fun getObject(oid: CObjectId) { - return cobjects.get(oid) - } - - fun addObject(oid: CObjectId, obj: CObject) { - this.cobjects.put(oid. obj) - } -} diff --git a/Collection b/Collection index dca1d1321149ea720273819f59ce31d3deb82191..69c99fa9de42c4c9e73773446c023ef37ac591eb 100644 --- a/Collection +++ b/Collection @@ -1,9 +1,12 @@ class Collection { + // The collection id private id: CollectionId + // Is the collection open in read only mode private readOnly: Boolean + // Static boolean used to ensure that only one collection is openned private static otherOpenned: Boolean = false Collection(cid: CollectionId) { @@ -12,21 +15,26 @@ class Collection { this.id = cid } - // c_open_read|write<T> handler? - // TODO: manage notifications - fun open<T>(oid: String, readOnly: Boolean, handler: (VersionVector, List<CObjectId>) -> Unit = {}): T { + // c_open_read|write<T> + fun open<T>(oid: String, readOnly: Boolean, handler: NotificationHandler): T { + if (this.readOnly AND NOT readOnly) throw Exception oid = CObjectId(this.id, oid, T) if (Global.Cache.contains(oid)) return Global.Cache.get(oid) - crdt = SERVICE.getObject(oid, TxnEnv.getState()) // SERVICE.subscribe(oid) - if (this.readOnly AND NOT readOnly) throw Exception - return CObject<T>(oid, crdt, readOnly) + crdt, _ = SERVICE.getObject(oid, TxnEnv.getState()) + SERVICE.subscribe(oid) + obj = CObject<T>(oid, crdt, readOnly) + Global.Cache.put(oid, obj) + Global.Handlers.put(oid, handler) + return obj } + // c_close_collection fun close() { - for obj in Global.Cache { - SERVICE.unsubscribe(obj.id) + for oid in Global.Cache { + SERVICE.unsubscribe(oid) } Global.Cache.clear() + Global.Handlers.clear() Collection.otherOpenned = false } } diff --git a/Global b/Global index d922dc13f77afa08a9ae0fdf01aa0c9eef6e7b63..7d3cff3a373f16d5d2eee121e97e0c8bf7b1952f 100644 --- a/Global +++ b/Global @@ -1,5 +1,11 @@ +// Transaction environment should inherit from crdtlib.utils.Environment TxnEnv: TransactionEnvironment +// Map for dirty objects DirtyCObjects: Map<CObjectId, CObject> +// Map for the cache Cache: Map<CObjectId, CObject> + +// Map for c-object handlers management +Handlers: Map<CObjectId, NotificationHandler> diff --git a/NotificationHandler b/NotificationHandler new file mode 100644 index 0000000000000000000000000000000000000000..3f95a44cb9ddb4d6825b72e8c0268d60764511f8 --- /dev/null +++ b/NotificationHandler @@ -0,0 +1,2 @@ +// The notification handler (function) type accepted for c-objects +type NotificationHandler = (VersionVector, CObjectId) -> Unit diff --git a/ServiceHandler b/ServiceHandler new file mode 100644 index 0000000000000000000000000000000000000000..120db22a3e514cfb2c14b8f4a02ab520ab3cf4c7 --- /dev/null +++ b/ServiceHandler @@ -0,0 +1,21 @@ +// Thread that will listen for server notifications +class ServiceHandler: Thread { + + fun run() { + this.run(work) + } + + fun work() { + while (true) { + v, oid = readMessage() + if (Global.Handlers.contains(oid)) { + // call the corresponding handler + Handlers.get(oid)(v, oid) + } + } + } + + fun stop() { + exit(this) + } +} diff --git a/Session b/Session index 04a70765188535a1026e2043d963584bbe826743..57a8642a5f99e9bb2b50cc82795d9aef2b19084c 100644 --- a/Session +++ b/Session @@ -1,9 +1,14 @@ class Session { + // Current consistency level private consistency: ConsistencyLevel + // The client id private cliendId: CliendId + // The attached service handler + private serviceHandler: ServiceHandler + Session(cid: ClientId) { this.clientId = cid } @@ -12,6 +17,8 @@ class Session { static fun open(dbName: String, credentials: String): Session { clientId = ClientId() if (NOT SERVICE.open(dbName, credentials, clientId)) throw Exception + this.serviceHandler = ServiceHandler() + this.serviceHandler.run() return Session(clientId) } @@ -19,16 +26,24 @@ class Session { fun pull(type: ConsistencyLevel) { if (consistency != null AND consistency > type) throw Exception this.consistency = type - v = SERVICE.pull(type) - Global.TxnEnv.updateState(v) + for oid in Cache { + crdt, v = SERVICE.getObject(oid) + obj = Global.Cache.get(oid) + obj.crdt = crdt + Global.TxnEnv.updateSate(v) + } } // c_pull_XX_view(v) fun pull(type: ConsistencyLevel, vv: VersionVector) { if (view != null AND view.type > type) throw Exception if (vv < global.TxnEnv.getState()) throw Exception - v = SERVICE.pull(type, vv) - Global.TxnEnv.updateState(v) + for oid in Cache { + crdt, v = SERVICE.getObject(oid, vv) + obj = Global.Cache.get(oid) + obj.crdt = crdt + Global.TxnEnv.updateSate(v) + } } // c_open_collection_read|write @@ -44,5 +59,6 @@ class Session { // c_end_session fun close() { SERVICE.close(clientId) + this.serviceHandler.stop() } } diff --git a/Transaction b/Transaction index 0bc5a0c0026e18ebfe710d769e8c01d02f2f860d..a30bacfa1935b8e3a92167b699d5c85384a3db5d 100644 --- a/Transaction +++ b/Transaction @@ -1,21 +1,27 @@ class Transaction { - private transactionId: String + // Transaction id + private transactionId: TxnId + // Static boolean used to ensure that only one collection is openned private static otherOpenned: Boolean = false + // The version vector at which the transaction has started private begin: VersionVector - Transaction(tid: TxnId, body: () -> Unit) { + Transaction(tid: TxnId, body: TransactionBody) { if (Transaction.otherOpenned) throw Exception Transaction.otherOpenned = true this.begin = Global.TxnEnv.getState() this.transactionId = Global.TxnEnv.tickTransaction() try { + // call the body function body() + // if everything goes well commit this.commit() } catch { + // If there is an exception abort this.abort() } } @@ -33,7 +39,7 @@ class Transaction { fun commit() { SERVICE.beginTransaction(this.transactionId) for obj in Global.DirtyCObjects { - SERVICE.updateObject(obj.crdt.getDelta(this.begin)) + SERVICE.pushObject(obj.crdt.getDelta(this.begin)) obj.txnCommit() } SERVICE.commitTransaction(this.transactionId) diff --git a/TransactionBody b/TransactionBody new file mode 100644 index 0000000000000000000000000000000000000000..cb392637aa74f0c83c718f62d9b452b3056870e1 --- /dev/null +++ b/TransactionBody @@ -0,0 +1,2 @@ +// The function type accepted for a transaction body +type TransactionBody = () -> Unit