Smart Contract
MoxyProcessQueue
A.123cb47fe122f6e3.MoxyProcessQueue
1import MoxyData from 0x123cb47fe122f6e3
2
3
4pub contract MoxyProcessQueue {
// Emitted by QueueBatch.getFreeRun when a run is handed out; queueNumber is the
// index of the selected slot in the batch's run list.
5 pub event MoxyQueueSelected(queueNumber: Int)
// Emitted by Run.complete; processed is the number of addresses that were
// attached to the run when it was completed.
6 pub event RunCompleted(processed: Int)
7
// A Run tracks progress over one contiguous, inclusive range of account
// indexes [indexStart, indexEnd]. `index` is the next position to process and
// only moves forward through `complete` (or an explicit `setIndex`).
pub resource Run {
    // Identifier given at creation time.
    pub var runId: Int
    // First index (inclusive) of the range covered by this run.
    pub var indexStart: Int
    // Last index (inclusive) of the range covered by this run.
    pub var indexEnd: Int
    // Next index that still has to be processed.
    pub var index: Int
    // Addresses attached to the run for the step currently in progress.
    pub var currentAddresses: [Address]
    // Becomes true once `index` has moved past `indexEnd`.
    pub var isFinished: Bool

    // Creates a run positioned at the start of its range, with no addresses
    // attached and not finished.
    init(runId: Int, indexStart: Int, indexEnd: Int) {
        self.runId = runId
        self.indexStart = indexStart
        self.indexEnd = indexEnd
        self.index = indexStart
        self.currentAddresses = []
        self.isFinished = false
    }

    // True while nothing of the range has been consumed yet.
    pub fun isAtBeginning(): Bool {
        return self.indexStart == self.index
    }

    // Returns the addresses currently attached to the run.
    pub fun getCurrentAddresses(): [Address] {
        return self.currentAddresses
    }

    // Replaces the addresses attached to the run.
    pub fun setCurrentAddresses(addresses: [Address]) {
        self.currentAddresses = addresses
    }

    // Returns [next index to process, last index of the range].
    pub fun getBounds(): [Int] {
        let bounds: [Int] = [self.index, self.indexEnd]
        return bounds
    }

    // Moves the cursor to an arbitrary position.
    pub fun setIndex(index: Int) {
        self.index = index
    }

    // Number of indexes left between the cursor and the end of the range,
    // both inclusive.
    pub fun getRemainings(): Int {
        return (self.indexEnd + 1) - self.index
    }

    // Closes the step in progress: emits RunCompleted, advances the cursor by
    // the number of attached addresses, flags the run as finished when the
    // cursor has passed the end of the range, and detaches the addresses.
    // Returns how many addresses were counted as processed.
    pub fun complete(): Int {
        let count = self.currentAddresses.length
        emit RunCompleted(processed: count)
        self.index = self.index + count
        // Once finished a run stays finished; otherwise it finishes as soon
        // as the cursor is beyond the last index.
        self.isFinished = self.isFinished || self.index > self.indexEnd
        self.currentAddresses = []
        return count
    }

}
61
// Immutable-by-convention snapshot describing how far a batch has progressed.
pub struct CurrentRunStatus {
    // Number of accounts the batch was asked to process.
    pub var totalAccounts: Int
    // Timestamp at which the batch started.
    pub var startTime: UFix64
    // Timestamp of the most recent progress update.
    pub var lastUpdated: UFix64
    // Accounts already processed.
    pub var accountsProcessed: Int
    // Accounts still pending.
    pub var accountsRemaining: Int
    // Whether the batch is done.
    pub var hasFinished: Bool

    // Stores every argument verbatim in the field of the same meaning.
    init(
        totalAccounts: Int,
        startTime: UFix64,
        lastUpdated: UFix64,
        accountsProcessed: Int,
        accountsRemaining: Int,
        hasFinished: Bool
    ) {
        // Counters
        self.totalAccounts = totalAccounts
        self.accountsProcessed = accountsProcessed
        self.accountsRemaining = accountsRemaining
        // Timestamps
        self.startTime = startTime
        self.lastUpdated = lastUpdated
        // Completion flag
        self.hasFinished = hasFinished
    }
}
79
// A QueueBatch is one pass over `accountsToProcess` account indexes
// (0 ..< accountsToProcess), split into contiguous ranges, one per Run slot.
pub resource QueueBatch {
    // One slot per run. A slot stays nil until `start` assigns it a range.
    access(contract) var currentRuns: @[Run?]
    // Block timestamp taken when the batch was created; reset to 0.0 when the
    // batch is started with zero accounts.
    pub var startTime: UFix64
    // MoxyData.getTimestampTo0000 of the creation timestamp; reset to 0.0 when
    // the batch is started with zero accounts.
    // NOTE(review): presumably the start-of-day timestamp - confirm in MoxyData.
    pub var startTime0000: UFix64
    // Block timestamp of the last call to `completeNextAddresses`.
    pub var lastUpdateTime: UFix64
    // Initialized to 0.0 and never written by this resource.
    pub var endTime: UFix64
    // Initialized to 0 and never written by this resource.
    pub var executions: Int
    // Number of run slots in `currentRuns`.
    pub var runSize: Int
    // Total number of accounts this batch has to go through.
    pub var accountsToProcess: Int
    // Number of accounts reported as processed so far.
    pub var accountsProcessed: Int
    // True once `start` has distributed a non-empty set of accounts.
    pub var isStarted: Bool

    // Splits `accounts` indexes into contiguous inclusive ranges and creates
    // one Run per range. Every run receives ceil(accounts / slots) indexes,
    // except the last one which takes whatever is left. Slots that receive no
    // range stay nil.
    // With zero accounts the batch is only reset and is NOT marked as started.
    access(contract) fun start(accounts: Int) {
        pre {
            !self.isStarted : "Queues is already started."
            accounts >= 0 : "Accounts to process cannot be negative"
        }

        if (accounts == 0) {
            self.startTime = 0.0
            self.startTime0000 = 0.0
            self.accountsToProcess = 0
            self.accountsProcessed = 0
            return
        }

        self.accountsToProcess = accounts

        let slots = self.currentRuns.length
        // Integer ceiling of accounts / slots; at least 1 because accounts >= 1.
        let perRun = (accounts + slots - 1) / slots

        var i = 0
        var indexStart = 0
        while (i < slots && indexStart < accounts) {
            // indexEnd is inclusive, hence the -1: each run covers exactly
            // `perRun` indexes unless it is clipped by the end of the list.
            var indexEnd = indexStart + perRun - 1
            if (indexEnd > accounts - 1) {
                indexEnd = accounts - 1
            }
            let previous <- self.currentRuns[i] <- create Run(runId: i, indexStart: indexStart, indexEnd: indexEnd)
            destroy previous
            i = i + 1
            indexStart = indexEnd + 1
        }
        self.isStarted = true
    }

    // Replaces the run list with `quantity` empty slots and records the new
    // size. Any run held in the previous list is destroyed, so this is only
    // meant to be used while the batch is not running.
    pub fun setRunSize(quantity: Int) {
        pre {
            quantity > 0 : "Run size must be greater than 0"
            quantity < 2 : "Run size must be lower than 2"
        }
        var slots: @[Run?] <- []
        var i = 0
        while (i < quantity) {
            slots.append(nil)
            i = i + 1
        }
        self.currentRuns <-> slots

        // Elements on the old list will be nil as the queue is not running
        destroy slots

        self.runSize = quantity
    }

    // Returns true when at least one run has been allocated and every
    // allocated run is still at the first index of its range. Empty slots are
    // ignored; a batch with no allocated run is not considered at beginning.
    pub fun isAtBeginning(): Bool {
        var allocated = 0
        var i = 0
        while (i < self.currentRuns.length) {
            if (self.currentRuns[i] != nil) {
                if (!self.currentRuns[i]?.isAtBeginning()!) {
                    return false
                }
                allocated = allocated + 1
            }
            i = i + 1
        }
        return allocated > 0
    }

    // Returns true when the batch had accounts to process and all of them
    // have been reported as processed. `>=` is used so that an over-count
    // cannot leave the batch unfinished forever.
    pub fun hasFinished(): Bool {
        return self.accountsToProcess > 0 && self.accountsProcessed >= self.accountsToProcess
    }

    // Looks for the first allocated run that is not finished and returns a
    // copy of it (same id, same bounds, same cursor). The run stored in the
    // batch is left untouched. Emits MoxyQueueSelected with the slot index
    // when a run is found; returns nil otherwise.
    pub fun getFreeRun(): @Run? {
        var i = 0
        var run: @Run? <- nil

        while (i < self.currentRuns.length && run == nil) {
            if (self.currentRuns[i] != nil && !self.currentRuns[i]?.isFinished!) {
                run <-! create Run(runId: i, indexStart: self.currentRuns[i]?.indexStart!, indexEnd: self.currentRuns[i]?.indexEnd!)
                run?.setIndex(index: self.currentRuns[i]?.index!)
            } else {
                i = i + 1
            }
        }
        if (run != nil) {
            emit MoxyQueueSelected(queueNumber: i)
        }
        return <- run
    }

    // Reports the addresses carried by `run` as processed: they are copied to
    // the stored run with the same id, that run is completed, and the batch
    // counters and `lastUpdateTime` are updated. The given run is destroyed.
    // Fails if `run.runId` is not a valid slot or the slot is empty.
    pub fun completeNextAddresses(run: @Run) {
        self.currentRuns[run.runId]?.setCurrentAddresses(addresses: run.getCurrentAddresses())
        self.accountsProcessed = self.accountsProcessed + self.currentRuns[run.runId]?.complete()!
        self.lastUpdateTime = getCurrentBlock().timestamp
        destroy run
    }

    // Sum of the remaining indexes of every allocated run.
    pub fun getRemainings(): Int {
        var total = 0
        var i = 0
        while (i < self.currentRuns.length) {
            if (self.currentRuns[i] != nil) {
                total = total + self.currentRuns[i]?.getRemainings()!
            }
            i = i + 1
        }
        return total
    }

    // Returns, for every allocated run, the pair [next index, last index].
    pub fun getRunBounds(): [[Int]] {
        var bounds: [[Int]] = []
        var i = 0
        while (i < self.currentRuns.length) {
            if (self.currentRuns[i] != nil) {
                bounds.append(self.currentRuns[i]?.getBounds()!)
            }
            i = i + 1
        }
        return bounds
    }

    // Builds a status snapshot from the batch counters and timestamps.
    pub fun getCurrentRunStatus(): CurrentRunStatus {
        return CurrentRunStatus(
            totalAccounts: self.accountsToProcess,
            startTime: self.startTime,
            lastUpdated: self.lastUpdateTime,
            accountsProcessed: self.accountsProcessed,
            accountsRemaining: self.getRemainings(),
            hasFinished: self.hasFinished()
        )
    }

    // Creates a batch with `runSize` run slots and immediately distributes
    // `accounts` indexes among them.
    init(runSize: Int, accounts: Int) {
        self.startTime = getCurrentBlock().timestamp
        self.startTime0000 = MoxyData.getTimestampTo0000(timestamp: getCurrentBlock().timestamp)
        self.lastUpdateTime = getCurrentBlock().timestamp
        self.endTime = 0.0
        self.executions = 0
        self.currentRuns <- [ nil ]
        self.runSize = 1
        self.isStarted = false
        self.accountsToProcess = 0
        self.accountsProcessed = 0
        // Every field must be initialized before methods can be called.
        self.setRunSize(quantity: runSize)
        self.start(accounts: accounts)
    }

    destroy() {
        destroy self.currentRuns
    }

}
239
// A Queue keeps a list of registered accounts and walks through them in
// batches. Only the status of the current batch is exposed through QueueInfo.
pub resource Queue: QueueInfo {
    // Registered addresses in insertion order. A removed account keeps its
    // slot, overwritten with 0x0, so indexes handed to runs stay valid.
    access(contract) var accounts: [Address]
    // Position of every active address inside `accounts`.
    access(contract) var accountsDict: {Address:Int}
    // Number of active (added and not removed) accounts.
    pub var accountsQuantity: Int
    // Previous batches, oldest first; capped at 30 entries.
    access(contract) var batchs: @[QueueBatch]
    // Batch currently being processed.
    access(contract) var currentBatch: @QueueBatch
    // Number of run slots given to every new batch.
    pub var runSize: Int
    // Initialized to false and never written by this resource.
    pub var isStarted: Bool

    // Registers `address` at the end of the queue. Adding an address that is
    // already registered only logs a message.
    pub fun addAccount(address: Address) {
        if (self.accountsDict[address] != nil) {
            log("Account already added to queue")
            return
        }
        self.accounts.append(address)
        self.accountsDict[address] = self.accounts.length - 1
        self.accountsQuantity = self.accountsQuantity + 1
    }

    // Unregisters `address`: its slot in `accounts` is blanked with 0x0 and
    // its dictionary entry is dropped, so the address can be added again later
    // and cannot be counted as removed twice. Removing an address that is not
    // registered only logs a message.
    pub fun removeAccount(address: Address) {
        if let position = self.accountsDict.remove(key: address) {
            self.accounts[position] = 0x0
            self.accountsQuantity = self.accountsQuantity - 1
        } else {
            log("Account not found in queue")
        }
    }

    // Starts a new batch over every slot of `accounts` (blanked slots
    // included) and moves the previous batch to the history.
    access(contract) fun createNewBatch() {
        if (self.accounts.length < 1) {
            log("No accoutns to process.")
            return
        }
        let previous <- self.currentBatch <- create QueueBatch(runSize: self.runSize, accounts: self.accounts.length)
        self.batchs.append(<-previous)

        // Keep only the last 30 runs
        if (self.batchs.length > 30) {
            let oldest <- self.batchs.removeFirst()
            destroy oldest
        }
    }


    // Returns the number of run slots used for new batches.
    pub fun getRunSize() :Int {
        return self.runSize
    }

    // Sets the number of run slots used for batches created from now on.
    pub fun setRunSize(quantity: Int) {
        pre {
            quantity > 0 : "Run size must be greater than 0"
            quantity < 2 : "Run size must be lower than 2"
        }
        self.runSize = quantity
    }

    /* Returns true if the current batch is at the beginning */
    pub fun isAtBeginning(): Bool {
        return self.currentBatch.isAtBeginning()
    }

    /* Returns true if the current batch has finished */
    pub fun hasFinished(): Bool {
        return self.currentBatch.hasFinished()
    }

    // Returns true when no active account is registered.
    pub fun isEmptyQueue(): Bool {
        return self.accountsQuantity < 1
    }

    // Returns the number of active accounts.
    pub fun getAccountsQuantity(): Int {
        return self.accountsQuantity
    }

    // Returns the addresses the current batch has not processed yet, run by
    // run. Blanked (0x0) slots are returned as they are stored.
    pub fun getRemainingAddresses(): [Address] {
        var addrs: [Address] = []
        for pair in self.currentBatch.getRunBounds() {
            let fr = pair[0]
            // Bounds are inclusive, slice's upTo is exclusive.
            let to = pair[1] + 1
            // An exhausted run has its cursor past the end: nothing to add.
            if (fr < to) {
                addrs = addrs.concat(self.accounts.slice(from: fr, upTo: to))
            }
        }
        return addrs
    }

    // Hands out the next unfinished run of the current batch, loaded with at
    // most `quantity` addresses starting at the run's cursor. A new batch is
    // created first when the current one is empty, or when it has finished
    // and its startTime0000 is older than the current block's.
    // Returns nil when there is nothing left to process.
    // A quantity of 0 yields a run carrying no addresses.
    // NOTE(review): a negative quantity makes `slice` receive upTo < from,
    // which should abort the call - confirm and validate at the caller.
    pub fun lockRunWith(quantity: Int): @Run? {
        let time0000 = MoxyData.getTimestampTo0000(timestamp: getCurrentBlock().timestamp)
        if ((self.currentBatch.hasFinished() && self.currentBatch.startTime0000 < time0000) || self.currentBatch.accountsToProcess == 0 ) {
            self.createNewBatch()
        }

        let run <- self.currentBatch.getFreeRun()

        if (run != nil) {
            let fr = run?.index!
            var to = fr + quantity
            // Never read past the last index owned by the run.
            if (to > run?.indexEnd!) {
                to = run?.indexEnd! + 1
            }
            let addrs = self.accounts.slice(from: fr, upTo: to)
            run?.setCurrentAddresses(addresses: addrs)
            return <- run
        }
        destroy run
        return nil
    }

    // Reports the addresses carried by `run` as processed in the current batch.
    pub fun completeNextAddresses(run: @Run) {
        self.currentBatch.completeNextAddresses(run: <- run)
    }

    // Returns how many accounts are still pending. While a batch is running
    // this is the batch's own count. When the batch is finished or empty it is
    // the full account list if a new batch is due (startTime0000 older than
    // the current block's), and 0 otherwise.
    pub fun getRemainings(): Int {
        if (self.currentBatch.hasFinished() || self.currentBatch.accountsToProcess < 1) {
            if (self.currentBatch.startTime0000 < (MoxyData.getTimestampTo0000(timestamp: getCurrentBlock().timestamp))) {
                return self.accounts.length
            }
            return 0
        }
        return self.currentBatch.getRemainings()
    }

    // Returns the status snapshot of the current batch.
    pub fun getCurrentRunStatus(): CurrentRunStatus {
        return self.currentBatch.getCurrentRunStatus()
    }

    // Creates an empty queue holding an empty placeholder batch.
    init() {
        self.accounts = []
        self.accountsDict = {}
        self.accountsQuantity = 0
        self.batchs <- []
        self.runSize = 1
        self.currentBatch <- create QueueBatch(runSize: 1, accounts: 0)
        self.isStarted = false
    }

    destroy() {
        destroy self.batchs
        destroy self.currentBatch
    }
}
378
// Builds a new, empty Queue and transfers its ownership to the caller.
pub fun createNewQueue(): @Queue {
    let queue <- create Queue()
    return <- queue
}
382
// Restricted view of a queue: the only capability it declares is reading a
// status snapshot.
383 pub resource interface QueueInfo {
// Returns a CurrentRunStatus snapshot (Queue implements this by delegating to
// its current batch).
384 pub fun getCurrentRunStatus(): CurrentRunStatus
385 }
386}
387
388