在 Kubernetes 服务网格(service mesh)的代表项目 Istio 中,有一个名为 Bookinfo 的样例,它由几个微服务组成一个简单的书店应用,涉及的编程语言有 Python、Java、Ruby 和 Node.js。它原本用来演示 Istio 的流量路由等功能,以 Docker 镜像的方式部署在 Kubernetes 中。
把它改造成在虚拟机上部署,有什么用途呢?嘿嘿,当然有了:在我构思的下一个课题里,就需要这样一些示例,这里先不透露细节。
这次我把这 4 个微服务部署在同一台机器上,分别监听不同的端口,并实现了原版 Bookinfo 的所有界面功能。
总体部署图
(图:2021-02-01 22_02_34-Istio _ Bookinfo Application.png)
本项目的 GitHub 地址:
https://github.com/aguncn/bookinfo-productpage
主要应用的访问网址:
bookinfo-productpage:http://localhost:8001
python
bookinfo-details:http://localhost:8002
ruby
bookinfo-reviews:http://localhost:8003
java
bookinfo-ratings:http://localhost:8004
node.js
productpage.py主文件内容
from __future__ import print_function
from flask_bootstrap import Bootstrap
from flask import Flask, request, session, render_template, redirect, url_for
from flask import _request_ctx_stack as stack
from jaeger_client import Tracer, ConstSampler
from jaeger_client.reporter import NullReporter
from jaeger_client.codecs import B3Codec
from opentracing.ext import tags
from opentracing.propagation import Format
from opentracing_instrumentation.request_context import get_current_span, span_in_context
import simplejson as json
import requests
import sys
from json2html import *
import logging
import requests
import os
import asyncio
# Switch on wire-level debugging in the standard HTTP client that sits below
# requests -> urllib3. Every outgoing REQUEST (headers and data) and every
# RESPONSE (headers only) is echoed; the response body itself is not logged.
try:
    from http import client as http_client
except ImportError:
    # Python 2 ships the same module under its old name.
    import httplib as http_client

http_client.HTTPConnection.debuglevel = 1
# The Flask application object that all routes below are registered on.
app = Flask(__name__)

# Send every log record to stdout at DEBUG level, including the records of the
# "requests.packages.urllib3" logger.
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.propagate = True
requests_log.setLevel(logging.DEBUG)

# The application logger additionally gets its own stdout handler.
app.logger.setLevel(logging.DEBUG)
app.logger.addHandler(logging.StreamHandler(sys.stdout))

# Set the secret key to some random bytes. Keep this really secret!
# NOTE(review): the key is hard-coded in source; consider loading it from the
# environment before using this outside a demo.
app.secret_key = b'_5#y2L"F4Q8z\n\xec]/'

Bootstrap(app)
# Host on which every Bookinfo service listens; each service has its own port.
servicesDomain = "localhost"

# Number of extra reviews requests issued per product page view.
# Stays 0 when the FLOOD_FACTOR environment variable is not set.
flood_factor = int(os.environ.get("FLOOD_FACTOR", 0))


def _service_url(port):
    """Return the base URL of the service listening on *port* of servicesDomain."""
    return "http://{}:{}".format(servicesDomain, port)


# Each service is described by its base URL ("name"), the path segment used to
# query it ("endpoint") and the services it calls in turn ("children").
details = {
    "name": _service_url(8002),
    "endpoint": "details",
    "children": [],
}

ratings = {
    "name": _service_url(8004),
    "endpoint": "ratings",
    "children": [],
}

reviews = {
    "name": _service_url(8003),
    "endpoint": "reviews",
    "children": [ratings],
}

# NOTE(review): the endpoint of productpage is "details" here, same as the
# details service; confirm this is intended.
productpage = {
    "name": _service_url(8001),
    "endpoint": "details",
    "children": [details, reviews],
}

# Lookup table from service name to its description.
service_dict = {
    "productpage": productpage,
    "details": details,
    "reviews": reviews,
}

print(service_dict)
# Distributed tracing, in short:
#
# Istio proxies can send spans on their own, but they cannot tie the spans of
# one request together without help. The application has to copy a handful of
# HTTP headers from each incoming request onto every outgoing request so the
# proxies' spans can be correlated into a single trace:
#
#   x-request-id
#   x-b3-traceid
#   x-b3-spanid
#   x-b3-parentspanid
#   x-b3-sampled
#   x-b3-flags
#
# This example propagates the 'b3' (zipkin) headers through OpenTracing
# (http://opentracing.io/). That is a choice, not a requirement: forwarding
# the headers by hand works too, but OpenTracing leaves room for adding
# application-specific tracing later on.
#
# The OpenTracing usage here is deliberately minimal. It only forwards
# headers and serves as a starting point (creating spans, extracting and
# injecting context, etc.).

