Skip to content

Commit

Permalink
Add configurable limit for the maximum age and number of events in th…
Browse files Browse the repository at this point in the history
…e event store and remove old events before sending
  • Loading branch information
matus-tomlein committed Jan 19, 2024
1 parent b0c610f commit 9967f25
Show file tree
Hide file tree
Showing 13 changed files with 227 additions and 35 deletions.
14 changes: 14 additions & 0 deletions Sources/Core/Emitter/Emitter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ class Emitter: EmitterEventProcessing {
return Int(eventStore.count())
}

/// Limit for the maximum number of unsent events to keep in the event store.
var maxEventStoreSize: Int64 = EmitterDefaults.maxEventStoreSize

/// Limit for the maximum duration of how long events should be kept in the event store if they fail to be sent.
var maxEventStoreAge: TimeInterval = EmitterDefaults.maxEventStoreAge

// MARK: - Initialization

init(namespace: String,
Expand Down Expand Up @@ -289,11 +295,19 @@ class Emitter: EmitterEventProcessing {
/// Empties the buffer of events using the respective HTTP request method.
func flush() {
if requestToStartSending() {
self.removeOldEvents()
self.attemptEmit()
}
}

// MARK: - Control methods

private func removeOldEvents() {
eventStore.removeOldEvents(
maxSize: maxEventStoreSize,
maxAge: maxEventStoreAge
)
}

private func attemptEmit() {
InternalQueue.onQueuePrecondition()
Expand Down
16 changes: 16 additions & 0 deletions Sources/Core/Emitter/EmitterControllerImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,22 @@ class EmitterControllerImpl: Controller, EmitterController {
emitter.retryFailedRequests = newValue
}
}

var maxEventStoreSize: Int64 {
get { return emitter.maxEventStoreSize }
set {
dirtyConfig.maxEventStoreSize = newValue
emitter.maxEventStoreSize = newValue
}
}

var maxEventStoreAge: TimeInterval {
get { return emitter.maxEventStoreAge }
set {
dirtyConfig.maxEventStoreAge = newValue
emitter.maxEventStoreAge = newValue
}
}

// MARK: - Methods

Expand Down
10 changes: 10 additions & 0 deletions Sources/Core/InternalQueue/EmitterControllerIQWrapper.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@ class EmitterControllerIQWrapper: EmitterController {
get { return InternalQueue.sync { controller.retryFailedRequests } }
set { InternalQueue.sync { controller.retryFailedRequests = newValue } }
}

var maxEventStoreSize: Int64 {
get { return InternalQueue.sync { controller.maxEventStoreSize } }
set { InternalQueue.sync { controller.maxEventStoreSize = newValue } }
}

var maxEventStoreAge: TimeInterval {
get { return InternalQueue.sync { controller.maxEventStoreAge } }
set { InternalQueue.sync { controller.maxEventStoreAge = newValue } }
}

// MARK: - Methods

Expand Down
14 changes: 14 additions & 0 deletions Sources/Core/Storage/Database.swift
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,20 @@ class Database {
return rows
}

func removeOldEvents(maxSize: Int64, maxAge: TimeInterval) {
let sql = """
DELETE FROM 'events'
WHERE id NOT IN (
SELECT id FROM events
WHERE dateCreated >= datetime('now','-\(maxAge) seconds')
ORDER BY dateCreated DESC, id DESC
LIMIT \(maxSize)
)
"""

_ = execute(sql: sql, name: "Delete old events")
}

private func prepare(sql: String, name: String, closure: (OpaquePointer?, OpaquePointer?) -> ()) {
withConnection { db in
var statement: OpaquePointer?
Expand Down
63 changes: 28 additions & 35 deletions Sources/Core/Storage/MemoryEventStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@ import Foundation

class MemoryEventStore: NSObject, EventStore {

var sendLimit: UInt
var index: Int64
var orderedSet: NSMutableOrderedSet
private var sendLimit: UInt
private var index: Int64
private var eventBuffer: [EmitterEvent] = []

convenience override init() {
self.init(limit: 250)
}

init(limit: UInt) {
orderedSet = NSMutableOrderedSet()
sendLimit = limit
index = 0
}
Expand All @@ -35,44 +34,28 @@ class MemoryEventStore: NSObject, EventStore {
InternalQueue.onQueuePrecondition()

let item = EmitterEvent(payload: payload, storeId: index)
orderedSet.add(item)
eventBuffer.append(item)
index += 1
}

func count() -> UInt {
InternalQueue.onQueuePrecondition()

return UInt(orderedSet.count)
return UInt(eventBuffer.count)
}

func emittableEvents(withQueryLimit queryLimit: UInt) -> [EmitterEvent] {
InternalQueue.onQueuePrecondition()

let setCount = (orderedSet).count
if setCount <= 0 {
return []
}
let len = min(Int(queryLimit), setCount)
_ = NSRange(location: 0, length: len)
var count = 0
let indexes = orderedSet.indexes { _, _, _ in
count += 1
return count <= queryLimit
}
let objects = orderedSet.objects(at: indexes)
var result: [EmitterEvent] = []
for object in objects {
if let event = object as? EmitterEvent {
result.append(event)
}
}
return result
let limit = min(queryLimit, sendLimit)

return Array(eventBuffer.prefix(Int(limit)))
}

func removeAllEvents() -> Bool {
InternalQueue.onQueuePrecondition()

orderedSet.removeAllObjects()
eventBuffer.removeAll()
return true
}

Expand All @@ -85,16 +68,26 @@ class MemoryEventStore: NSObject, EventStore {
func removeEvents(withIds storeIds: [Int64]) -> Bool {
InternalQueue.onQueuePrecondition()

var itemsToRemove: [EmitterEvent] = []
for item in orderedSet {
guard let item = item as? EmitterEvent else {
continue
}
if storeIds.contains(item.storeId) {
itemsToRemove.append(item)
eventBuffer = eventBuffer.filter { !storeIds.contains($0.storeId) }
return true
}

func removeOldEvents(maxSize: Int64, maxAge: TimeInterval) {
InternalQueue.onQueuePrecondition()

let currentTimestamp = Date().timeIntervalSince1970

// remove old events by age
eventBuffer = eventBuffer.filter { emitterEvent in
if let timestampString = emitterEvent.payload[kSPTimestamp] as? String,
let timestamp = Double(timestampString) {
let timestampSecs = timestamp / 1000.0
return currentTimestamp - timestampSecs <= maxAge
}
return true
}
orderedSet.removeObjects(in: itemsToRemove)
return true

// remove old events by size limit
eventBuffer = Array(eventBuffer.suffix(Int(maxSize)))
}
}
6 changes: 6 additions & 0 deletions Sources/Core/Storage/SQLiteEventStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ class SQLiteEventStore: NSObject, EventStore {
return EmitterEvent(payload: payload, storeId: row.id)
}
}

func removeOldEvents(maxSize: Int64, maxAge: TimeInterval) {
InternalQueue.onQueuePrecondition()

return database.removeOldEvents(maxSize: maxSize, maxAge: maxAge)
}
}

#endif
2 changes: 2 additions & 0 deletions Sources/Core/Tracker/ServiceProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ class ServiceProvider: NSObject, ServiceProviderProtocol {
emitter.callback = self.emitterConfiguration.requestCallback
emitter.customRetryForStatusCodes = self.emitterConfiguration.customRetryForStatusCodes
emitter.retryFailedRequests = self.emitterConfiguration.retryFailedRequests
emitter.maxEventStoreSize = self.emitterConfiguration.maxEventStoreSize
emitter.maxEventStoreAge = self.emitterConfiguration.maxEventStoreAge
}

let emitter: Emitter
Expand Down
42 changes: 42 additions & 0 deletions Sources/Snowplow/Configurations/EmitterConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ public protocol EmitterConfigurationProtocol: AnyObject {
/// If disabled, events that failed to be sent will be dropped regardless of other configuration (such as the customRetryForStatusCodes).
@objc
var retryFailedRequests: Bool { get set }
/// Limit for the maximum number of unsent events to keep in the event store.
/// Defaults to 1000.
@objc
var maxEventStoreSize: Int64 { get set }
/// Limit for the maximum duration of how long events should be kept in the event store if they fail to be sent.
/// Defaults to 30 days.
@objc
var maxEventStoreAge: TimeInterval { get set }
}

/// It allows the tracker configuration from the emission perspective.
Expand Down Expand Up @@ -136,6 +144,24 @@ public class EmitterConfiguration: SerializableConfiguration, EmitterConfigurati
set { _retryFailedRequests = newValue }
}

private var _maxEventStoreSize: Int64?
/// Limit for the maximum number of unsent events to keep in the event store.
/// Defaults to 1000.
@objc
public var maxEventStoreSize: Int64 {
get { return _maxEventStoreSize ?? sourceConfig?.maxEventStoreSize ?? EmitterDefaults.maxEventStoreSize }
set { _maxEventStoreSize = newValue }
}

private var _maxEventStoreAge: TimeInterval?
/// Limit for the maximum duration of how long events should be kept in the event store if they fail to be sent.
/// Defaults to 30 days.
@objc
public var maxEventStoreAge: TimeInterval {
get { return _maxEventStoreAge ?? sourceConfig?.maxEventStoreAge ?? EmitterDefaults.maxEventStoreAge }
set { _maxEventStoreAge = newValue }
}

// MARK: - Internal

/// Fallback configuration to read from in case requested values are not present in this configuraiton.
Expand Down Expand Up @@ -258,6 +284,22 @@ public class EmitterConfiguration: SerializableConfiguration, EmitterConfigurati
return self
}

/// Limit for the maximum number of unsent events to keep in the event store.
/// Defaults to 1000.
@objc
public func maxEventStoreSize(_ maxEventStoreSize: Int64) -> Self {
self.maxEventStoreSize = maxEventStoreSize
return self
}

/// Limit for the maximum duration of how long events should be kept in the event store if they fail to be sent.
/// Defaults to 30 days.
@objc
public func maxEventStoreAge(_ maxEventStoreAge: TimeInterval) -> Self {
self.maxEventStoreAge = maxEventStoreAge
return self
}

// MARK: - NSCopying

@objc
Expand Down
2 changes: 2 additions & 0 deletions Sources/Snowplow/Emitter/EmitterDefaults.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ public class EmitterDefaults {
public private(set) static var serverAnonymisation = false
public private(set) static var bufferOption: BufferOption = .single
public private(set) static var retryFailedRequests = true
public private(set) static var maxEventStoreSize: Int64 = 1000 // events
public private(set) static var maxEventStoreAge: TimeInterval = TimeInterval(60 * 60 * 24 * 30) // 30 days
}
6 changes: 6 additions & 0 deletions Sources/Snowplow/Emitter/EventStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,10 @@ public protocol EventStore: NSObjectProtocol {
/// - Returns: EmitterEvent objects containing storeIds and event payloads.
@objc
func emittableEvents(withQueryLimit queryLimit: UInt) -> [EmitterEvent]
/// Remove events older than `maxAge` seconds and keep only the latest `maxSize` events.
/// - Parameters:
/// - maxSize: Limit for the maximum number of unsent events to keep
/// - maxAge: Limit for the maximum duration of how long events should be kept
@objc
func removeOldEvents(maxSize: Int64, maxAge: TimeInterval)
}
32 changes: 32 additions & 0 deletions Tests/Storage/TestDatabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,38 @@ class TestDatabase: XCTestCase {
XCTAssertEqual(db.countRows(), 0)
}

func testRemoveOldEventsByAge() {
let db = createDatabase("db")

db.insertRow(["test": 1])
Thread.sleep(forTimeInterval: 1)
db.insertRow(["test": 2])
Thread.sleep(forTimeInterval: 1)
db.removeOldEvents(maxSize: 5, maxAge: 1)

let rows = db.readRows(numRows: 5)
XCTAssertEqual(rows.count, 1)
XCTAssertEqual(rows.first?.data["test"] as? Int, 2)
}

func testRemoveOldestEventsByMaxSize() {
let db = createDatabase("db")

db.insertRow(["test": 1])
db.insertRow(["test": 2])
db.insertRow(["test": 3])
db.insertRow(["test": 4])
db.insertRow(["test": 5])
db.removeOldEvents(maxSize: 3, maxAge: 5)

let rows = db.readRows(numRows: 5)
XCTAssertEqual(rows.count, 3)
XCTAssertEqual(
rows.map { $0.data["test"] as! Int }.min(),
3
)
}

private func createDatabase(_ namespace: String) -> Database {
DatabaseHelpers.clearPreviousDatabase(namespace)
return Database(namespace: namespace)
Expand Down
Loading

0 comments on commit 9967f25

Please sign in to comment.