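OpenStack Swift Source Code Guide: Overall Architecture and the Proxy Process
There is already a great deal of OpenStack source code analysis online, and the walkthroughs of the individual components are very thorough. Here I record some key points from the Swift source code I have read, based on my own understanding, in the hope that it helps anyone who needs it.
1. Swift's Overall Framework Diagram
The figure above shows the directory structure of the Swift source tree. The proxy directory is the front-end process that accepts client requests. The account, container, and object directories hold the processes that implement the account, container, and object logic respectively. The common directory holds shared utility code; its most important part is the hash ring logic. The sections that follow walk through the source logic of each process and some of the key mechanisms.
For the logical relationships between the processes and modules, see the architecture diagram in the article "Introduction to OpenStack Swift".
2. Request Handling in the Proxy Process
First, you need to understand the stacked WSGI architecture built on PasteDeploy. From the layers that PasteDeploy defines, you can quickly trace the code flow described by the configuration file, from middleware down to server. The outermost middleware is the entry point for request handling. For the proxy process, the request sequence can be drawn as a simple diagram:
Each layer has a clearly defined job. For example, in the proxy process's default configuration file, the topmost layer handles exceptions: any unhandled exception raised anywhere in the request flow is caught here.
The Proxy process examines the request URI (a resource path made up of account, container, and object) and the request method (PUT, DELETE, and so on) to determine the specific type of resource being requested. It then finds the controller for that resource type, and the controller dispatches the request to the appropriate resource server. Dispatch follows the consistent hash ring. The ring is generated by a tool when the system is initialized; the article "Swift and Keystone Single-Node Installation Summary" gives the concrete steps.
"Introduction to OpenStack Swift" describes the node lookup process in theory: the MD5 value plus a bit shift determines the partition (part), and from the partition all of the virtual nodes are found. The code is: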
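To make the layering concrete, here is a minimal sketch of a stacked WSGI pipeline in plain Python. It does not use PasteDeploy itself, and every name in it (`proxy_app`, `catch_errors_filter`, `logging_filter`, `build_pipeline`, `call`) is hypothetical, chosen for illustration rather than taken from Swift. It only shows the idea that the first filter listed becomes the outermost layer, so an error-handling layer placed first sees every unhandled exception from the layers inside it.

```python
def proxy_app(environ, start_response):
    """Innermost WSGI app; stands in for the proxy server (hypothetical)."""
    if environ.get("PATH_INFO") == "/boom":
        raise RuntimeError("unhandled error in business logic")
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"ok"]


def catch_errors_filter(app):
    """Outermost layer: turn any unhandled exception into a 500 response."""
    def middleware(environ, start_response):
        try:
            return app(environ, start_response)
        except Exception:
            start_response("500 Internal Server Error",
                           [("Content-Type", "text/plain")])
            return [b"An error occurred"]
    return middleware


def logging_filter(app):
    """A middle layer: record the request path, then pass the request on."""
    def middleware(environ, start_response):
        environ.setdefault("demo.trace", []).append(environ.get("PATH_INFO"))
        return app(environ, start_response)
    return middleware


def build_pipeline(filters, app):
    """Wrap app so that the first filter in the list ends up outermost."""
    for wsgi_filter in reversed(filters):
        app = wsgi_filter(app)
    return app


def call(app, path):
    """Drive a WSGI app once and return (status, body)."""
    captured = {}

    def start_response(status, headers):
        captured["status"] = status

    environ = {"PATH_INFO": path, "REQUEST_METHOD": "GET"}
    body = b"".join(app(environ, start_response))
    return captured["status"], body


# Outermost to innermost: catch errors, log, then the application itself.
pipeline = build_pipeline([catch_errors_filter, logging_filter], proxy_app)
```

Calling `call(pipeline, "/boom")` exercises the failure path: the exception raised by the innermost app propagates up through the logging layer and is converted into a 500 by the outermost layer.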
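The resource-type decision can be sketched as follows. This is a simplified illustration under my own assumptions, not the proxy's actual routing code: the function name `classify_request`, the `/<version>/<account>/<container>/<object>` path layout, and the `CONTROLLER_FOR` labels are all introduced here for the example.

```python
def classify_request(path):
    """Return (resource_type, account, container, obj) for a request path.

    Assumes paths of the form /<version>/<account>[/<container>[/<object>]].
    """
    # Split into at most four fields so object names may contain '/'.
    parts = path.lstrip("/").split("/", 3)
    parts += [None] * (4 - len(parts))
    # Treat empty segments (for example from a trailing slash) as missing.
    version, account, container, obj = [p or None for p in parts]

    if not version or not account:
        raise ValueError("path must look like /<version>/<account>[/...]")
    if obj is not None and container is None:
        raise ValueError("an object path needs a container")

    if obj is not None:
        return "object", account, container, obj
    if container is not None:
        return "container", account, container, None
    return "account", account, None, None


# Illustrative mapping from resource type to the controller that handles it.
CONTROLLER_FOR = {
    "account": "AccountController",
    "container": "ContainerController",
    "object": "ObjectController",
}
```

For example, `classify_request("/v1/AUTH_test/photos/2014/cat.jpg")` yields the object type with the object name `2014/cat.jpg`, because only the first three slashes act as separators.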
    container_partition, containers = self.app.container_ring.get_nodes(
        self.account_name, self.container_name)

    def get_nodes(self, account, container=None, obj=None):
        """
        Get the partition and nodes for an account/container/object.
        If a node is responsible for more than one replica, it will
        only appear in the output once.

        :param account: account name
        :param container: container name
        :param obj: object name
        :returns: a tuple of (partition, list of node dicts)

        Each node dict will have at least the following keys:

        ======  ===============================================================
        id      unique integer identifier amongst devices
        weight  a float of the relative weight of this device as compared to
                others; this indicates how many partitions the builder will try
                to assign to this device
        zone    integer indicating which zone the device is in; a given
                partition will not be assigned to multiple devices within the
                same zone
        ip      the ip address of the device
        port    the tcp port of the device
        device  the device's name on disk (sdb1, for example)
        meta    general use 'extra' field; for example: the online date, the
                hardware description
        ======  ===============================================================
        """
        part = self.get_part(account, container, obj)
        return part, self._get_part_nodes(part)

    def get_part(self, account, container=None, obj=None):
        """
        Get the partition for an account/container/object.

        :param account: account name
        :param container: container name
        :param obj: object name
        :returns: the partition number
        """
        key = hash_path(account, container, obj, raw_digest=True)
        if time() > self._rtime:
            self._reload()
        part = struct.unpack_from('>I', key)[0] >> self._part_shift
        return part

    def _get_part_nodes(self, part):
        part_nodes = []
        seen_ids = set()
        for r2p2d in self._replica2part2dev_id:
            if part < len(r2p2d):
                dev_id = r2p2d[part]
                if dev_id not in seen_ids:
                    part_nodes.append(self.devs[dev_id])
                    seen_ids.add(dev_id)
        return part_nodes
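The "MD5 plus shift" step is easier to follow with small numbers. The sketch below is a standalone toy, not the ring implementation: `PART_POWER`, `demo_hash_path`, `demo_get_part`, `demo_get_part_nodes`, and the device table are all hypothetical, and the hash here omits the cluster-specific salt that a real deployment mixes into the path hash. It takes the first four bytes of the digest as a big-endian unsigned integer, shifts right so that only the top `PART_POWER` bits remain, and then looks the partition up in a replica-to-partition-to-device table.

```python
import hashlib
import struct

PART_POWER = 4                 # assumption for the demo: 2**4 = 16 partitions
PART_SHIFT = 32 - PART_POWER   # keep only the top PART_POWER bits of 32


def demo_hash_path(account, container=None, obj=None):
    """Raw MD5 digest of the resource path (demo only, no cluster salt)."""
    path = "/" + "/".join(p for p in (account, container, obj) if p)
    return hashlib.md5(path.encode("utf-8")).digest()


def demo_get_part(account, container=None, obj=None):
    """Map a resource path to a partition number in [0, 2**PART_POWER)."""
    key = demo_hash_path(account, container, obj)
    # First 4 bytes as a big-endian unsigned int, then drop the low bits.
    return struct.unpack_from(">I", key)[0] >> PART_SHIFT


# Toy devices and a toy replica -> partition -> device-id table
# (3 replicas, 16 partitions, 4 devices).
DEVS = [
    {"id": i, "zone": i, "ip": "127.0.0.1", "port": 6000 + i,
     "device": "sdb%d" % (i + 1)}
    for i in range(4)
]
REPLICA2PART2DEV = [
    [(part + replica) % len(DEVS) for part in range(2 ** PART_POWER)]
    for replica in range(3)
]


def demo_get_part_nodes(part):
    """Collect the distinct devices holding each replica of a partition."""
    nodes, seen_ids = [], set()
    for part2dev in REPLICA2PART2DEV:
        dev_id = part2dev[part]
        if dev_id not in seen_ids:
            nodes.append(DEVS[dev_id])
            seen_ids.add(dev_id)
    return nodes
```

Because the partition depends only on the hash of the path, the same account/container/object always maps to the same partition, and the table lookup turns that partition into a fixed set of devices.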
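The quorum principle then decides how many nodes must succeed before the request can return. For example, with N, W, and R set to 3, 2, and 2, at least 2 nodes must write successfully for the write as a whole to count as successful. This is implemented in the shared make_requests method: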
    def make_requests(self, req, ring, part, method, path, headers,
                      query_string=''):
        """
        Sends an HTTP request to multiple nodes and aggregates the results.
        It attempts the primary nodes concurrently, then iterates over the
        handoff nodes as needed.

        :param req: a request sent by the client
        :param ring: the ring used for finding backend servers
        :param part: the partition number
        :param method: the method to send to the backend
        :param path: the path to send to the backend
                     (full path ends up being /<$device>/<$part>/<$path>)
        :param headers: a list of dicts, where each dict represents one
                        backend request that should be made.
        :param query_string: optional query string to send to the backend
        :returns: a swob.Response object
        """
        start_nodes = ring.get_part_nodes(part)
        nodes = GreenthreadSafeIterator(self.app.iter_nodes(ring, part))
        pile = GreenAsyncPile(len(start_nodes))
        for head in headers:
            pile.spawn(self._make_request, nodes, part, method, path,
                       head, query_string, self.app.logger.thread_locals)
        response = []
        statuses = []
        for resp in pile:
            if not resp:
                continue
            response.append(resp)
            statuses.append(resp[0])
            if self.have_quorum(statuses, len(start_nodes)):
                break
        # give any pending requests *some* chance to finish
        pile.waitall(self.app.post_quorum_timeout)
        while len(response) < len(start_nodes):
            response.append((HTTP_SERVICE_UNAVAILABLE, '', '', ''))
        statuses, reasons, resp_headers, bodies = zip(*response)
        return self.best_response(req, statuses, reasons, bodies,
                                  '%s %s' % (self.server_type, req.method),
                                  headers=resp_headers)
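The quorum check in the loop above can be illustrated with a small standalone sketch. The helpers here (`quorum_size`, `have_write_quorum`, `collect_until_quorum`) are hypothetical simplifications written for this article: they count only 2xx responses toward a simple majority, whereas the real have_quorum method may apply different rules, and they skip the grace period that pile.waitall gives to requests still in flight.

```python
HTTP_SERVICE_UNAVAILABLE = 503


def quorum_size(replica_count):
    """Smallest number of nodes that forms a strict majority."""
    return replica_count // 2 + 1


def have_write_quorum(statuses, replica_count):
    """True once enough backend responses are 2xx (simplified rule)."""
    successes = sum(1 for status in statuses if 200 <= status <= 299)
    return successes >= quorum_size(replica_count)


def collect_until_quorum(status_iter, replica_count):
    """Consume backend statuses, stopping as soon as quorum is reached.

    Missing answers are padded with 503, mirroring how the listing above
    fills in responses that never arrived.
    """
    statuses = []
    for status in status_iter:
        statuses.append(status)
        if have_write_quorum(statuses, replica_count):
            break
    while len(statuses) < replica_count:
        statuses.append(HTTP_SERVICE_UNAVAILABLE)
    return statuses
```

With three replicas the majority is two, so two successful writes are enough to stop waiting, and the third slot is recorded as unavailable if its answer has not arrived by then.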