立即创建
取消
2.前端JS
3.Django代码
urls.py
文件内容
from django.conf.urls import patterns
from home_application.temp import views as temp_view
from home_application.job import views as job_view
# URL routing table for the job-center pages and their JSON endpoints.
# The first argument to patterns() is the view prefix that Django prepends to
# views given as strings; every route below passes a callable instead.
urlpatterns = patterns(
'home_application.views',
# Site root renders the job-center page.
(r'^$', job_view.job),
# JSON helper endpoints used to look up businesses and hosts.
(r'^get_biz_list/$', job_view.get_biz_list),
(r'^get_bk_hosts/$', job_view.get_bk_hosts),
# Class-based views, dispatched by HTTP method.
(r'^temp_view/$', temp_view.TemplateView.as_view()),
(r'^job_view/$', job_view.JobView.as_view()),
# NOTE(review): the bare "..." below looks like an elision placeholder from
# the article (further routes omitted), not a real route - confirm and remove
# or replace it before running this module.
...
)
job\views.py
文件内容
import json
import logging

from django.http import JsonResponse
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import View

from common.mymako import render_mako_context
from home_application.models import Job
def cc_search_host(timeout=30):
    """
    Query hosts through the ``cc/search_host`` component API.

    The request asks for the host name, inner IP, host id and cloud id of each
    host, plus the id and name of the business it belongs to.

    :param timeout: seconds to wait for the API before ``requests`` raises a
        timeout error; without it a stalled API would block the caller forever.
    :return: the ``data`` member of the JSON response when the API answers
        with HTTP 200, otherwise ``None``.
    """
    url = "{0}/api/c/compapi/v2/cc/search_host/".format(BK_PAAS_HOST)
    kwargs = {
        "bk_app_code": APP_ID,
        "bk_app_secret": APP_TOKEN,
        "bk_username": "admin",
        "condition": [
            {
                "bk_obj_id": "host",
                "fields": ["bk_host_name", "bk_host_innerip", "bk_host_id", "bk_cloud_id"],
            },
            {
                "bk_obj_id": "biz",
                "fields": ["bk_biz_id", "bk_biz_name"],
            },
        ],
        # No "page" argument (e.g. {"start": 0, "limit": 10}) is sent.
    }
    # NOTE(review): verify=False disables TLS certificate verification; kept
    # because the original relied on it - confirm whether it is still needed.
    response = requests.post(
        url=url,
        data=json.dumps(kwargs),
        verify=False,
        timeout=timeout,
    )
    if response.status_code != 200:
        return None
    # response.json() decodes the body itself, so it works for byte bodies too.
    return response.json().get("data")
# Fetch all businesses (tested OK).
def cc_search_business(fields=None):
    """
    Fetch the business list through the component client of user "admin".

    :param fields: list of business field names to request; ``None`` (the
        default) sends an empty list.
    :return: the ``info`` list from the response ``data`` when the response
        ``code`` is 0 (an empty list if ``data``/``info`` is missing),
        otherwise ``None``.
    """
    client = get_client_by_user("admin")
    # A fresh list per call: a mutable default would be shared across calls.
    params = {'fields': [] if fields is None else fields}
    res = client.cc.search_business(**params)
    if res.get('code') != 0:
        return None
    # "data" may be present but null, so fall back to an empty dict.
    return (res.get('data') or {}).get('info', [])
def job(request):
    """Render the job-center page template for this request."""
    template_path = '/home_application/job_center.html'
    return render_mako_context(request, template_path)
def get_biz_list(request):
    """
    Return the id and name of every business as JSON.

    :param request: the incoming HTTP request (not inspected).
    :return: ``JsonResponse`` with ``result`` set to True and ``data`` holding
        whatever ``cc_search_business`` returned.
    """
    fields = ["bk_biz_id", "bk_biz_name"]
    # The lookup was missing, leaving "data" undefined (NameError on each call).
    data = cc_search_business(fields)
    return JsonResponse({"result": True, "data": data})
def get_bk_hosts(request):
    """Return the host query result as JSON under the ``data`` key."""
    # Function-scope import: inside this view the name refers to the
    # implementation in home_application.utils.cc_by_request.
    from home_application.utils.cc_by_request import cc_search_host

    hosts = cc_search_host()
    payload = {"result": True, "data": hosts}
    return JsonResponse(payload)
class CsrfExemptView(View):
    """Base class-based view whose ``dispatch`` is wrapped in ``csrf_exempt``."""

    @method_decorator(csrf_exempt)
    def dispatch(self, request, *args, **kwargs):
        """Hand the request to the standard ``View`` dispatch unchanged."""
        parent = super(CsrfExemptView, self)
        response = parent.dispatch(request, *args, **kwargs)
        return response
class JobView(CsrfExemptView):
    """
    CSRF-exempt endpoint that lists, creates, updates and deletes ``Job`` rows.

    Every handler answers with a JSON object carrying a boolean ``result``;
    ``get`` additionally returns the matching jobs under ``data``. Malformed
    input and database errors are reported as ``{"result": false}`` rather
    than surfacing as unhandled exceptions.
    """

    _logger = logging.getLogger(__name__)

    @staticmethod
    def _load_body(request):
        """Parse the request body as a JSON object; return None if it is not one."""
        try:
            data = json.loads(request.body.decode("utf-8"))
        except (AttributeError, TypeError, ValueError):
            # ValueError also covers invalid UTF-8 and invalid JSON.
            return None
        return data if isinstance(data, dict) else None

    @staticmethod
    def _split_pair(value):
        """Split an "<id>:<name>" string into (id, name); return None if malformed."""
        try:
            ident, sep, label = value.partition(":")
        except AttributeError:
            # Field missing from the payload, or not a string.
            return None
        if not sep:
            return None
        # partition() splits on the first colon only, so a name that itself
        # contains ":" is kept whole.
        return ident, label

    @classmethod
    def _job_fields(cls, data, biz_key, temp_key, prefix):
        """Build the ``Job`` column dict from a payload; return None if invalid."""
        biz = cls._split_pair(data.get(biz_key))
        temp = cls._split_pair(data.get(temp_key))
        if biz is None or temp is None:
            return None
        return {
            "bk_biz_id": biz[0],
            "bk_biz_name": biz[1],
            "temp_id": temp[0],
            "temp_name": temp[1],
            "job_name": data.get(prefix + "job_name"),
            "type": data.get(prefix + "type"),
            "interval": data.get(prefix + "interval"),
            "host_ip": data.get(prefix + "host_ip"),
        }

    def get(self, request, *args, **kwargs):
        """
        List jobs, optionally filtered by the query-string parameters
        ``search_type`` (exact match on ``type``) and ``query_str``
        (case-insensitive substring match on ``job_name``).
        """
        search_type = request.GET.get("search_type")
        query_str = request.GET.get("query_str")
        job_query = Job.objects.all()
        if search_type:
            job_query = job_query.filter(type=search_type)
        if query_str:
            job_query = job_query.filter(job_name__icontains=query_str)
        # QuerySets are lazy: the database is only hit when the query is
        # iterated, so this is the statement that needs the error handling.
        try:
            res_data = [job.to_dict() for job in job_query]
        except Exception:
            self._logger.exception("Failed to list jobs")
            return JsonResponse({"result": False})
        return JsonResponse({"result": True, "data": res_data})

    def post(self, request, *args, **kwargs):
        """
        Create a job from a JSON body with the keys ``add_biz`` and
        ``add_temp`` (both "<id>:<name>"), ``add_job_name``, ``add_type``,
        ``add_interval`` and ``add_host_ip``.
        """
        data = self._load_body(request)
        job_obj = None
        if data is not None:
            job_obj = self._job_fields(data, "add_biz", "add_temp", "add_")
        if job_obj is None:
            return JsonResponse({"result": False})
        try:
            Job.objects.create(**job_obj)
        except Exception:
            self._logger.exception("Failed to create job")
            return JsonResponse({"result": False})
        return JsonResponse({"result": True})

    def put(self, request, *args, **kwargs):
        """
        Update the job identified by ``pk`` from a JSON body with the keys
        ``edit_bk_biz`` and ``edit_temp_name`` (both "<id>:<name>"),
        ``edit_job_name``, ``edit_type``, ``edit_interval`` and
        ``edit_host_ip``.
        """
        data = self._load_body(request)
        if data is None:
            return JsonResponse({"result": False})
        pk = data.get("pk")
        job_obj = self._job_fields(data, "edit_bk_biz", "edit_temp_name", "edit_")
        # Without a pk the filter below would match nothing and the request
        # would be reported as a success.
        if pk is None or job_obj is None:
            return JsonResponse({"result": False})
        try:
            Job.objects.filter(pk=pk).update(**job_obj)
        except Exception:
            self._logger.exception("Failed to update job %s", pk)
            return JsonResponse({"result": False})
        return JsonResponse({"result": True})

    def delete(self, request, *args, **kwargs):
        """Delete the job whose primary key is given as ``id`` in the JSON body."""
        data = self._load_body(request)
        pk = data.get("id") if data is not None else None
        if pk is None:
            return JsonResponse({"result": False})
        try:
            Job.objects.filter(pk=pk).delete()
        except Exception:
            self._logger.exception("Failed to delete job %s", pk)
            return JsonResponse({"result": False})
        return JsonResponse({"result": True})
