Upmap remapped add testing #45

Open · wants to merge 2 commits into master
46 changes: 46 additions & 0 deletions tools/upmap/test_upmap-remapped.py
@@ -0,0 +1,46 @@
# Use importlib since we can't import a module that contains a hyphen
import importlib
upmap_remapped = importlib.import_module('upmap-remapped')

def test_gen_upmap(monkeypatch):
    OSDS = [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
    DF = [
        {"id": 0, "crush_weight": 14, "reweight": 1},
        {"id": 1, "crush_weight": 8, "reweight": 1},
        {"id": 2, "crush_weight": 12, "reweight": 1},
        {"id": 3, "crush_weight": 8, "reweight": 1},
        {"id": 4, "crush_weight": 12, "reweight": 1},
        {"id": 5, "crush_weight": 10, "reweight": 1},
        {"id": 6, "crush_weight": 12, "reweight": 1},
        {"id": 7, "crush_weight": 8, "reweight": 1},
        {"id": 8, "crush_weight": 12, "reweight": 1},
        {"id": 9, "crush_weight": 8, "reweight": 1},
        {"id": 10, "crush_weight": 10, "reweight": 1},
        {"id": 11, "crush_weight": 10, "reweight": 1},
        {"id": 12, "crush_weight": 10, "reweight": 1},
        {"id": 13, "crush_weight": 10, "reweight": 1},
        {"id": 14, "crush_weight": 10, "reweight": 1},
        {"id": 15, "crush_weight": 10, "reweight": 1}
    ]

    # Use monkeypatch to set the required globals
    monkeypatch.setattr(upmap_remapped, "OSDS", OSDS)
    monkeypatch.setattr(upmap_remapped, "DF", DF)

    # EC 4+2 - single OSD remapped
    assert upmap_remapped.gen_upmap([2,11,5,9,15,12], [2,11,5,9,14,12]) == [(15,14)]

    # EC 4+2 - every osd remapped with no dependencies
    assert upmap_remapped.gen_upmap([4,14,10,3,7,8], [9,13,2,15,5,11]) == [(4,9),(14,13),(10,2),(3,15),(7,5),(8,11)]

    # EC 4+2 - one dependency
    assert upmap_remapped.gen_upmap([6,12,10,0,2,9], [6,12,9,0,2,4]) == [(9,4),(10,9)]

    # EC 4+2 - two dependencies
    assert upmap_remapped.gen_upmap([3,4,12,6,11,0], [6,14,12,4,11,0]) == [(4,14),(6,4),(3,6)]

    # EC 4+2 - three dependencies plus one without a dependency
    assert upmap_remapped.gen_upmap([7,1,12,2,15,9], [7,0,4,12,9,2]) == [(1,0),(12,4),(2,12),(9,2),(15,9)]

    # EC 4+2 - upmap not possible since osds are swapped
    assert upmap_remapped.gen_upmap([9,4,7,10,14,2], [9,7,4,10,14,2]) == []
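For orientation, the assertions above encode gen_upmap's ordering contract for erasure-coded PGs: each up OSD is paired with its positional acting OSD, and a pair can only be emitted once its target no longer appears as a source elsewhere in the same PG, so pure swaps yield []. The sketch below reproduces that contract for these six cases; it is not the PR's implementation (whose body is not shown in this diff) and it omits the crush-weight/OSD validity checks implied by the monkeypatched OSDS and DF globals, as well as the replicated=True path.

# Hedged sketch of the ordering behaviour the test asserts; not the real gen_upmap.
def gen_upmap_sketch(up, acting):
    # Pair each up OSD with its positional acting OSD, skipping unchanged slots.
    pending = [(u, a) for u, a in zip(up, acting) if u != a]
    ordered = []
    while pending:
        # A pair is safe to emit once its target no longer appears as a pending source.
        sources = {u for u, _ in pending}
        safe = [p for p in pending if p[1] not in sources]
        if not safe:
            return []  # only swap cycles remain, so no valid upmap ordering exists
        ordered.extend(safe)
        pending = [p for p in pending if p not in safe]
    return ordered

# Matches the "two dependencies" case above: (4,14) goes first because 14 is not
# a source; that frees 4 for (6,4), which in turn frees 6 for (3,6).
assert gen_upmap_sketch([3,4,12,6,11,0], [6,14,12,4,11,0]) == [(4,14),(6,4),(3,6)]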
268 changes: 138 additions & 130 deletions tools/upmap/upmap-remapped.py
@@ -40,42 +40,147 @@

import json, subprocess, sys

OSDS = []
DF = []

def main():
    global OSDS
    global DF

    try:
        import rados
        cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
        cluster.connect()
    except:
        use_shell = True
    else:
        use_shell = False

    try:
        if use_shell:
            OSDS = json.loads(subprocess.getoutput('ceph osd ls -f json | jq -r .'))
            DF = json.loads(subprocess.getoutput('ceph osd df -f json | jq -r .nodes'))
        else:
            cmd = {"prefix": "osd ls", "format": "json"}
            ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5)
            output = output.decode('utf-8').strip()
            OSDS = json.loads(output)
            cmd = {"prefix": "osd df", "format": "json"}
            ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5)
            output = output.decode('utf-8').strip()
            DF = json.loads(output)['nodes']
    except ValueError:
        eprint('Error loading OSD IDs')
        sys.exit(1)

    ignore_backfilling = False
    for arg in sys.argv[1:]:
        if arg == "--ignore-backfilling":
            eprint("All actively backfilling PGs will be ignored.")
            ignore_backfilling = True

    # discover remapped pgs
    try:
        if use_shell:
            remapped_json = subprocess.getoutput('ceph pg ls remapped -f json | jq -r .')
        else:
            cmd = {"prefix": "pg ls", "states": ["remapped"], "format": "json"}
            ret, output, err = cluster.mon_command(json.dumps(cmd), b'', timeout=5)
            remapped_json = output.decode('utf-8').strip()
        try:
            remapped = json.loads(remapped_json)['pg_stats']
        except KeyError:
            eprint("There are no remapped PGs")
            sys.exit(0)
    except ValueError:
        eprint('Error loading remapped pgs')
        sys.exit(1)

    # discover existing upmaps
    try:
        if use_shell:
            osd_dump_json = subprocess.getoutput('ceph osd dump -f json | jq -r .')
        else:
            cmd = {"prefix": "osd dump", "format": "json"}
            ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5)
            osd_dump_json = output.decode('utf-8').strip()
        upmaps = json.loads(osd_dump_json)['pg_upmap_items']
    except ValueError:
        eprint('Error loading existing upmaps')
        sys.exit(1)

    # discover whether each pool is replicated or erasure
    pool_type = {}
    try:
        if use_shell:
            osd_pool_ls_detail = subprocess.getoutput('ceph osd pool ls detail')
        else:
            cmd = {"prefix": "osd pool ls", "detail": "detail", "format": "plain"}
            ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5)
            osd_pool_ls_detail = output.decode('utf-8').strip()
        for line in osd_pool_ls_detail.split('\n'):
            if 'pool' in line:
                x = line.split(' ')
                pool_type[x[1]] = x[3]
    except:
        eprint('Error parsing pool types')
        sys.exit(1)

    # discover if each pg is already upmapped
    has_upmap = {}
    for pg in upmaps:
        pgid = str(pg['pgid'])
        has_upmap[pgid] = True

    # handle each remapped pg
    print('while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done')
    num = 0
    for pg in remapped:
        # after every 50 upmaps, flush and wait for the cluster to settle
        if num == 50:
            print('wait; sleep 4; while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done')
            num = 0

        if ignore_backfilling:
            if "backfilling" in pg['state']:
                continue

        pgid = pg['pgid']

        try:
            if has_upmap[pgid]:
                rm_upmap_pg_items(pgid)
                num += 1
                continue
        except KeyError:
            pass

        up = pg['up']
        acting = pg['acting']
        pool = pgid.split('.')[0]
        if pool_type[pool] == 'replicated':
            try:
                pairs = gen_upmap(up, acting, replicated=True)
            except:
                continue
        elif pool_type[pool] == 'erasure':
            try:
                pairs = gen_upmap(up, acting)
            except:
                continue
        else:
            eprint('Unknown pool type for %s' % pool)
            sys.exit(1)
        upmap_pg_items(pgid, pairs)
        num += 1

    print('wait; sleep 4; while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done')
    # only shut down the librados connection if one was actually opened
    if not use_shell:
        cluster.shutdown()

def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)

def crush_weight(id):
    global DF
    for o in DF:
        if o['id'] == id:
            return o['crush_weight'] * o['reweight']
@@ -131,102 +236,5 @@ def upmap_pg_items(pgid, mapping):
def rm_upmap_pg_items(pgid):
    print('ceph osd rm-pg-upmap-items %s &' % pgid)
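# Note: rm_upmap_pg_items() above (and, judging by the backgrounding `&`,
# presumably upmap_pg_items() as well, whose body is elided from this diff)
# prints `ceph osd ...` shell commands to stdout rather than calling the
# cluster directly, so the generated script can be reviewed before being
# piped to a shell.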


# start here

# discover remapped pgs
try:
    if use_shell:
        remapped_json = subprocess.getoutput('ceph pg ls remapped -f json | jq -r .')
    else:
        cmd = {"prefix": "pg ls", "states": ["remapped"], "format": "json"}
        ret, output, err = cluster.mon_command(json.dumps(cmd), b'', timeout=5)
        remapped_json = output.decode('utf-8').strip()
    try:
        remapped = json.loads(remapped_json)['pg_stats']
    except KeyError:
        eprint("There are no remapped PGs")
        sys.exit(0)
except ValueError:
    eprint('Error loading remapped pgs')
    sys.exit(1)

# discover existing upmaps
try:
    if use_shell:
        osd_dump_json = subprocess.getoutput('ceph osd dump -f json | jq -r .')
    else:
        cmd = {"prefix": "osd dump", "format": "json"}
        ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5)
        osd_dump_json = output.decode('utf-8').strip()
    upmaps = json.loads(osd_dump_json)['pg_upmap_items']
except ValueError:
    eprint('Error loading existing upmaps')
    sys.exit(1)

# discover pools replicated or erasure
pool_type = {}
try:
    if use_shell:
        osd_pool_ls_detail = subprocess.getoutput('ceph osd pool ls detail')
    else:
        cmd = {"prefix": "osd pool ls", "detail": "detail", "format": "plain"}
        ret, output, errs = cluster.mon_command(json.dumps(cmd), b'', timeout=5)
        osd_pool_ls_detail = output.decode('utf-8').strip()
    for line in osd_pool_ls_detail.split('\n'):
        if 'pool' in line:
            x = line.split(' ')
            pool_type[x[1]] = x[3]
except:
    eprint('Error parsing pool types')
    sys.exit(1)

# discover if each pg is already upmapped
has_upmap = {}
for pg in upmaps:
    pgid = str(pg['pgid'])
    has_upmap[pgid] = True

# handle each remapped pg
print('while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done')
num = 0
for pg in remapped:
    if num == 50:
        print('wait; sleep 4; while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done')
        num = 0

    if ignore_backfilling:
        if "backfilling" in pg['state']:
            continue

    pgid = pg['pgid']

    try:
        if has_upmap[pgid]:
            rm_upmap_pg_items(pgid)
            num += 1
            continue
    except KeyError:
        pass

    up = pg['up']
    acting = pg['acting']
    pool = pgid.split('.')[0]
    if pool_type[pool] == 'replicated':
        try:
            pairs = gen_upmap(up, acting, replicated=True)
        except:
            continue
    elif pool_type[pool] == 'erasure':
        try:
            pairs = gen_upmap(up, acting)
        except:
            continue
    else:
        eprint('Unknown pool type for %s' % pool)
        sys.exit(1)
    upmap_pg_items(pgid, pairs)
    num += 1

print('wait; sleep 4; while ceph status | grep -q "peering\|activating\|laggy"; do sleep 2; done')
cluster.shutdown()
if __name__ == "__main__":
    main()
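Since the executable logic now lives in main() behind the __name__ guard, importing the module for testing no longer touches a cluster. A minimal way to run the new test, assuming pytest is installed and tools/upmap is the working directory (so the importlib call at the top of the test can locate upmap-remapped.py):

# Hypothetical helper, not part of this PR; equivalent to running
# `python -m pytest test_upmap-remapped.py -v` from tools/upmap.
import pytest
raise SystemExit(pytest.main(["test_upmap-remapped.py", "-v"]))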