# A very basic OpenTracing tracer: it samples every span, reports through a
# null reporter, and uses the B3 codec for the HTTP header format.
tracer = Tracer(
    service_name='productpage',
    one_span_per_rpc=True,
    sampler=ConstSampler(decision=True),
    reporter=NullReporter(),
    extra_codecs={Format.HTTP_HEADERS: B3Codec()},
)
def trace():
    """Return a decorator that runs a view inside an OpenTracing span.

    The span is built from the x-b3-*** headers of the current Flask request.
    When no span context can be extracted from them, a fresh span is started
    instead. The wrapped function is then called with that span installed as
    the current span, and its return value is passed through unchanged.
    """
    import functools  # local import keeps this block self-contained

    def decorator(f):
        # functools.wraps copies __name__ (which Flask uses as the endpoint
        # name) as well as the docstring and other metadata of the view.
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            request = stack.top.request
            try:
                # Create a new span context, reading in values (traceid,
                # spanid, etc) from the incoming x-b3-*** headers.
                span_ctx = tracer.extract(
                    Format.HTTP_HEADERS,
                    dict(request.headers)
                )
                # Note: this tag means that the span will *not* be
                # a child span. It will use the incoming traceid and
                # spanid. We do this to propagate the headers verbatim.
                rpc_tag = {tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER}
                span = tracer.start_span(
                    operation_name='op', child_of=span_ctx, tags=rpc_tag
                )
            except Exception:
                # We failed to create a context, possibly due to no
                # incoming x-b3-*** headers. Start a fresh span.
                # Note: This is a fallback only, and will create fresh
                # headers, not propagate headers.
                span = tracer.start_span('op')
            with span_in_context(span):
                return f(*args, **kwargs)
        return wrapper
    return decorator
def getForwardHeaders(request):
    """Build the headers to attach to outgoing calls to other services.

    The result contains the headers the tracer injects for the current span,
    an 'end-user' header when the session holds a 'user', the incoming
    'user-agent', and any of the x-request-id / x-datadog-* headers present
    on *request*.
    """
    # x-b3-*** headers are produced by the tracer from the current span.
    b3_headers = {}
    tracer.inject(
        span_context=get_current_span().context,
        format=Format.HTTP_HEADERS,
        carrier=b3_headers)
    forwarded = dict(b3_headers)

    # Everything that is not an x-b3-*** header is handled by hand.
    if 'user' in session:
        forwarded['end-user'] = session['user']

    # The user-agent is passed along as received.
    if 'user-agent' in request.headers:
        forwarded['user-agent'] = request.headers.get('user-agent')

    # Copy these incoming headers only when the request carries them.
    for name in ('x-request-id', 'x-datadog-trace-id', 'x-datadog-parent-id', 'x-datadog-sampled'):
        value = request.headers.get(name)
        if value is not None:
            forwarded[name] = value

    return forwarded
# The UI:
@app.route('/')
@app.route('/index.html')
def index():
    """Render index.html with the productpage service tree as an HTML table."""
    topology_json = json.dumps(productpage)
    service_table = json2html.convert(
        json=topology_json,
        table_attributes="class=\"table table-condensed table-bordered table-hover\"")
    return render_template('index.html', serviceTable=service_table)
@app.route('/health')
def health():
    """Health endpoint: always answers with a fixed plain-text message."""
    message = 'Product page is healthy'
    return message
@app.route('/login', methods=['POST'])
def login():
    """Store the submitted 'username' in the session and redirect back.

    The browser is sent to the referring page. Requests without a Referer
    header (for example from curl or a privacy-hardened browser) are sent to
    the product page instead of failing inside redirect().
    """
    user = request.values.get('username')
    # request.referrer is None when no Referer header was sent.
    target = request.referrer or url_for('front')
    response = app.make_response(redirect(target))
    session['user'] = user
    return response
@app.route('/logout', methods=['GET'])
def logout():
    """Remove 'user' from the session, if present, and redirect back.

    The browser is sent to the referring page. Requests without a Referer
    header are sent to the product page instead of failing inside redirect().
    """
    # request.referrer is None when no Referer header was sent.
    target = request.referrer or url_for('front')
    response = app.make_response(redirect(target))
    session.pop('user', None)
    return response