models.py
文件内容
class Job(models.Model):
    """Job record: business, template, job name, type, interval, host IP and creation time."""

    bk_biz_id = models.CharField(u"业务ID", max_length=8, blank=True, null=True)
    bk_biz_name = models.CharField(u"业务名称", max_length=32, blank=True, null=True)
    temp_id = models.CharField(u"模板ID", max_length=8, blank=True, null=True)
    temp_name = models.CharField(u"模板名称", max_length=32, blank=True, null=True)
    job_name = models.CharField(u"作业名称", max_length=16, blank=True, null=True)
    type = models.CharField(u"类型", max_length=8, blank=True, null=True)
    interval = models.CharField(u"间隔时间", max_length=4, blank=True, null=True)
    host_ip = models.CharField(u"主机IP", max_length=32, blank=True, null=True)
    create_time = models.DateTimeField(u"创建时间", auto_now_add=True)

    def to_dict(self):
        """Return the row as a plain dict, with ``create_time`` run through ``parse_datetime_to_timestr``."""
        # Columns copied into the result unchanged.
        plain_columns = (
            "bk_biz_id",
            "bk_biz_name",
            "temp_id",
            "temp_name",
            "job_name",
            "type",
            "interval",
            "host_ip",
        )
        row = {"pk": self.pk}
        for column in plain_columns:
            row[column] = getattr(self, column)
        row["create_time"] = parse_datetime_to_timestr(self.create_time)
        return row
实现效果


感谢各位的阅读!以上就是“怎么用 Python + Element 实现作业任务 Job 操作”的全部内容。经过本文的学习,相信大家对如何用 Python + Element 实现作业任务 Job 操作有了更深刻的体会,具体使用情况还需要大家在实践中验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!
网页标题:怎么用python+Element实现作业任务Job操作
网页URL:http://wtcwzsj.com/article/gjosdj.html
- 关于我们
-
- 我们的服务
-
- 网站建设案例
-
- 新闻动态
-
- 联系我们
-

135-1821-9792
公司服务热线
Copyright © 2009-2022 www.wtcwzsj.com 青羊区广皓图文设计工作室(个体工商户) 版权所有 蜀ICP备19037934号