-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
84 lines (73 loc) · 2.35 KB
/
app.py
File metadata and controls
84 lines (73 loc) · 2.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import dash
import dash_bootstrap_components as dbc
import dash_core_components as dcc
import dash_html_components as html
from celery.result import AsyncResult
from dash.dash import no_update
from dash.dependencies import Input, Output
import celery_tasks.state_tasks as tasks
external_stylesheets = [
dbc.themes.BOOTSTRAP,
"https://codepen.io/chriddyp/pen/bWLwgP.css",
]
app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
[
html.Button(id="task_button", children="Click me nao!"),
dcc.Interval(id="progress_bar_interval", interval=1000, disabled=True),
dcc.Store(id="celery_id", data=None),
html.Div(id="my-div"),
html.Div(
html.Div(
[dbc.Progress(id="progress")],
id="progress_container2",
style={"display": "block"},
),
id="progress_container1",
style={"display": "none"},
),
]
)
@app.callback(
[Output("progress_container1", "style"), Output("celery_id", "data")],
[Input("task_button", "n_clicks")],
)
def start_task(n_clicks):
if n_clicks:
t = tasks.task.s().delay()
celery_id = t.id
visibility = {"display": "block"}
else:
visibility = {"display": "none"}
celery_id = None
return visibility, celery_id
@app.callback(
[
Output("progress", "value"),
Output("progress_container2", "style"),
Output("progress_bar_interval", "disabled"),
Output("my-div", "children"),
],
[Input("progress_bar_interval", "n_intervals"), Input("celery_id", "data")],
)
def update_progress_bar(progress_bar_interval, celery_id):
print(celery_id)
if celery_id is not None:
t = AsyncResult(celery_id, app=tasks.app)
if celery_id and not t.ready() and t.info is not None:
print(f"State={t.state}, info={t.info}")
progress = int(t.info["done"] / t.info["total"] * 100)
visibility = {"display": "block"}
stop_refresh = False
result = no_update
else:
progress = 100
visibility = {"display": "none"}
stop_refresh = True
if celery_id:
result = t.get()
else:
result = no_update
return progress, visibility, stop_refresh, result
if __name__ == "__main__":
app.run_server(debug=True)