# a helper function for asyncio.gather, does not return a value
async def getProductReviewsIgnoreResponse(product_id, headers):
    """Request the reviews of *product_id* once and discard the response.

    getProductReviews() is a blocking call. Calling it directly inside the
    coroutine would never yield to the event loop, so coroutines gathered
    together would run strictly one after another. Handing the call to the
    loop's default executor lets the gathered requests overlap.
    """
    # Inside a coroutine this returns the loop that is running it.
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, getProductReviews, product_id, headers)
# flood reviews with unnecessary requests to demonstrate Istio rate limiting, asynchronously
async def floodReviewsAsynchronously(product_id, headers):
    """Issue flood_factor throwaway reviews requests and wait for all of them."""
    pending = [
        getProductReviewsIgnoreResponse(product_id, headers)
        for _ in range(flood_factor)
    ]
    # The gathered results are disregarded.
    await asyncio.gather(*pending)
# flood reviews with unnecessary requests to demonstrate Istio rate limiting
def floodReviews(product_id, headers):
    """Run floodReviewsAsynchronously() to completion on a private event loop.

    The loop is created for this call only and is closed afterwards, even
    when the flood raises, so a failed flood does not leak the loop.
    """
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(floodReviewsAsynchronously(product_id, headers))
    finally:
        loop.close()
@app.route('/productpage')
@trace()
def front():
    """Render productpage.html for product 0 with its details and reviews.

    When flood_factor is positive, the reviews service is flooded with extra
    requests before the reviews shown on the page are fetched.
    """
    product_id = 0  # TODO: replace default value
    forward_headers = getForwardHeaders(request)
    current_user = session.get('user', '')
    book = getProduct(product_id)
    details_status, book_details = getProductDetails(product_id, forward_headers)

    if flood_factor > 0:
        floodReviews(product_id, forward_headers)

    reviews_status, book_reviews = getProductReviews(product_id, forward_headers)

    page_context = {
        'detailsStatus': details_status,
        'reviewsStatus': reviews_status,
        'product': book,
        'details': book_details,
        'reviews': book_reviews,
        'user': current_user,
    }
    return render_template('productpage.html', **page_context)
# The API:
@app.route('/api/v1/products')
def productsRoute():
    """Return the product list as a JSON response with status 200."""
    body = json.dumps(getProducts())
    return body, 200, {'Content-Type': 'application/json'}
@app.route('/api/v1/products/<product_id>')
@trace()
def productRoute(product_id):
    """Return the result of getProductDetails() for *product_id* as JSON."""
    status_code, payload = getProductDetails(product_id, getForwardHeaders(request))
    return json.dumps(payload), status_code, {'Content-Type': 'application/json'}
@app.route('/api/v1/products/<product_id>/reviews')
@trace()
def reviewsRoute(product_id):
    """Return the result of getProductReviews() for *product_id* as JSON."""
    status_code, payload = getProductReviews(product_id, getForwardHeaders(request))
    return json.dumps(payload), status_code, {'Content-Type': 'application/json'}
@app.route('/api/v1/products/<product_id>/ratings')
@trace()
def ratingsRoute(product_id):
    """Return the result of getProductRatings() for *product_id* as JSON."""
    status_code, payload = getProductRatings(product_id, getForwardHeaders(request))
    return json.dumps(payload), status_code, {'Content-Type': 'application/json'}
# Data providers:
def getProducts():
    """Return the hard-coded product catalogue as a list of dicts.

    Each product has an integer 'id', a 'title' and a 'descriptionHtml'
    string containing HTML markup.
    """
    return [
        {
            'id': 0,
            'title': 'The Comedy of Errors',
            'descriptionHtml': '<a href="https://en.wikipedia.org/wiki/The_Comedy_of_Errors">Wikipedia Summary</a>: The Comedy of Errors is one of <b>William Shakespeare\'s</b> early plays. It is his shortest and one of his most farcical comedies, with a major part of the humour coming from slapstick and mistaken identity, in addition to puns and word play.'
        }
    ]


def getProduct(product_id):
    """Return the product at position *product_id*, or None if out of range.

    *product_id* is used as an index into getProducts(). Negative values are
    rejected explicitly: without the lower bound, -1 would pass the size
    check and silently return the last product.
    """
    products = getProducts()
    if 0 <= product_id < len(products):
        return products[product_id]
    return None
def getProductDetails(product_id, headers):
    """Fetch the details of *product_id* from the details service.

    Returns a ``(status, payload)`` tuple. On a 200 response the payload is
    the decoded JSON body. Otherwise the payload is an error dict and the
    status is the service's status code, or 500 when the request itself
    failed.
    """
    url = details['name'] + "/" + details['endpoint'] + "/" + str(product_id)
    try:
        res = requests.get(url, headers=headers, timeout=3.0)
    except Exception:
        # Best effort: any request failure is reported as "unavailable".
        # Exception (not BaseException) lets KeyboardInterrupt and SystemExit
        # propagate instead of being swallowed here.
        res = None
    if res and res.status_code == 200:
        return 200, res.json()
    status = res.status_code if res is not None and res.status_code else 500
    return status, {'error': 'Sorry, product details are currently unavailable for this book.'}
def getProductReviews(product_id, headers):
    """Fetch the reviews of *product_id* from the reviews service.

    The request is attempted up to two times, each with a 3 second timeout.
    Returns a ``(status, payload)`` tuple. On a 200 response the payload is
    the decoded JSON body. Otherwise the payload is an error dict and the
    status is the last response's status code, or 500 when the last request
    itself failed.
    """
    # Do not remove. Bug introduced explicitly for illustration in fault injection task
    # TODO: Figure out how to achieve the same effect using Envoy retries/timeouts
    for _ in range(2):
        try:
            url = reviews['name'] + "/" + reviews['endpoint'] + "/" + str(product_id)
            res = requests.get(url, headers=headers, timeout=3.0)
        except Exception:
            # Best effort: any request failure counts as a failed attempt.
            # Exception (not BaseException) lets KeyboardInterrupt and
            # SystemExit propagate instead of being swallowed here.
            res = None
        if res and res.status_code == 200:
            return 200, res.json()
    status = res.status_code if res is not None and res.status_code else 500
    return status, {'error': 'Sorry, product reviews are currently unavailable for this book.'}
def getProductRatings(product_id, headers):
    """Fetch the ratings of *product_id* from the ratings service.

    Returns a ``(status, payload)`` tuple. On a 200 response the payload is
    the decoded JSON body. Otherwise the payload is an error dict and the
    status is the service's status code, or 500 when the request itself
    failed.
    """
    url = ratings['name'] + "/" + ratings['endpoint'] + "/" + str(product_id)
    try:
        res = requests.get(url, headers=headers, timeout=3.0)
    except Exception:
        # Best effort: any request failure is reported as "unavailable".
        # Exception (not BaseException) lets KeyboardInterrupt and SystemExit
        # propagate instead of being swallowed here.
        res = None
    if res and res.status_code == 200:
        return 200, res.json()
    status = res.status_code if res is not None and res.status_code else 500
    return status, {'error': 'Sorry, product ratings are currently unavailable for this book.'}
class Writer(object):
    """Minimal file-like object that writes text to a file.

    The file is opened for writing (truncating it) on construction. Call
    close(), or use the instance as a context manager, to release the
    underlying file handle.
    """

    def __init__(self, filename):
        # Opened in text write mode; an existing file is truncated.
        self.file = open(filename, 'w')

    def write(self, data):
        """Write *data* to the underlying file."""
        self.file.write(data)

    def flush(self):
        """Flush the underlying file's buffer."""
        self.file.flush()

    def close(self):
        """Close the underlying file; calling it more than once is harmless."""
        self.file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Returning False lets any exception from the with-body propagate.
        return False
if __name__ == '__main__':
    # The only command-line argument is the TCP port to listen on. A missing
    # or non-numeric port prints the usage line and exits, instead of
    # crashing with a traceback on int().
    try:
        p = int(sys.argv[1])
    except (IndexError, ValueError):
        logging.error("usage: %s port", sys.argv[0])
        sys.exit(-1)

    logging.info("start at port %s", p)
    # Python does not work on an IPv6 only host
    # https://bugs.python.org/issue10414
    # NOTE(review): debug=True together with host='0.0.0.0' exposes the
    # interactive debugger on every network interface; acceptable for a local
    # demo only.
    app.run(host='0.0.0.0', port=p, debug=True, threaded=True)
只部署了productpage时的界面
启动命令
python3 productpage.py 8001
浏览器访问 http://192.168.1.212:8001/productpage?u=normal
可以看到,由于此时只部署了 productpage,details 和 reviews 服务尚未启动,页面上这两个区域显示的都是错误提示。
网友评论