楼主觉得这个怎么样?
```python
from collections import deque
from typing import Dict, Tuple
class TimeSeriesDB:
    """In-memory time-series store keeping a sliding window of points per tag."""

    def __init__(self, window_size: int):
        """
        Args:
            window_size: length of the time window. A point at timestamp ts is
                retained while ts >= newest_timestamp - window_size (inclusive),
                matching the documented example (insert at t=7, window 5 keeps t=2).
        """
        self.window_size = window_size
        # tag -> deque of (timestamp, value) kept in chronological order.
        # Example: {"env:prod": deque([(1, 10), (2, 20), (3, 30)])}
        self.buffers: Dict[str, deque] = {}
        # tag -> running sum of values currently inside the window, so query()
        # is O(1). Example: {"env:prod": 60} (sum of 10 + 20 + 30)
        self.running_sums: Dict[str, int] = {}

    def add_point(self, tag: str, timestamp: int, value: int) -> None:
        """
        Add a point (possibly out of chronological order) and expire old points.
        Time complexity: O(n) due to the positional insert into the deque.
        """
        # Initialize new tag if needed
        if tag not in self.buffers:
            self.buffers[tag] = deque()
            self.running_sums[tag] = 0
        buffer = self.buffers[tag]
        # Anchor the window at the newest timestamp *known* for this tag, not
        # at the incoming timestamp. Otherwise a late-arriving old point would
        # shift the window backwards and could be inserted even though it is
        # already expired, silently corrupting the running sum.
        newest = max(timestamp, buffer[-1][0]) if buffer else timestamp
        window_start = newest - self.window_size
        # Strict '<' keeps the boundary point: the window is the inclusive
        # range [newest - window_size, newest]. (The original '<=' dropped the
        # boundary point and made the documented example print 70 instead of 90.)
        while buffer and buffer[0][0] < window_start:
            self.running_sums[tag] -= buffer[0][1]
            buffer.popleft()
        # A point that is already outside the window is dropped rather than
        # stored — it must not contribute to the running sum.
        if timestamp < window_start:
            return
        # Linear scan for the chronological insert position.
        insert_pos = len(buffer)
        for i, (ts, _) in enumerate(buffer):
            if timestamp < ts:
                insert_pos = i
                break
        buffer.insert(insert_pos, (timestamp, value))
        self.running_sums[tag] += value

    def query(self, tag: str) -> int:
        """
        Return the sum of values within window_size of the most recent
        timestamp, in O(1). Unknown tags yield 0.
        """
        return self.running_sums.get(tag, 0)
# --- Example usage: out-of-order inserts ---
db = TimeSeriesDB(window_size=5)

# Insert in non-chronological order; the buffer keeps points sorted.
db.add_point("env:dev", 3, 30)   # newest timestamp arrives first
db.add_point("env:dev", 1, 10)   # oldest timestamp arrives second
db.add_point("env:dev", 2, 20)   # middle timestamp arrives last
# Buffer is now [(1, 10), (2, 20), (3, 30)]
print(db.query("env:dev"))  # 60 — every point lies within the last 5 time units

# Inserting t=7 expires the oldest point(s).
db.add_point("env:dev", 7, 40)
# Intended result: buffer [(2, 20), (3, 30), (7, 40)] -> 90.
# NOTE(review): with the '<=' eviction in add_point, t=2 is also evicted and
# this actually prints 70 — verify the intended window boundary.
print(db.query("env:dev"))
```
Time Complexity:
- Initialization: O(1)
- add_point(): O(n) where n is current points in window
* Finding insert position: O(n) - linear search through buffer
* Insertion in deque: O(n) - shifting elements
* Removing outdated points: O(k) where k is points to remove
* Updating running sum: O(1)
- query(): O(1)
* Simply returns running sum
* No calculation needed as window is maintained in add_point()
Space Complexity: O(n * m) where:
- n is the number of points retained per tag (at most window_size when timestamps are distinct integers)
- m is number of unique tags
- We store:
* buffers: O(n * m) for deques
* running_sums: O(m) for sums per tag
补充内容 (2024-12-09 23:57 +08:00):
```python
from collections import deque
from typing import Dict
class TimeSeriesDB:
    """Time-series store supporting per-query window sizes up to a maximum."""

    def __init__(self, max_window_size: int):
        """
        Args:
            max_window_size: the largest window any query may use. Points older
                than newest_timestamp - max_window_size are discarded on insert.
        """
        self.max_window_size = max_window_size
        # tag -> deque of (timestamp, value) kept in chronological order.
        # Example: {"env:prod": deque([(1, 10), (2, 20), (3, 30)])}
        self.buffers: Dict[str, deque] = {}

    def add_point(self, tag: str, timestamp: int, value: int) -> None:
        """
        Add a point (possibly out of chronological order), dropping points that
        fall outside max_window_size of the newest timestamp.
        """
        if tag not in self.buffers:
            self.buffers[tag] = deque()
        buffer = self.buffers[tag]
        # Anchor retention at the newest timestamp known for this tag so a
        # late-arriving old point cannot resurrect (or become) expired data.
        newest = max(timestamp, buffer[-1][0]) if buffer else timestamp
        window_start = newest - self.max_window_size
        # Strict '<' keeps the boundary point: retention is the inclusive range
        # [newest - max_window_size, newest], consistent with query()'s
        # `ts >= window_start` test. (The original '<=' evicted a point that a
        # full-width query was still documented to include.)
        while buffer and buffer[0][0] < window_start:
            buffer.popleft()
        if timestamp < window_start:
            return  # already expired; do not store
        # Linear scan for the chronological insert position.
        insert_pos = len(buffer)
        for i, (ts, _) in enumerate(buffer):
            if timestamp < ts:
                insert_pos = i
                break
        buffer.insert(insert_pos, (timestamp, value))

    def query(self, tag: str, window_size: int) -> int:
        """
        Sum the values for `tag` whose timestamps lie within `window_size` of
        the most recent timestamp (inclusive). A window_size larger than
        max_window_size is capped to max_window_size. Unknown or empty tags
        yield 0.
        """
        buffer = self.buffers.get(tag)
        if not buffer:
            return 0
        # Cap the requested window at what we actually retain.
        window_size = min(window_size, self.max_window_size)
        window_start = buffer[-1][0] - window_size
        total = 0
        # Walk newest-to-oldest; the buffer is sorted, so the first timestamp
        # below window_start ends the scan.
        for ts, val in reversed(buffer):
            if ts < window_start:
                break
            total += val
        return total
# --- Example usage ---
db = TimeSeriesDB(max_window_size=10)

# Sample data (the original post referenced `input_points` without defining
# it, which raises NameError; these are the points its comments describe).
input_points = [
    {"tag": "env:dev", "timestamp": 0, "value": 100},
    {"tag": "env:dev", "timestamp": 3, "value": 200},
    {"tag": "env:dev", "timestamp": 6, "value": 300},
]
for point in input_points:
    db.add_point(
        tag=point["tag"],
        timestamp=point["timestamp"],
        value=point["value"],
    )

# window_size=5: max_timestamp = 6, window_start = 6 - 5 = 1
# Points in [1, 6]: (3, 200), (6, 300) = 500
print(db.query("env:dev", 5))   # Output: 500
# window_size=15 is capped at max_window_size=10: window_start = 6 - 10 = -4
# Points in [-4, 6]: (0, 100), (3, 200), (6, 300) = 600
print(db.query("env:dev", 15))  # Output: 600
``` |