Source code for quarry.types.nbt

import collections
import functools
import gzip
import time
import zlib

from quarry.types.buffer import Buffer
from quarry.types.chunk import PackedArray

_kinds = {}
_ids = {}


# Base types ------------------------------------------------------------------

@functools.total_ordering
class _Tag(object):
    def __init__(self, value):
        self.value = value

    @classmethod
    def from_bytes(cls, bytes):
        return cls.from_buff(Buffer(bytes))

    @classmethod
    def from_buff(cls, buff):
        raise NotImplementedError

    def to_bytes(self):
        raise NotImplementedError

    def to_obj(self):
        return self.value

    def __repr__(self):
        return "%s(%r)" % (type(self).__name__, self.value)

    def __eq__(self, other):
        return self.to_obj() == other.to_obj()

    def __lt__(self, other):
        return self.to_obj() < other.to_obj()


class _DataTag(_Tag):
    fmt = None

    @classmethod
    def from_buff(cls, buff):
        return cls(buff.unpack(cls.fmt))

    def to_bytes(self):
        return Buffer.pack(self.fmt, self.value)


class _ArrayTag(_Tag):
    width = None

    @classmethod
    def from_buff(cls, buff):
        return cls(PackedArray.from_bytes(
            bytes=buff.read(buff.unpack('i') * (cls.width // 8)),
            sector_width=cls.width))

    def to_bytes(self):
        data = self.value.to_bytes()
        data = Buffer.pack('i', len(data) // (self.width // 8)) + data
        return data

    def to_obj(self):
        return list(self.value)


# NBT tags --------------------------------------------------------------------

class TagByte(_DataTag):
    fmt = 'b'


class TagShort(_DataTag):
    fmt = 'h'


class TagInt(_DataTag):
    fmt = 'i'


class TagLong(_DataTag):
    fmt = 'q'


class TagFloat(_DataTag):
    fmt = 'f'


class TagDouble(_DataTag):
    fmt = 'd'


class TagString(_Tag):

    @classmethod
    def from_buff(cls, buff):
        string_length = buff.unpack('H')
        return cls(buff.read(string_length).decode('utf8'))

    def to_bytes(self):
        data = self.value.encode('utf8')
        return Buffer.pack('H', len(data)) + data


class TagByteArray(_ArrayTag):
    width = 8


class TagIntArray(_ArrayTag):
    width = 32


class TagLongArray(_ArrayTag):
    width = 64


class TagList(_Tag):
    @classmethod
    def from_buff(cls, buff):
        inner_kind_id, array_length = buff.unpack('bi')
        inner_kind = _kinds[inner_kind_id]
        return cls([inner_kind.from_buff(buff) for _ in range(array_length)])

    def to_bytes(self):
        if len(self.value) > 0:
            head = self.value[0]
        else:
            head = TagByte(0)

        return Buffer.pack('bi', _ids[type(head)], len(self.value)) + \
               b"".join(tag.to_bytes() for tag in self.value)

    def to_obj(self):
        return [tag.to_obj() for tag in self.value]


class TagCompound(_Tag):
    root = False
    preserve_order = False

    @classmethod
    def from_buff(cls, buff):
        if cls.preserve_order:
            value = collections.OrderedDict()
        else:
            value = {}

        while True:
            kind_id = buff.unpack('b')
            if kind_id == 0:
                return cls(value)
            kind = _kinds[kind_id]
            name = TagString.from_buff(buff).value
            tag = kind.from_buff(buff)
            value[name] = tag
            if cls.root:
                return cls(value)

    def to_bytes(self):
        string = b""
        for name, tag in self.value.items():
            string += Buffer.pack('b', _ids[type(tag)])
            string += TagString(name).to_bytes()
            string += tag.to_bytes()

        if len(self.value) == 0 or not self.root:
            string += Buffer.pack('b', 0)

        return string

    def to_obj(self):
        return dict((name, tag.to_obj()) for name, tag in self.value.items())

    def update(self, other_tag):
        for name, new_tag in other_tag.value.items():
            old_tag = self.value.get(name)

            if old_tag and not new_tag:
                del self.value[name]
            elif isinstance(old_tag, TagCompound) \
                    and isinstance(new_tag, TagCompound):
                self.value[name].update(new_tag)
            else:
                self.value[name] = new_tag


class TagRoot(TagCompound):
    root = True

    @classmethod
    def from_body(cls, body):
        return cls({u"": body})

    @property
    def body(self):
        return self.value[u""]


# Register tags ---------------------------------------------------------------

_kinds[0] = type(None)
_kinds[1] = TagByte
_kinds[2] = TagShort
_kinds[3] = TagInt
_kinds[4] = TagLong
_kinds[5] = TagFloat
_kinds[6] = TagDouble
_kinds[7] = TagByteArray
_kinds[8] = TagString
_kinds[9] = TagList
_kinds[10] = TagCompound
_kinds[11] = TagIntArray
_kinds[12] = TagLongArray
_ids.update({v: k for k, v in _kinds.items()})


# Files -----------------------------------------------------------------------

[docs]class NBTFile(object): root_tag = None def __init__(self, root_tag): self.root_tag = root_tag
[docs] @classmethod def load(cls, path): with gzip.open(path, 'rb') as fd: return cls(TagRoot.from_bytes(fd.read()))
[docs] def save(self, path): with gzip.open(path, 'wb') as fd: fd.write(self.root_tag.to_bytes())
[docs]class RegionFile(object): """ Experimental support for the Minecraft world storage format (``.mca``). """ def __init__(self, path): self.fd = open(path, "r+b") def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.fd.close()
[docs] def close(self): """ Closes the region file. """ self.fd.close()
[docs] def save_chunk(self, chunk): """ Saves the given chunk, which should be a ``TagRoot``, to the region file. """ # Compress chunk chunk_x = chunk.body.value["Level"].value["xPos"].value chunk_z = chunk.body.value["Level"].value["zPos"].value chunk = zlib.compress(chunk.to_bytes()) chunk = Buffer.pack('IB', len(chunk), 2) + chunk chunk_length = 1 + (len(chunk) - 1) // 4096 # Load extents extents = [(0, 2)] self.fd.seek(0) buff = Buffer(self.fd.read(4096)) for idx in range(1024): z, x = divmod(idx, 32) entry = buff.unpack('I') offset, length = entry >> 8, entry & 0xFF if offset > 0 and not (x == chunk_x and z == chunk_z): extents.append((offset, length)) extents.sort() extents.append((extents[-1][0] + extents[-1][1] + chunk_length, 0)) # Compute new extent for idx in range(len(extents) - 1): start = extents[idx][0] + extents[idx][1] end = extents[idx+1][0] if (end - start) >= chunk_length: chunk_offset = start extents.insert(idx+1, (chunk_offset, chunk_length)) break # Write extent header self.fd.seek(4 * (32 * chunk_z + chunk_x)) self.fd.write(Buffer.pack( 'I', (chunk_offset << 8) | (chunk_length & 0xFF))) # Write timestamp header self.fd.seek(4096 + 4 * (32 * chunk_z + chunk_x)) self.fd.write(Buffer.pack('I', int(time.time()))) # Write chunk self.fd.seek(4096 * chunk_offset) self.fd.write(chunk) # Truncate file self.fd.seek(4096 * extents[-1][0]) self.fd.truncate()
[docs] def load_chunk(self, chunk_x, chunk_z): """ Loads the chunk at the given co-ordinates from the region file. The co-ordinates should range from 0 to 31. Returns a ``TagRoot``. """ buff = Buffer() # Read extent header self.fd.seek(4 * (32 * chunk_z + chunk_x)) buff.add(self.fd.read(4)) entry = buff.unpack('I') chunk_offset, chunk_length = entry >> 8, entry & 0xFF if chunk_offset == 0: raise ValueError((chunk_x, chunk_z)) # Read chunk self.fd.seek(4096 * chunk_offset) buff.add(self.fd.read(4096 * chunk_length)) chunk = buff.read(buff.unpack('IB')[0]) chunk = zlib.decompress(chunk) chunk = TagRoot.from_bytes(chunk) return chunk
[docs] def load_chunk_section(self, chunk_x, chunk_y, chunk_z): """ Loads the chunk section at the given co-ordinates from the region file. The co-ordinates should range from 0 to 31. Returns a ``TagRoot``. """ chunk = self.load_chunk(chunk_x, chunk_z) sections = chunk.body.value["Level"].value["Sections"].value for section in sections: if section.value["Y"].value == chunk_y: return chunk, section raise ValueError((chunk_x, chunk_y, chunk_z))
# Debug -----------------------------------------------------------------------
[docs]def alt_repr(tag, level=0): """ Returns a human-readable representation of a tag using the same format as used the NBT specification. """ name = lambda kind: type(kind).__name__.replace("Tag", "TAG_") if isinstance(tag, _ArrayTag): return "%s%s: %d entries" % ( " " * level, name(tag), len(tag.value)) elif isinstance(tag, TagList): return "%s%s: %d entries\n%s{\n%s\n%s}" % ( " " * level, name(tag), len(tag.value), " " * level, u"\n".join(alt_repr(tag, level+1) for tag in tag.value), " " * level) elif isinstance(tag, TagRoot): return u"\n".join( alt_repr(tag, level).replace(': ', '("%s"): ' % name, 1) for name, tag in tag.value.items()) elif isinstance(tag, TagCompound): return "%s%s: %d entries\n%s{\n%s\n%s}" % ( " " * level, name(tag), len(tag.value), " " * level, u"\n".join( alt_repr(tag, level+1).replace(': ', '("%s"): ' % name, 1) for name, tag in tag.value.items()), " " * level) elif isinstance(tag, TagString): return '%s%s: "%s"' % ( " " * level, name(tag), tag.value) else: return "%s%s: %r" % ( " " * level, name(tag), tag.value)