Mentions légales du service

Skip to content
Snippets Groups Projects
Commit 5949c6f6 authored by Ludovic Le Frioux's avatar Ludovic Le Frioux
Browse files

Update pseudo code after discussion with @fcoriat

parent 8c0499f2
No related branches found
No related tags found
No related merge requests found
class CObject<T> { class CObject<T> {
// Concordant object id
private id: CObjectId private id: CObjectId
// Is openned in read only mode
private readOnly: Boolean private readOnly: Boolean
// The encapsulated CRDT
private crdt: DeltaCRDT<T> private crdt: DeltaCRDT<T>
// Backup for the necapsulated CRDT (used in case of transaction abort)
private txnBackup: DeltaCRDT<T>? private txnBackup: DeltaCRDT<T>?
CObject(oid: CObjectId, crdt: DeltaCRDT<T>, readOnly: Boolean) { CObject(oid: CObjectId, crdt: DeltaCRDT<T>, readOnly: Boolean) {
this.id = oid this.id = oid
this.crdt = crdt this.crdt = crdt
this.readOnly = readOnly
this.txnBackup = null this.txnBackup = null
} }
...@@ -24,20 +29,26 @@ class CObject<T> { ...@@ -24,20 +29,26 @@ class CObject<T> {
} }
fun update(args) { fun update(args) {
if (NOT Global.DirtyCObjects.contains(this)) { if (Session.currentTransaction == null) throw Exception
Global.DirtyCObjects.put(this.id, this) if (this.readOnly) throw Exception
if (NOT ActiveSession.dirtyObjects.contains(this.id)) {
ActiveSession.dirtyObjects.put(this.id, this)
this.txnBackup = this.crdt.copy() this.txnBackup = this.crdt.copy()
} }
ts = Global.TxnEnv.tick() // Should be integrrated to CRDTs
crdt.update(args, ts) ts = ActiveSession.txnEnv.tick()
this.crdt.update(args, ts)
} }
fun getter(args): val { fun getter(args): val {
if (ActiveSession.currentTransaction == null) throw Exception
return crdt.getter(args) return crdt.getter(args)
} }
fun close() { fun close() {
Global.Cache.remove(this.id) if (NOT ActiveSession.currentTransaction == null) throw Exception
ActiveSession.cache.remove(this.id)
ActiveSession.handlers.remove(this.id)
SERVICE.unsubscribe(this.id) SERVICE.unsubscribe(this.id)
} }
} }
type ClientId = UUID
...@@ -9,32 +9,35 @@ class Collection { ...@@ -9,32 +9,35 @@ class Collection {
// Static boolean used to ensure that only one collection is openned // Static boolean used to ensure that only one collection is openned
private static otherOpenned: Boolean = false private static otherOpenned: Boolean = false
Collection(cid: CollectionId) { Collection(cid: CollectionId, readOnly: Boolean) {
if (otherOpenned == true) throw Exception if (Collection.otherOpenned == true) throw Exception
Collection.otherOpenned = true Collection.otherOpenned = true
this.id = cid this.id = cid
this.readOnly = readOnly
} }
// c_open_read|write<T> // c_open_read|write<T>
fun open<T>(oid: String, readOnly: Boolean, handler: NotificationHandler): T { fun open<T>(oid: String, readOnly: Boolean, handler: NotificationHandler): T {
if (this.readOnly AND NOT readOnly) throw Exception if (this.readOnly AND NOT readOnly) throw Exception
if (NOT Session.currentTransaction == null) throw Exception
oid = CObjectId(this.id, oid, T) oid = CObjectId(this.id, oid, T)
if (Global.Cache.contains(oid)) return Global.Cache.get(oid) if (ActiveSession.cache.contains(oid)) return ActiveSession.cache.get(oid)
crdt, _ = SERVICE.getObject(oid, TxnEnv.getState())
SERVICE.subscribe(oid) crdt, v = SERVICE.getObject(oid, ActiveSession.txnEnv.getState())
ActiveSession.txnEnv.updateSate(v)
obj = CObject<T>(oid, crdt, readOnly) obj = CObject<T>(oid, crdt, readOnly)
Global.Cache.put(oid, obj) ActiveSession.cache.put(oid, obj)
Global.Handlers.put(oid, handler) ActiveSession.handlers.put(oid, handler)
return obj return obj
} }
// c_close_collection // c_close_collection
fun close() { fun close() {
for oid in Global.Cache { if (NOT Session.currentTransaction == null) throw Exception
SERVICE.unsubscribe(oid) SERVICE.unsubscribeAll()
} ActiveSession.cache.clear()
Global.Cache.clear() ActiveSession.handlers.clear()
Global.Handlers.clear()
Collection.otherOpenned = false Collection.otherOpenned = false
} }
} }
// Transaction environment should inherit from crdtlib.utils.Environment
TxnEnv: TransactionEnvironment
// Map for dirty objects
DirtyCObjects: Map<CObjectId, CObject>
// Map for the cache
Cache: Map<CObjectId, CObject>
// Map for c-object handlers management
Handlers: Map<CObjectId, NotificationHandler>
type OpId = (TxnId, Int)
...@@ -8,7 +8,7 @@ class ServiceHandler: Thread { ...@@ -8,7 +8,7 @@ class ServiceHandler: Thread {
fun work() { fun work() {
while (true) { while (true) {
v, oid = readMessage() v, oid = readMessage()
if (Global.Handlers.contains(oid)) { if (ActiveSession.handlers.contains(oid)) {
// call the corresponding handler // call the corresponding handler
Handlers.get(oid)(v, oid) Handlers.get(oid)(v, oid)
} }
......
class Session { class Session {
// Current consistency level
private consistency: ConsistencyLevel
// The client id // The client id
private cliendId: CliendId private cliendId: CliendId
// Current consistency level
private consistency: ConsistencyLevel?
// The attached service handler // The attached service handler
private serviceHandler: ServiceHandler private serviceHandler: ServiceHandler
Session(cid: ClientId) { // Current running transaction
private currentTransaction: Transaction?
// Transaction environment should inherit from crdtlib.utils.Environment
package txnEnv: TransactionEnvironment
// Map for dirty Concordant objects
package dirtyObjects: Map<CObjectId, CObject>
// Map for the cache of Concordant objects
package cache: Map<CObjectId, CObject>
// Map for Concordant object handlers management
package handlers: Map<CObjectId, NotificationHandler>
private Session(cid: ClientId) {
this.clientId = cid this.clientId = cid
this.txnEnv = TransactionEnvironment(this.clientId)
this.dirtyObjects = mutableMapOf()
this.cache = mutableMapOf()
this.handlers = mutableMapOf()
this.serviceHandler = ServiceHandler()
this.serviceHandler.run()
this.currentTransaction = null
} }
// c_begin_session // c_begin_session
static fun open(dbName: String, credentials: String): Session { static fun connect(dbName: String, credentials: String): Session {
if (NOT ActiveSession == null) throw Exception
clientId = ClientId() clientId = ClientId()
if (NOT SERVICE.open(dbName, credentials, clientId)) throw Exception if (NOT SERVICE.connect(dbName, credentials, clientId)) throw Exception
this.serviceHandler = ServiceHandler() ActiveSession = Session(clientId)
this.serviceHandler.run() retur ActiveSession
return Session(clientId)
} }
// c_pull_XX_view // c_pull_XX_view
fun pull(type: ConsistencyLevel) { fun pull(type: ConsistencyLevel) {
if (consistency != null AND consistency > type) throw Exception if (NOT this.currentTransaction == null) throw Exception
if (this.consistency != null AND this.consistency > type) throw Exception
this.consistency = type this.consistency = type
for oid in Cache { for oid in this.cache {
crdt, v = SERVICE.getObject(oid) crdt, v = SERVICE.getObject(oid, this.consistency)
obj = Global.Cache.get(oid) obj = this.cache.get(oid)
obj.crdt = crdt obj.crdt = crdt
Global.TxnEnv.updateSate(v) this.txnEnv.updateSate(v)
} }
} }
// c_pull_XX_view(v) // c_pull_XX_view(v)
fun pull(type: ConsistencyLevel, vv: VersionVector) { fun pull(type: ConsistencyLevel, vv: VersionVector) {
if (view != null AND view.type > type) throw Exception if (NOT this.currentTransaction == null) throw Exception
if (vv < global.TxnEnv.getState()) throw Exception if (this.consistency != null AND this.consistency > type) throw Exception
for oid in Cache { if (vv < this.txnEnv.getState()) throw Exception
crdt, v = SERVICE.getObject(oid, vv) this.consistency = type
obj = Global.Cache.get(oid) for oid in this.cache {
crdt, v = SERVICE.getObject(oid, vv, this.consistency)
obj = this.cache.get(oid)
obj.crdt = crdt obj.crdt = crdt
Global.TxnEnv.updateSate(v) this.txnEnv.updateSate(v)
} }
} }
// c_open_collection_read|write // c_open_collection_read|write
fun openCollection(cid: CollectionId, readOnly: Boolean): Collection { fun openCollection(cid: CollectionId, readOnly: Boolean): Collection {
return Collection(cid) if (NOT this.currentTransaction == null) throw Exception
return Collection(cid, readOnly)
} }
// c_XX_txn // c_XX_txn
fun beginTransaction(body: () -> Unit) { fun transaction(body: TransactionBody) {
Transaction(body) if (NOT this.currentTransaction == null) throw Exception
this.currentTransaction = Transaction(body)
} }
// c_end_session // c_end_session
fun close() { fun close() {
if (this.currentTransaction == null) this.currentTransaction.abort()
SERVICE.close(clientId) SERVICE.close(clientId)
this.serviceHandler.stop() this.serviceHandler.stop()
ActiveSession = null
} }
} }
...@@ -3,17 +3,12 @@ class Transaction { ...@@ -3,17 +3,12 @@ class Transaction {
// Transaction id // Transaction id
private transactionId: TxnId private transactionId: TxnId
// Static boolean used to ensure that only one collection is openned
private static otherOpenned: Boolean = false
// The version vector at which the transaction has started // The version vector at which the transaction has started
private begin: VersionVector private begin: VersionVector
Transaction(tid: TxnId, body: TransactionBody) { Transaction(tid: TxnId, body: TransactionBody) {
if (Transaction.otherOpenned) throw Exception this.begin = ActiveSession.txnEnv.getState()
Transaction.otherOpenned = true this.transactionId = ActiveSession.txnEnv.tickTransaction()
this.begin = Global.TxnEnv.getState()
this.transactionId = Global.TxnEnv.tickTransaction()
try { try {
// call the body function // call the body function
...@@ -28,22 +23,20 @@ class Transaction { ...@@ -28,22 +23,20 @@ class Transaction {
// c_abort_txn // c_abort_txn
fun abort() { fun abort() {
for obj in Global.DirtyCObjects { for obj in ActiveSession.dirtyObjects {
obj.txnAbort() obj.txnAbort()
} }
Global.DirtyCObjects.clear() ActiveSession.dirtyObjects.clear()
Transaction.otherOpenned = false
} }
// c_commit_txn // c_commit_txn
fun commit() { fun commit() {
SERVICE.beginTransaction(this.transactionId) SERVICE.beginTransaction(this.transactionId)
for obj in Global.DirtyCObjects { for obj in ActiveSession.dirtyObjects {
SERVICE.pushObject(obj.crdt.getDelta(this.begin)) SERVICE.pushObject(obj.crdt.getDelta(this.begin))
obj.txnCommit() obj.txnCommit()
} }
SERVICE.commitTransaction(this.transactionId) SERVICE.commitTransaction(this.transactionId)
Global.DirtyCObjects.clear() ActiveSession.dirtyObjects.clear()
Transaction.otherOpenned = false
} }
} }
// Concordant object unique identifiers
type CObjectId = (CollectionId, String, Type) type CObjectId = (CollectionId, String, Type)
// Client unique identifiers are UUId, they can also be hierarchic
type ClientId = UUId
// Collection unique identifiers
type CollectionId = String type CollectionId = String
// Different types of consistency level provided by the Concordant platform
enum ConsistencyLevel { enum ConsistencyLevel {
RC, RC,
Snapshot, Snapshot,
Strong Strong
......
// Global variable(s)
ActiveSession: Session? = null
// The notification handler (function) type accepted for c-objects // The notification handler (function) type accepted for c-objects
// Unit == void in Kotlin
type NotificationHandler = (VersionVector, CObjectId) -> Unit type NotificationHandler = (VersionVector, CObjectId) -> Unit
// Operations unique identifiers
type OperationId = (TxnId, Int)
// The function type accepted for a transaction body // The function type accepted for a transaction body
// Unit == void in Kotlin
type TransactionBody = () -> Unit type TransactionBody = () -> Unit
// Transaction unique identifiers
type TxnId = (Int, ClientId) type TxnId = (Int, ClientId)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment