|  | 
 
 
 楼主|
发表于 2025-3-14 18:02:58
|
显示全部楼层 
| 先把我的放上来吧 1. 首先是本地存储数据的,定期30秒运行一次。因为jms更新时间差不多就是半分钟。
 
 复制代码import requests
import json
from datetime import datetime
API_URL = "https://justmysocks6.net/members/getbwcounter.php?service="
HISTORY_FILE = "扶Q_usage_history.json"
def fetch_bw_counter():
  response = requests.get(API_URL)
  data = response.json()
  # data = {"monthly_bw_limit_b": 1000000000000,
  #   "bw_counter_b": 651529129,
  #   "bw_reset_day_of_month": 13}
  return data["bw_counter_b"]
def load_history():
  try:
  with open(HISTORY_FILE, "r") as f:
    return json.load(f)
  except (IOError, ValueError):
  return []
def save_history(history):
  with open(HISTORY_FILE, "w") as f:
  json.dump(history, f, indent=2)
def record_usage():
  # 1) Fetch the current usage
  current_bw = fetch_bw_counter()
  timestamp = datetime.utcnow().isoformat()
  
  # 2) Append new record to history
  history = load_history()
  history.append({
  "timestamp": timestamp,
  "bw_counter_b": current_bw
  })
  save_history(history)
  
if __name__ == "__main__":
  record_usage()
 2. 第二是显示面板,第一个面板是总体使用量,第二个是在不同时间的速率,
 
 复制代码import json
import pandas as pd
from datetime import timedelta
import dash
from dash import dcc, html, Input, Output
import plotly.graph_objs as go
# ----- Utility Functions -----
def load_usage_data(file_path="扶Q_usage_history.json"):
  """Load usage data from JSON, localize to America/Los_Angeles, then convert to UTC.
   Plotly will automatically render these timestamps in the visitor’s local time.
  """
  with open(file_path, "r") as f:
  data = json.load(f)
  df = pd.DataFrame(data)
  # Assume timestamps in the file are in California time.
  df['timestamp'] = pd.to_datetime(df['timestamp']).dt.tz_localize('America/Los_Angeles')
  # Convert to UTC for consistent plotting.
  df['timestamp'] = df['timestamp'].dt.tz_convert('UTC')
  df.sort_values('timestamp', inplace=True)
  return df
def convert_bytes(value_bytes):
  """
  Convert a byte value to a human-friendly string using a 1000 conversion factor.
  If the value in GB is less than 0.001, display in MB.
  If in MB is less than 0.001, display in B.
  """
  value_gb = value_bytes / 1e9
  if value_gb >= 0.001:
  return f"{value_gb:.3f} GB"
  value_mb = value_bytes / 1e6
  if value_mb >= 0.001:
  return f"{value_mb:.3f} MB"
  return f"{value_bytes} B"
def aggregate_data(df, resolution, window):
  """
  Aggregate usage data for a given resolution and time window.
  resolution: a pandas offset alias, e.g., 'T' for minute, 'H' for hour, 'D' for day, 'W' for week.
  window: timedelta object representing the lookback period.
  """
  end_time = df['timestamp'].max()
  start_time = end_time - window
  df_window = df[df['timestamp'] >= start_time].copy()
  if df_window.empty:
  return pd.DataFrame(columns=['timestamp', 'bw_counter_b'])
  df_window.set_index('timestamp', inplace=True)
  df_resampled = df_window.resample(resolution).last().dropna()
  df_resampled.reset_index(inplace=True)
  return df_resampled
def compute_usage_rates(df):
  """
  Compute the incremental usage (difference between consecutive bw_counter_b) 
  and time differences. Returns the DataFrame with a new column 'usage_diff'.
  """
  df = df.copy()
  df['usage_diff'] = df['bw_counter_b'].diff()
  df['time_diff_sec'] = df['timestamp'].diff().dt.total_seconds()
  df['usage_rate'] = df['usage_diff'] / df['time_diff_sec']
  return df
# ----- Dash App Setup -----
app = dash.Dash(__name__)
server = app.server
app.layout = html.Div([
  html.H1("扶Q Data Usage Dashboard"),
  html.Div([
  html.Button("Minutes", id="btn-minutes", n_clicks=0),
  html.Button("Hourly", id="btn-hourly", n_clicks=0),
  html.Button("Daily", id="btn-daily", n_clicks=0),
  html.Button("Weekly", id="btn-weekly", n_clicks=0)
  ], style={'marginBottom': '20px'}),
  html.Div(id="summary-stats", style={'marginBottom': '20px'}),
  dcc.Graph(id="usage-graph"),
  dcc.Graph(id="rate-graph"),
  dcc.Interval(id="interval-update", interval=60*1000, n_intervals=0)# update every minute
])
# ----- Callback to Update Graphs and Stats -----
@app.callback(
  [Output("usage-graph", "figure"),
 Output("rate-graph", "figure"),
 Output("summary-stats", "children")],
  [Input("btn-minutes", "n_clicks"),
 Input("btn-hourly", "n_clicks"),
 Input("btn-daily", "n_clicks"),
 Input("btn-weekly", "n_clicks"),
 Input("interval-update", "n_intervals")]
)
def update_dashboard(n_min, n_hour, n_day, n_week, n_interval):
  df = load_usage_data()
  
  # Determine which button was most recently pressed
  ctx = dash.callback_context
  if not ctx.triggered:
  resolution_choice = 'H'
  window = timedelta(hours=24)
  else:
  button_id = ctx.triggered[0]['prop_id'].split('.')[0]
  if button_id == "btn-minutes":
    resolution_choice = 'T'# minute resolution
    window = timedelta(hours=1)
  elif button_id == "btn-hourly":
    resolution_choice = 'H'
    window = timedelta(hours=24)
  elif button_id == "btn-daily":
    resolution_choice = 'D'
    window = timedelta(days=7)
  elif button_id == "btn-weekly":
    resolution_choice = 'W'
    window = timedelta(weeks=4)
  else:
    resolution_choice = 'H'
    window = timedelta(hours=24)
  
  df_agg = aggregate_data(df, resolution_choice, window)
  df_rate = compute_usage_rates(df_agg)
  
  # ----- Cumulative Usage Figure -----
  cum_fig = go.Figure()
  cum_fig.add_trace(go.Scatter(
  x=df_agg['timestamp'],
  y=df_agg['bw_counter_b'] / 1e9,# cumulative usage in GB
  mode='lines+markers',
  name="Cumulative Usage (GB)",
  connectgaps=False
  ))
  cum_fig.update_layout(
  title="扶Q Cumulative Usage Over Time",
  xaxis_title="Time",
  yaxis_title="Usage (GB)",
  hovermode="x unified"
  )
  
  # ----- Usage Rate Figure -----
  df_rate_clean = df_rate.dropna(subset=['usage_diff'])
  if not df_rate_clean.empty:
  max_diff = df_rate_clean['usage_diff'].max()
  if max_diff / 1e9 >= 0.001:
    factor = 1e9
    y_label = "Usage per Interval (GB)"
  elif max_diff / 1e6 >= 0.001:
    factor = 1e6
    y_label = "Usage per Interval (MB)"
  else:
    factor = 1
    y_label = "Usage per Interval (B)"
  usage_diff_converted = df_rate_clean['usage_diff'] / factor
  else:
  usage_diff_converted = []
  y_label = "Usage per Interval"
  
  rate_fig = go.Figure()
  rate_fig.add_trace(go.Scatter(
  x=df_rate_clean['timestamp'],
  y=usage_diff_converted,
  mode='lines+markers',
  name="Interval Usage",
  connectgaps=False
  ))
  rate_fig.update_layout(
  title="扶Q Usage Rate Over Time",
  xaxis_title="Time",
  yaxis_title=y_label,
  hovermode="x unified"
  )
  
  # ----- Summary Statistics -----
  if not df_rate['usage_rate'].dropna().empty:
  avg_rate = df_rate['usage_rate'].dropna().mean()# bytes per second
  avg_per_min = convert_bytes(avg_rate * 60)
  avg_per_hour = convert_bytes(avg_rate * 3600)
  avg_per_day = convert_bytes(avg_rate * 3600 * 24)
  avg_per_week = convert_bytes(avg_rate * 3600 * 24 * 7)
  else:
  avg_per_min = avg_per_hour = avg_per_day = avg_per_week = "N/A"
  
  summary = html.Div([
  html.P(f"Average Usage per Minute: {avg_per_min}"),
  html.P(f"Average Usage per Hour: {avg_per_hour}"),
  html.P(f"Average Usage per Day: {avg_per_day}"),
  html.P(f"Average Usage per Week: {avg_per_week}")
  ])
  
  return cum_fig, rate_fig, summary
if __name__ == '__main__':
  app.run_server(debug=True)
 | 
 |